fix(ingest): handle missing schema in transformer (#6445)

This commit is contained in:
Harshal Sheth 2022-11-16 17:37:43 -05:00 committed by GitHub
parent cf6fb91f8d
commit a3aec72c3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 27 deletions

View File

@@ -83,10 +83,8 @@ class AddDatasetSchemaTags(DatasetSchemaMetadataTransformer):
) -> Optional[builder.Aspect]: ) -> Optional[builder.Aspect]:
schema_metadata_aspect: SchemaMetadataClass = cast(SchemaMetadataClass, aspect) schema_metadata_aspect: SchemaMetadataClass = cast(SchemaMetadataClass, aspect)
assert ( assert schema_metadata_aspect is None or isinstance(
schema_metadata_aspect is None schema_metadata_aspect, SchemaMetadataClass
or isinstance(schema_metadata_aspect, SchemaMetadataClass)
or schema_metadata_aspect.field is None
) )
server_field_map: dict = {} server_field_map: dict = {}
@@ -96,20 +94,24 @@ class AddDatasetSchemaTags(DatasetSchemaMetadataTransformer):
SchemaMetadataClass SchemaMetadataClass
] = self.ctx.graph.get_schema_metadata(entity_urn=entity_urn) ] = self.ctx.graph.get_schema_metadata(entity_urn=entity_urn)
if server_schema_metadata_aspect is not None: if server_schema_metadata_aspect is not None:
if not schema_metadata_aspect:
schema_metadata_aspect = server_schema_metadata_aspect
input_field_path = [ input_field_path = [
field.fieldPath field.fieldPath
for field in schema_metadata_aspect.fields for field in schema_metadata_aspect.fields
if field is not None if field is not None
] ]
server_field_to_add: List[SchemaFieldClass] = []
# cache the server field to use in patching the schema-field later # cache the server field to use in patching the schema-field later
for field in server_schema_metadata_aspect.fields: for field in server_schema_metadata_aspect.fields:
server_field_map[field.fieldPath] = field server_field_map[field.fieldPath] = field
if field.fieldPath not in input_field_path: if field.fieldPath not in input_field_path:
# This field is present on server but not in input aspect # This field is present on server but not in input aspect
server_field_to_add.append(field) schema_metadata_aspect.fields.append(field)
# Add field present on server
schema_metadata_aspect.fields.extend(server_field_to_add) if not schema_metadata_aspect:
# We can't add tags to a schema that doesn't exist.
return None
schema_metadata_aspect.fields = [ schema_metadata_aspect.fields = [
self.extend_field(field, server_field=server_field_map.get(field.fieldPath)) self.extend_field(field, server_field=server_field_map.get(field.fieldPath))

View File

@@ -90,11 +90,8 @@ class AddDatasetSchemaTerms(DatasetSchemaMetadataTransformer):
self, entity_urn: str, aspect_name: str, aspect: Optional[builder.Aspect] self, entity_urn: str, aspect_name: str, aspect: Optional[builder.Aspect]
) -> Optional[builder.Aspect]: ) -> Optional[builder.Aspect]:
schema_metadata_aspect: SchemaMetadataClass = cast(SchemaMetadataClass, aspect) schema_metadata_aspect: SchemaMetadataClass = cast(SchemaMetadataClass, aspect)
assert schema_metadata_aspect is None or isinstance(
assert ( schema_metadata_aspect, SchemaMetadataClass
schema_metadata_aspect is None
or isinstance(schema_metadata_aspect, SchemaMetadataClass)
or schema_metadata_aspect.field is None
) )
server_field_map: Dict[ server_field_map: Dict[
@@ -106,20 +103,24 @@ class AddDatasetSchemaTerms(DatasetSchemaMetadataTransformer):
SchemaMetadataClass SchemaMetadataClass
] = self.ctx.graph.get_schema_metadata(entity_urn=entity_urn) ] = self.ctx.graph.get_schema_metadata(entity_urn=entity_urn)
if server_schema_metadata_aspect is not None: if server_schema_metadata_aspect is not None:
if not schema_metadata_aspect:
schema_metadata_aspect = server_schema_metadata_aspect
else:
input_field_path = [ input_field_path = [
field.fieldPath field.fieldPath
for field in schema_metadata_aspect.fields for field in schema_metadata_aspect.fields
if field is not None if field is not None
] ]
server_field_to_add: List[SchemaFieldClass] = []
# cache the server field to use in patching the schema-field later # cache the server field to use in patching the schema-field later
for field in server_schema_metadata_aspect.fields: for field in server_schema_metadata_aspect.fields:
server_field_map[field.fieldPath] = field server_field_map[field.fieldPath] = field
if field.fieldPath not in input_field_path: if field.fieldPath not in input_field_path:
# This field is present on server but not in input aspect # This field is present on server but not in input aspect, so we add it.
server_field_to_add.append(field) schema_metadata_aspect.fields.append(field)
# Add field present on server
schema_metadata_aspect.fields.extend(server_field_to_add) if not schema_metadata_aspect:
# We can't add terms to a schema that doesn't exist.
return None
schema_metadata_aspect.fields = [ schema_metadata_aspect.fields = [
self.extend_field(field, server_field=server_field_map.get(field.fieldPath)) self.extend_field(field, server_field=server_field_map.get(field.fieldPath))