fix(ingest): drop empty fields (#11613)

This commit is contained in:
Aseem Bansal 2024-10-15 14:53:45 +05:30 committed by GitHub
parent be1b8806a7
commit 1eec2c4238
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 46 additions and 0 deletions

View File

@ -37,6 +37,7 @@ from datahub.ingestion.api.report import Report
from datahub.ingestion.api.source_helpers import (
auto_browse_path_v2,
auto_fix_duplicate_schema_field_paths,
auto_fix_empty_field_paths,
auto_lowercase_urns,
auto_materialize_referenced_tags_terms,
auto_status_aspect,
@ -444,6 +445,7 @@ class Source(Closeable, metaclass=ABCMeta):
partial(
auto_fix_duplicate_schema_field_paths, platform=self._infer_platform()
),
partial(auto_fix_empty_field_paths, platform=self._infer_platform()),
browse_path_processor,
partial(auto_workunit_reporter, self.get_report()),
auto_patch_last_modified,

View File

@ -394,6 +394,50 @@ def auto_fix_duplicate_schema_field_paths(
)
def auto_fix_empty_field_paths(
stream: Iterable[MetadataWorkUnit],
*,
platform: Optional[str] = None,
) -> Iterable[MetadataWorkUnit]:
"""Count schema metadata aspects with empty field paths and emit telemetry."""
total_schema_aspects = 0
schemas_with_empty_fields = 0
empty_field_paths = 0
for wu in stream:
schema_metadata = wu.get_aspect_of_type(SchemaMetadataClass)
if schema_metadata:
total_schema_aspects += 1
updated_fields: List[SchemaFieldClass] = []
for field in schema_metadata.fields:
if field.fieldPath:
updated_fields.append(field)
else:
empty_field_paths += 1
if empty_field_paths > 0:
logger.info(
f"Fixing empty field paths in schema aspect for {wu.get_urn()} by dropping empty fields"
)
schema_metadata.fields = updated_fields
schemas_with_empty_fields += 1
yield wu
if schemas_with_empty_fields > 0:
properties = {
"platform": platform,
"total_schema_aspects": total_schema_aspects,
"schemas_with_empty_fields": schemas_with_empty_fields,
"empty_field_paths": empty_field_paths,
}
telemetry.telemetry_instance.ping(
"ingestion_empty_schema_field_paths", properties
)
def auto_empty_dataset_usage_statistics(
stream: Iterable[MetadataWorkUnit],
*,