feat(ingest): auto-fix duplicate schema fieldPaths (#10526)

Harshal Sheth 2024-05-21 15:03:08 -07:00 committed by GitHub
parent 2b6c78b776
commit f2b5875632
2 changed files with 61 additions and 3 deletions
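In short: any schemaMetadata aspect whose fields list repeats a fieldPath is now repaired during ingestion by keeping the first occurrence of each path and dropping later duplicates, with telemetry counting how often this fires. A minimal standalone sketch of the keep-first rule (an illustration only, not code from this commit):

def dedup_field_paths(paths):
    # Keep the first occurrence of each fieldPath; report what was dropped.
    seen, kept, dropped = set(), [], []
    for path in paths:
        (dropped if path in seen else kept).append(path)
        seen.add(path)
    return kept, dropped

assert dedup_field_paths(["id", "id", "ts"]) == (["id", "ts"], ["id"])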

metadata-ingestion/src/datahub/ingestion/api/source.py

@@ -30,6 +30,7 @@ from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
 from datahub.ingestion.api.report import Report
 from datahub.ingestion.api.source_helpers import (
     auto_browse_path_v2,
+    auto_fix_duplicate_schema_field_paths,
     auto_lowercase_urns,
     auto_materialize_referenced_tags_terms,
     auto_status_aspect,
@@ -243,6 +244,9 @@ class Source(Closeable, metaclass=ABCMeta):
             auto_lowercase_dataset_urns,
             auto_status_aspect,
             auto_materialize_referenced_tags_terms,
+            partial(
+                auto_fix_duplicate_schema_field_paths, platform=self._infer_platform()
+            ),
             browse_path_processor,
             partial(auto_workunit_reporter, self.get_report()),
         ]
@@ -286,14 +290,18 @@ class Source(Closeable, metaclass=ABCMeta):
     def close(self) -> None:
         pass
 
-    def _get_browse_path_processor(self, dry_run: bool) -> MetadataWorkUnitProcessor:
+    def _infer_platform(self) -> Optional[str]:
         config = self.get_config()
-        platform = (
+        return (
             getattr(config, "platform_name", None)
             or getattr(self, "platform", None)
             or getattr(config, "platform", None)
         )
 
+    def _get_browse_path_processor(self, dry_run: bool) -> MetadataWorkUnitProcessor:
+        config = self.get_config()
+        platform = self._infer_platform()
         env = getattr(config, "env", None)
         browse_path_drop_dirs = [
             platform,
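Note on the extracted helper: _infer_platform resolves the platform by checking config.platform_name first, then a platform attribute on the source class, then config.platform. A small self-contained sketch of that resolution order, using hypothetical FakeConfig/FakeSource stand-ins:

class FakeConfig:
    platform_name = None  # not set on this config
    platform = "snowflake"

class FakeSource:
    platform = None  # source class does not pin a platform

    def get_config(self):
        return FakeConfig()

    def _infer_platform(self):
        # Same fallback chain as the new method above.
        config = self.get_config()
        return (
            getattr(config, "platform_name", None)
            or getattr(self, "platform", None)
            or getattr(config, "platform", None)
        )

assert FakeSource()._infer_platform() == "snowflake"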

metadata-ingestion/src/datahub/ingestion/api/source_helpers.py

@@ -30,6 +30,8 @@ from datahub.metadata.schema_classes import (
     MetadataChangeEventClass,
     MetadataChangeProposalClass,
     OwnershipClass as Ownership,
+    SchemaFieldClass,
+    SchemaMetadataClass,
     StatusClass,
     TimeWindowSizeClass,
 )
@@ -378,6 +380,54 @@ def auto_browse_path_v2(
         telemetry.telemetry_instance.ping("incorrect_browse_path_v2", properties)
 
 
+def auto_fix_duplicate_schema_field_paths(
+    stream: Iterable[MetadataWorkUnit],
+    *,
+    platform: Optional[str] = None,
+) -> Iterable[MetadataWorkUnit]:
+    """Count schema metadata aspects with duplicate field paths and emit telemetry."""
+
+    total_schema_aspects = 0
+    schemas_with_duplicates = 0
+    duplicated_field_paths = 0
+
+    for wu in stream:
+        schema_metadata = wu.get_aspect_of_type(SchemaMetadataClass)
+        if schema_metadata:
+            total_schema_aspects += 1
+
+            seen_fields = set()
+            dropped_fields = []
+            updated_fields: List[SchemaFieldClass] = []
+            for field in schema_metadata.fields:
+                if field.fieldPath in seen_fields:
+                    dropped_fields.append(field.fieldPath)
+                else:
+                    seen_fields.add(field.fieldPath)
+                    updated_fields.append(field)
+
+            if dropped_fields:
+                logger.info(
+                    f"Fixing duplicate field paths in schema aspect for {wu.get_urn()} by dropping fields: {dropped_fields}"
+                )
+                schema_metadata.fields = updated_fields
+                schemas_with_duplicates += 1
+                duplicated_field_paths += len(dropped_fields)
+
+        yield wu
+
+    if schemas_with_duplicates:
+        properties = {
+            "platform": platform,
+            "total_schema_aspects": total_schema_aspects,
+            "schemas_with_duplicates": schemas_with_duplicates,
+            "duplicated_field_paths": duplicated_field_paths,
+        }
+        telemetry.telemetry_instance.ping(
+            "ingestion_duplicate_schema_field_paths", properties
+        )
+
+
 def auto_empty_dataset_usage_statistics(
     stream: Iterable[MetadataWorkUnit],
     *,
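For context, a hedged usage sketch of the new processor: the dataset URN, schema name, and field values below are invented for illustration, and the DataHub constructors are used from memory with their required arguments, so treat this as a sketch rather than a verified test from this commit.

# Hypothetical end-to-end check (names and URN invented for illustration).
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import auto_fix_duplicate_schema_field_paths
from datahub.metadata.schema_classes import (
    OtherSchemaClass,
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
    SchemaMetadataClass,
    StringTypeClass,
)

def make_field(path: str) -> SchemaFieldClass:
    return SchemaFieldClass(
        fieldPath=path,
        type=SchemaFieldDataTypeClass(type=StringTypeClass()),
        nativeDataType="string",
    )

schema = SchemaMetadataClass(
    schemaName="events",
    platform="urn:li:dataPlatform:snowflake",
    version=0,
    hash="",
    platformSchema=OtherSchemaClass(rawSchema=""),
    fields=[make_field("id"), make_field("id"), make_field("ts")],  # "id" duplicated
)
wu = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:snowflake,db.events,PROD)",
    aspect=schema,
).as_workunit()

fixed = list(auto_fix_duplicate_schema_field_paths([wu], platform="snowflake"))
out = fixed[0].get_aspect_of_type(SchemaMetadataClass)
assert [f.fieldPath for f in out.fields] == ["id", "ts"]  # first "id" wins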