diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 8940642f70..a272b6e3cf 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -33,6 +33,7 @@ from datahub.ingestion.api.source_helpers import ( auto_materialize_referenced_tags, auto_status_aspect, auto_workunit_reporter, + re_emit_browse_path_v2, ) from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent @@ -278,13 +279,14 @@ class Source(Closeable, metaclass=ABCMeta): if isinstance(config, PlatformInstanceConfigMixin) and config.platform_instance: platform_instance = config.platform_instance - return partial( + browse_path_processor = partial( auto_browse_path_v2, platform=platform, platform_instance=platform_instance, drop_dirs=[s for s in browse_path_drop_dirs if s is not None], dry_run=dry_run, ) + return lambda stream: re_emit_browse_path_v2(browse_path_processor(stream)) class TestableSource(Source): diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index fae2602261..66365ef0cd 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -198,6 +198,21 @@ def auto_lowercase_urns( yield wu +def re_emit_browse_path_v2( + stream: Iterable[MetadataWorkUnit], +) -> Iterable[MetadataWorkUnit]: + """Re-emit browse paths v2 aspects, to avoid race condition where server overwrites with default.""" + browse_path_v2_workunits = [] + + for wu in stream: + yield wu + if wu.is_primary_source and wu.get_aspect_of_type(BrowsePathsV2Class): + browse_path_v2_workunits.append(wu) + + for wu in browse_path_v2_workunits: + yield wu + + def auto_browse_path_v2( stream: Iterable[MetadataWorkUnit], *,