diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_schema_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_schema_tags.py index e700c2c1eb..4287d0f1a8 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_schema_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_schema_tags.py @@ -83,10 +83,8 @@ class AddDatasetSchemaTags(DatasetSchemaMetadataTransformer): ) -> Optional[builder.Aspect]: schema_metadata_aspect: SchemaMetadataClass = cast(SchemaMetadataClass, aspect) - assert ( - schema_metadata_aspect is None - or isinstance(schema_metadata_aspect, SchemaMetadataClass) - or schema_metadata_aspect.field is None + assert schema_metadata_aspect is None or isinstance( + schema_metadata_aspect, SchemaMetadataClass ) server_field_map: dict = {} @@ -96,20 +94,24 @@ class AddDatasetSchemaTags(DatasetSchemaMetadataTransformer): SchemaMetadataClass ] = self.ctx.graph.get_schema_metadata(entity_urn=entity_urn) if server_schema_metadata_aspect is not None: + if not schema_metadata_aspect: + schema_metadata_aspect = server_schema_metadata_aspect + input_field_path = [ field.fieldPath for field in schema_metadata_aspect.fields if field is not None ] - server_field_to_add: List[SchemaFieldClass] = [] # cache the server field to use in patching the schema-field later for field in server_schema_metadata_aspect.fields: server_field_map[field.fieldPath] = field if field.fieldPath not in input_field_path: # This field is present on server but not in input aspect - server_field_to_add.append(field) - # Add field present on server - schema_metadata_aspect.fields.extend(server_field_to_add) + schema_metadata_aspect.fields.append(field) + + if not schema_metadata_aspect: + # We can't add tags to a schema that doesn't exist. + return None schema_metadata_aspect.fields = [ self.extend_field(field, server_field=server_field_map.get(field.fieldPath)) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_schema_terms.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_schema_terms.py index 60cf6656df..a7502eb1c9 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_schema_terms.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_schema_terms.py @@ -90,11 +90,8 @@ class AddDatasetSchemaTerms(DatasetSchemaMetadataTransformer): self, entity_urn: str, aspect_name: str, aspect: Optional[builder.Aspect] ) -> Optional[builder.Aspect]: schema_metadata_aspect: SchemaMetadataClass = cast(SchemaMetadataClass, aspect) - - assert ( - schema_metadata_aspect is None - or isinstance(schema_metadata_aspect, SchemaMetadataClass) - or schema_metadata_aspect.field is None + assert schema_metadata_aspect is None or isinstance( + schema_metadata_aspect, SchemaMetadataClass ) server_field_map: Dict[ @@ -106,20 +103,24 @@ class AddDatasetSchemaTerms(DatasetSchemaMetadataTransformer): SchemaMetadataClass ] = self.ctx.graph.get_schema_metadata(entity_urn=entity_urn) if server_schema_metadata_aspect is not None: - input_field_path = [ - field.fieldPath - for field in schema_metadata_aspect.fields - if field is not None - ] - server_field_to_add: List[SchemaFieldClass] = [] - # cache the server field to use in patching the schema-field later - for field in server_schema_metadata_aspect.fields: - server_field_map[field.fieldPath] = field - if field.fieldPath not in input_field_path: - # This field is present on server but not in input aspect - server_field_to_add.append(field) - # Add field present on server - schema_metadata_aspect.fields.extend(server_field_to_add) + if not schema_metadata_aspect: + schema_metadata_aspect = server_schema_metadata_aspect + else: + input_field_path = [ + field.fieldPath + for field in schema_metadata_aspect.fields + if field is not None + ] + # cache the server field to use in patching the schema-field later + for field in server_schema_metadata_aspect.fields: + server_field_map[field.fieldPath] = field + if field.fieldPath not in input_field_path: + # This field is present on server but not in input aspect, so we add it. + schema_metadata_aspect.fields.append(field) + + if not schema_metadata_aspect: + # We can't add terms to a schema that doesn't exist. + return None schema_metadata_aspect.fields = [ self.extend_field(field, server_field=server_field_map.get(field.fieldPath))