fix(ingest/browse): Re-emit browse path v2 aspects to avoid race condition (#9227)

This commit is contained in:
Andrew Sikowitz 2023-11-29 14:56:30 -05:00 committed by GitHub
parent fe444aff26
commit 5e52e31fc9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 1 deletions

View File

@ -33,6 +33,7 @@ from datahub.ingestion.api.source_helpers import (
auto_materialize_referenced_tags,
auto_status_aspect,
auto_workunit_reporter,
re_emit_browse_path_v2,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
@ -278,13 +279,14 @@ class Source(Closeable, metaclass=ABCMeta):
if isinstance(config, PlatformInstanceConfigMixin) and config.platform_instance:
platform_instance = config.platform_instance
return partial(
browse_path_processor = partial(
auto_browse_path_v2,
platform=platform,
platform_instance=platform_instance,
drop_dirs=[s for s in browse_path_drop_dirs if s is not None],
dry_run=dry_run,
)
return lambda stream: re_emit_browse_path_v2(browse_path_processor(stream))
class TestableSource(Source):

View File

@ -198,6 +198,21 @@ def auto_lowercase_urns(
yield wu
def re_emit_browse_path_v2(
stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
"""Re-emit browse paths v2 aspects, to avoid race condition where server overwrites with default."""
browse_path_v2_workunits = []
for wu in stream:
yield wu
if wu.is_primary_source and wu.get_aspect_of_type(BrowsePathsV2Class):
browse_path_v2_workunits.append(wu)
for wu in browse_path_v2_workunits:
yield wu
def auto_browse_path_v2(
stream: Iterable[MetadataWorkUnit],
*,