From f2b58756324bfafe72d0dcff8bc22b07f622c8ca Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 21 May 2024 15:03:08 -0700 Subject: [PATCH] feat(ingest): auto-fix duplicate schema fieldPaths (#10526) --- .../src/datahub/ingestion/api/source.py | 14 ++++-- .../datahub/ingestion/api/source_helpers.py | 50 +++++++++++++++++++ 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index f6755cb09d..173f7ec59b 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -30,6 +30,7 @@ from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUn from datahub.ingestion.api.report import Report from datahub.ingestion.api.source_helpers import ( auto_browse_path_v2, + auto_fix_duplicate_schema_field_paths, auto_lowercase_urns, auto_materialize_referenced_tags_terms, auto_status_aspect, @@ -243,6 +244,9 @@ class Source(Closeable, metaclass=ABCMeta): auto_lowercase_dataset_urns, auto_status_aspect, auto_materialize_referenced_tags_terms, + partial( + auto_fix_duplicate_schema_field_paths, platform=self._infer_platform() + ), browse_path_processor, partial(auto_workunit_reporter, self.get_report()), ] @@ -286,14 +290,18 @@ class Source(Closeable, metaclass=ABCMeta): def close(self) -> None: pass - def _get_browse_path_processor(self, dry_run: bool) -> MetadataWorkUnitProcessor: + def _infer_platform(self) -> Optional[str]: config = self.get_config() - - platform = ( + return ( getattr(config, "platform_name", None) or getattr(self, "platform", None) or getattr(config, "platform", None) ) + + def _get_browse_path_processor(self, dry_run: bool) -> MetadataWorkUnitProcessor: + config = self.get_config() + + platform = self._infer_platform() env = getattr(config, "env", None) browse_path_drop_dirs = [ platform, diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 495db9f899..91402fa3c6 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -30,6 +30,8 @@ from datahub.metadata.schema_classes import ( MetadataChangeEventClass, MetadataChangeProposalClass, OwnershipClass as Ownership, + SchemaFieldClass, + SchemaMetadataClass, StatusClass, TimeWindowSizeClass, ) @@ -378,6 +380,54 @@ def auto_browse_path_v2( telemetry.telemetry_instance.ping("incorrect_browse_path_v2", properties) +def auto_fix_duplicate_schema_field_paths( + stream: Iterable[MetadataWorkUnit], + *, + platform: Optional[str] = None, +) -> Iterable[MetadataWorkUnit]: + """Count schema metadata aspects with duplicate field paths and emit telemetry.""" + + total_schema_aspects = 0 + schemas_with_duplicates = 0 + duplicated_field_paths = 0 + + for wu in stream: + schema_metadata = wu.get_aspect_of_type(SchemaMetadataClass) + if schema_metadata: + total_schema_aspects += 1 + + seen_fields = set() + dropped_fields = [] + updated_fields: List[SchemaFieldClass] = [] + for field in schema_metadata.fields: + if field.fieldPath in seen_fields: + dropped_fields.append(field.fieldPath) + else: + seen_fields.add(field.fieldPath) + updated_fields.append(field) + + if dropped_fields: + logger.info( + f"Fixing duplicate field paths in schema aspect for {wu.get_urn()} by dropping fields: {dropped_fields}" + ) + schema_metadata.fields = updated_fields + schemas_with_duplicates += 1 + duplicated_field_paths += len(dropped_fields) + + yield wu + + if schemas_with_duplicates: + properties = { + "platform": platform, + "total_schema_aspects": total_schema_aspects, + "schemas_with_duplicates": schemas_with_duplicates, + "duplicated_field_paths": duplicated_field_paths, + } + telemetry.telemetry_instance.ping( + "ingestion_duplicate_schema_field_paths", properties + ) + + def auto_empty_dataset_usage_statistics( stream: Iterable[MetadataWorkUnit], *,