diff --git a/metadata-ingestion/src/datahub/emitter/mcp.py b/metadata-ingestion/src/datahub/emitter/mcp.py index c6ec21f820d..027dc768a47 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp.py +++ b/metadata-ingestion/src/datahub/emitter/mcp.py @@ -178,26 +178,38 @@ class MetadataChangeProposalWrapper: "value": json.dumps(obj["aspect"]["json"]), } - mcp = MetadataChangeProposalClass.from_obj(obj, tuples=tuples) + mcpc = MetadataChangeProposalClass.from_obj(obj, tuples=tuples) # We don't know how to deserialize the entity key aspects yet. - if mcp.entityKeyAspect is not None: - return mcp + if mcpc.entityKeyAspect is not None: + return mcpc # Try to deserialize the aspect. - converted, aspect = _try_from_generic_aspect(mcp.aspectName, mcp.aspect) + return cls.try_from_mcpc(mcpc) or mcpc + + @classmethod + def try_from_mcpc( + cls, mcpc: MetadataChangeProposalClass + ) -> Optional["MetadataChangeProposalWrapper"]: + """Attempts to create a MetadataChangeProposalWrapper from a MetadataChangeProposalClass. + Neatly handles unsupported, expected cases, such as unknown aspect types or non-json content type. + + Raises: + Exception if the generic aspect is invalid, e.g. contains invalid json. + """ + converted, aspect = _try_from_generic_aspect(mcpc.aspectName, mcpc.aspect) if converted: return cls( - entityType=mcp.entityType, - entityUrn=mcp.entityUrn, - changeType=mcp.changeType, - auditHeader=mcp.auditHeader, - aspectName=mcp.aspectName, + entityType=mcpc.entityType, + entityUrn=mcpc.entityUrn, + changeType=mcpc.changeType, + auditHeader=mcpc.auditHeader, + aspectName=mcpc.aspectName, aspect=aspect, - systemMetadata=mcp.systemMetadata, + systemMetadata=mcpc.systemMetadata, ) - - return mcp + else: + return None @classmethod def from_obj_require_wrapper( diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 6b311cb8ade..ad47cc06be4 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -1,4 +1,6 @@ -from typing import Callable, Iterable, Optional, Set, TypeVar, Union +import logging +from collections import defaultdict +from typing import Callable, Dict, Iterable, List, Optional, Set, TypeVar, Union from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import WorkUnit @@ -8,6 +10,9 @@ from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalHandler, ) from datahub.metadata.schema_classes import ( + BrowsePathEntryClass, + BrowsePathsV2Class, + ContainerClass, MetadataChangeEventClass, MetadataChangeProposalClass, StatusClass, @@ -17,6 +22,8 @@ from datahub.utilities.urns.tag_urn import TagUrn from datahub.utilities.urns.urn import guess_entity_type from datahub.utilities.urns.urn_iter import list_urns +logger = logging.getLogger(__name__) + def auto_workunit( stream: Iterable[Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]] @@ -148,3 +155,50 @@ def auto_materialize_referenced_tags( entityUrn=urn, aspect=TagKeyClass(name=tag_urn.get_entity_id()[0]), ).as_workunit() + + +def auto_browse_path_v2( + stream: Iterable[MetadataWorkUnit], +) -> Iterable[MetadataWorkUnit]: + """Generate BrowsePathsV2 from Container aspects.""" + # TODO: Generate BrowsePathsV2 from BrowsePaths as well + + ignore_urns: Set[str] = set() + container_urns: Set[str] = set() + parent_container_map: Dict[str, str] = {} + children: Dict[str, List[str]] = defaultdict(list) + for wu in stream: + yield wu + + urn = wu.get_urn() + if guess_entity_type(urn) == "container": + container_urns.add(urn) + + container_aspects = wu.get_aspects_of_type(ContainerClass) + for aspect in container_aspects: + parent = aspect.container + parent_container_map[urn] = parent + children[parent].append(urn) + + if wu.get_aspects_of_type(BrowsePathsV2Class): + ignore_urns.add(urn) + + paths: Dict[str, List[str]] = {} # Maps urn -> list of urns in path + # Yield browse paths v2 in topological order, starting with root containers + nodes = container_urns - parent_container_map.keys() + while nodes: + node = nodes.pop() + nodes.update(children[node]) + + if node not in parent_container_map: # root + paths[node] = [] + else: + parent = parent_container_map[node] + paths[node] = [*paths[parent], parent] + if node not in ignore_urns: + yield MetadataChangeProposalWrapper( + entityUrn=node, + aspect=BrowsePathsV2Class( + path=[BrowsePathEntryClass(id=urn, urn=urn) for urn in paths[node]] + ), + ).as_workunit() diff --git a/metadata-ingestion/src/datahub/ingestion/api/workunit.py b/metadata-ingestion/src/datahub/ingestion/api/workunit.py index 63525a21b4c..3a80b900f14 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/workunit.py +++ b/metadata-ingestion/src/datahub/ingestion/api/workunit.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Iterable, Optional, Union, overload +from typing import Iterable, List, Optional, Type, TypeVar, Union, overload from deprecated import deprecated @@ -9,7 +9,9 @@ from datahub.metadata.com.linkedin.pegasus2avro.mxe import ( MetadataChangeEvent, MetadataChangeProposal, ) -from datahub.metadata.schema_classes import UsageAggregationClass +from datahub.metadata.schema_classes import UsageAggregationClass, _Aspect + +T_Aspect = TypeVar("T_Aspect", bound=_Aspect) @dataclass @@ -88,6 +90,27 @@ class MetadataWorkUnit(WorkUnit): assert self.metadata.entityUrn return self.metadata.entityUrn + def get_aspects_of_type(self, aspect_cls: Type[T_Aspect]) -> List[T_Aspect]: + aspects: list + if isinstance(self.metadata, MetadataChangeEvent): + aspects = self.metadata.proposedSnapshot.aspects + elif isinstance(self.metadata, MetadataChangeProposalWrapper): + aspects = [self.metadata.aspect] + elif isinstance(self.metadata, MetadataChangeProposal): + aspects = [] + # Best effort attempt to deserialize MetadataChangeProposalClass + if self.metadata.aspectName == aspect_cls.ASPECT_NAME: + try: + mcp = MetadataChangeProposalWrapper.try_from_mcpc(self.metadata) + if mcp: + aspects = [mcp.aspect] + except Exception: + pass + else: + raise ValueError(f"Unexpected type {type(self.metadata)}") + + return [a for a in aspects if isinstance(a, aspect_cls)] + def decompose_mce_into_mcps(self) -> Iterable["MetadataWorkUnit"]: from datahub.emitter.mcp_builder import mcps_from_mce diff --git a/metadata-ingestion/tests/unit/test_source_helpers.py b/metadata-ingestion/tests/unit/test_source_helpers.py index a3b14633c81..a913894ca25 100644 --- a/metadata-ingestion/tests/unit/test_source_helpers.py +++ b/metadata-ingestion/tests/unit/test_source_helpers.py @@ -1,8 +1,13 @@ -from typing import List, Union +from typing import Any, Dict, Iterable, List, Union import datahub.metadata.schema_classes as models +from datahub.emitter.mce_builder import make_container_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.ingestion.api.source_helpers import auto_status_aspect, auto_workunit +from datahub.ingestion.api.source_helpers import ( + auto_browse_path_v2, + auto_status_aspect, + auto_workunit, +) from datahub.ingestion.api.workunit import MetadataWorkUnit _base_metadata: List[ @@ -75,3 +80,94 @@ def test_auto_status_aspect(): ), ] assert list(auto_status_aspect(initial_wu)) == expected + + +def _create_container_aspects(d: Dict[str, Any]) -> Iterable[MetadataWorkUnit]: + for k, v in d.items(): + yield MetadataChangeProposalWrapper( + entityUrn=make_container_urn(k), + aspect=models.StatusClass(removed=False), + ).as_workunit() + + for child in list(v): + yield MetadataChangeProposalWrapper( + entityUrn=make_container_urn(child), + aspect=models.ContainerClass( + container=make_container_urn(k), + ), + ).as_workunit() + if isinstance(v, dict): + yield from _create_container_aspects(v) + + +def _make_browse_path_entries(path: List[str]) -> List[models.BrowsePathEntryClass]: + return [ + models.BrowsePathEntryClass(id=make_container_urn(s), urn=make_container_urn(s)) + for s in path + ] + + +def _get_browse_paths_from_wu( + stream: Iterable[MetadataWorkUnit], +) -> Dict[str, List[models.BrowsePathEntryClass]]: + paths = {} + for wu in stream: + browse_path_v2 = wu.get_aspects_of_type(models.BrowsePathsV2Class) + if browse_path_v2: + name = wu.get_urn().split(":")[-1] + paths[name] = browse_path_v2[0].path + return paths + + +def test_auto_browse_path_v2(): + structure = { + "one": { + "a": {"i": ["1", "2", "3"], "ii": ["4"]}, + "b": {"iii": ["5", "6"]}, + }, + "two": { + "c": {"iv": [], "v": ["7", "8"]}, + }, + "three": {"d": {}}, + "four": {}, + } + + wus = list(auto_status_aspect(_create_container_aspects(structure))) + assert ( # Sanity check + sum(len(wu.get_aspects_of_type(models.StatusClass)) for wu in wus) == 21 + ) + + new_wus = list(auto_browse_path_v2(wus)) + assert ( + sum(len(wu.get_aspects_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + == 21 + ) + + paths = _get_browse_paths_from_wu(new_wus) + assert paths["one"] == [] + assert paths["7"] == paths["8"] == _make_browse_path_entries(["two", "c", "v"]) + assert paths["d"] == _make_browse_path_entries(["three"]) + assert paths["i"] == _make_browse_path_entries(["one", "a"]) + + +def test_auto_browse_path_v2_ignores_urns_already_with(): + structure = {"a": {"b": {"c": {"d": ["e"]}}}} + + mcp = MetadataChangeProposalWrapper( + entityUrn=make_container_urn("c"), + aspect=models.BrowsePathsV2Class( + path=_make_browse_path_entries(["custom", "path"]) + ), + ) + wus = [*auto_status_aspect(_create_container_aspects(structure)), mcp.as_workunit()] + + new_wus = list(auto_browse_path_v2(wus)) + assert ( + sum(len(wu.get_aspects_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + == 5 + ) + + paths = _get_browse_paths_from_wu(new_wus) + assert paths["a"] == [] + assert paths["c"] == _make_browse_path_entries(["custom", "path"]) + assert paths["e"] == _make_browse_path_entries(["a", "b", "c", "d"]) diff --git a/metadata-ingestion/tests/unit/test_workunit.py b/metadata-ingestion/tests/unit/test_workunit.py new file mode 100644 index 00000000000..e036acc9d50 --- /dev/null +++ b/metadata-ingestion/tests/unit/test_workunit.py @@ -0,0 +1,109 @@ +import json + +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.metadata.schema_classes import ( + ChangeTypeClass, + ContainerClass, + DatasetSnapshotClass, + GenericAspectClass, + MetadataChangeEventClass, + MetadataChangeProposalClass, + StatusClass, + UpstreamLineageClass, +) + + +def test_get_aspects_of_type_mcp(): + aspect = StatusClass(False) + wu = MetadataChangeProposalWrapper( + entityUrn="urn:li:container:asdf", aspect=aspect + ).as_workunit() + assert wu.get_aspects_of_type(StatusClass) == [aspect] + assert wu.get_aspects_of_type(ContainerClass) == [] + + +def test_get_aspects_of_type_mce(): + status_aspect = StatusClass(False) + status_aspect_2 = StatusClass(True) + lineage_aspect = UpstreamLineageClass(upstreams=[]) + mce = MetadataChangeEventClass( + proposedSnapshot=DatasetSnapshotClass( + urn="urn:li:dataset:asdf", + aspects=[status_aspect, lineage_aspect, status_aspect_2], + ) + ) + wu = MetadataWorkUnit(id="id", mce=mce) + assert wu.get_aspects_of_type(StatusClass) == [status_aspect, status_aspect_2] + assert wu.get_aspects_of_type(UpstreamLineageClass) == [lineage_aspect] + assert wu.get_aspects_of_type(ContainerClass) == [] + + +def test_get_aspects_of_type_mcpc(): + aspect = StatusClass(False) + mcpc = MetadataChangeProposalClass( + entityUrn="urn:li:container:asdf", + entityType="container", + changeType=ChangeTypeClass.UPSERT, + aspectName=StatusClass.ASPECT_NAME, + aspect=GenericAspectClass( + value=json.dumps(aspect.to_obj()).encode(), + contentType="application/json", + ), + ) + wu = MetadataWorkUnit(id="id", mcp_raw=mcpc) + assert wu.get_aspects_of_type(StatusClass) == [aspect] + assert wu.get_aspects_of_type(ContainerClass) == [] + + # Failure scenarios + mcpc = MetadataChangeProposalClass( + entityUrn="urn:li:container:asdf", + entityType="container", + changeType=ChangeTypeClass.UPSERT, + aspectName="not status", + aspect=GenericAspectClass( + value=json.dumps(aspect.to_obj()).encode(), + contentType="application/json", + ), + ) + wu = MetadataWorkUnit(id="id", mcp_raw=mcpc) + assert wu.get_aspects_of_type(StatusClass) == [] + + mcpc = MetadataChangeProposalClass( + entityUrn="urn:li:container:asdf", + entityType="container", + changeType=ChangeTypeClass.PATCH, + aspectName=StatusClass.ASPECT_NAME, + aspect=GenericAspectClass( + value=json.dumps({"not_status": True}).encode(), + contentType="application/json-patch+json", + ), + ) + wu = MetadataWorkUnit(id="id", mcp_raw=mcpc) + assert wu.get_aspects_of_type(StatusClass) == [] + + mcpc = MetadataChangeProposalClass( + entityUrn="urn:li:container:asdf", + entityType="container", + changeType=ChangeTypeClass.UPSERT, + aspectName=StatusClass.ASPECT_NAME, + aspect=GenericAspectClass( + value=(json.dumps(aspect.to_obj()) + "aaa").encode(), + contentType="application/json", + ), + ) + wu = MetadataWorkUnit(id="id", mcp_raw=mcpc) + assert wu.get_aspects_of_type(StatusClass) == [] + + mcpc = MetadataChangeProposalClass( + entityUrn="urn:li:container:asdf", + entityType="container", + changeType=ChangeTypeClass.UPSERT, + aspectName=StatusClass.ASPECT_NAME, + aspect=GenericAspectClass( + value='{"ß": 2}'.encode("latin_1"), + contentType="application/json", + ), + ) + wu = MetadataWorkUnit(id="id", mcp_raw=mcpc) + assert wu.get_aspects_of_type(StatusClass) == []