fix(ingest): pass platform correctly to browse path v2 helper (#8244)

This commit is contained in:
Andrew Sikowitz 2023-06-15 23:10:15 -04:00 committed by GitHub
parent 35a4434490
commit 2751a09284
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 21 additions and 27 deletions

View File

@ -23,7 +23,7 @@ from pydantic import BaseModel
from datahub.configuration.common import ConfigModel from datahub.configuration.common import ConfigModel
from datahub.configuration.source_common import PlatformInstanceConfigMixin from datahub.configuration.source_common import PlatformInstanceConfigMixin
from datahub.emitter.mcp_builder import PlatformKey, mcps_from_mce from datahub.emitter.mcp_builder import mcps_from_mce
from datahub.ingestion.api.closeable import Closeable from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
from datahub.ingestion.api.report import Report from datahub.ingestion.api.report import Report
@ -249,19 +249,14 @@ class Source(Closeable, metaclass=ABCMeta):
env and env.lower(), env and env.lower(),
] ]
platform_key: Optional[PlatformKey] = None platform_instance: Optional[str] = None
if ( if isinstance(config, PlatformInstanceConfigMixin) and config.platform_instance:
platform platform_instance = platform_instance
and isinstance(config, PlatformInstanceConfigMixin)
and config.platform_instance
):
platform_key = PlatformKey(
platform=platform, instance=config.platform_instance
)
return partial( return partial(
auto_browse_path_v2, auto_browse_path_v2,
platform_key=platform_key, platform=platform,
platform_instance=platform_instance,
drop_dirs=[s for s in browse_path_drop_dirs if s is not None], drop_dirs=[s for s in browse_path_drop_dirs if s is not None],
dry_run=dry_run, dry_run=dry_run,
) )

View File

@ -15,7 +15,6 @@ from typing import (
from datahub.emitter.mce_builder import make_dataplatform_instance_urn from datahub.emitter.mce_builder import make_dataplatform_instance_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import PlatformKey
from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import ( from datahub.metadata.schema_classes import (
BrowsePathEntryClass, BrowsePathEntryClass,
@ -173,7 +172,8 @@ def auto_browse_path_v2(
*, *,
dry_run: bool = False, dry_run: bool = False,
drop_dirs: Sequence[str] = (), drop_dirs: Sequence[str] = (),
platform_key: Optional[PlatformKey] = None, platform: Optional[str] = None,
platform_instance: Optional[str] = None,
) -> Iterable[MetadataWorkUnit]: ) -> Iterable[MetadataWorkUnit]:
"""Generate BrowsePathsV2 from Container and BrowsePaths aspects. """Generate BrowsePathsV2 from Container and BrowsePaths aspects.
@ -248,7 +248,9 @@ def auto_browse_path_v2(
yield MetadataChangeProposalWrapper( yield MetadataChangeProposalWrapper(
entityUrn=urn, entityUrn=urn,
aspect=BrowsePathsV2Class( aspect=BrowsePathsV2Class(
path=_prepend_platform_instance(path, platform_key) path=_prepend_platform_instance(
path, platform, platform_instance
)
), ),
).as_workunit() ).as_workunit()
elif urn not in emitted_urns and guess_entity_type(urn) == "container": elif urn not in emitted_urns and guess_entity_type(urn) == "container":
@ -258,16 +260,14 @@ def auto_browse_path_v2(
yield MetadataChangeProposalWrapper( yield MetadataChangeProposalWrapper(
entityUrn=urn, entityUrn=urn,
aspect=BrowsePathsV2Class( aspect=BrowsePathsV2Class(
path=_prepend_platform_instance([], platform_key) path=_prepend_platform_instance([], platform, platform_instance)
), ),
).as_workunit() ).as_workunit()
if num_out_of_batch or num_out_of_order: if num_out_of_batch or num_out_of_order:
properties = { properties = {
"platform": platform_key.platform if platform_key else None, "platform": platform,
"has_platform_instance": bool(platform_key.instance) "has_platform_instance": bool(platform_instance),
if platform_key
else False,
"num_out_of_batch": num_out_of_batch, "num_out_of_batch": num_out_of_batch,
"num_out_of_order": num_out_of_order, "num_out_of_order": num_out_of_order,
} }
@ -293,12 +293,12 @@ def _batch_workunits_by_urn(
def _prepend_platform_instance( def _prepend_platform_instance(
entries: List[BrowsePathEntryClass], platform_key: Optional[PlatformKey] entries: List[BrowsePathEntryClass],
platform: Optional[str],
platform_instance: Optional[str],
) -> List[BrowsePathEntryClass]: ) -> List[BrowsePathEntryClass]:
if platform_key and platform_key.instance: if platform and platform_instance:
urn = make_dataplatform_instance_urn( urn = make_dataplatform_instance_urn(platform, platform_instance)
platform_key.platform, platform_key.instance
)
return [BrowsePathEntryClass(id=urn, urn=urn)] + entries return [BrowsePathEntryClass(id=urn, urn=urn)] + entries
return entries return entries

View File

@ -8,7 +8,6 @@ from datahub.emitter.mce_builder import (
make_dataset_urn, make_dataset_urn,
) )
from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import PlatformKey
from datahub.ingestion.api.source_helpers import ( from datahub.ingestion.api.source_helpers import (
auto_browse_path_v2, auto_browse_path_v2,
auto_status_aspect, auto_status_aspect,
@ -298,7 +297,6 @@ def test_auto_browse_path_v2_container_over_legacy_browse_path(telemetry_ping_mo
def test_auto_browse_path_v2_with_platform_instsance(telemetry_ping_mock): def test_auto_browse_path_v2_with_platform_instsance(telemetry_ping_mock):
platform = "my_platform" platform = "my_platform"
platform_instance = "my_instance" platform_instance = "my_instance"
platform_key = PlatformKey(platform=platform, instance=platform_instance)
platform_instance_urn = make_dataplatform_instance_urn(platform, platform_instance) platform_instance_urn = make_dataplatform_instance_urn(platform, platform_instance)
platform_instance_entry = models.BrowsePathEntryClass( platform_instance_entry = models.BrowsePathEntryClass(
platform_instance_urn, platform_instance_urn platform_instance_urn, platform_instance_urn
@ -310,7 +308,8 @@ def test_auto_browse_path_v2_with_platform_instsance(telemetry_ping_mock):
new_wus = list( new_wus = list(
auto_browse_path_v2( auto_browse_path_v2(
wus, wus,
platform_key=platform_key, platform=platform,
platform_instance=platform_instance,
) )
) )
assert telemetry_ping_mock.call_count == 0 assert telemetry_ping_mock.call_count == 0