feat(ingest): Browse Path v2 helper (#8012)

This commit is contained in:
Andrew Sikowitz 2023-05-24 02:46:46 -04:00 committed by GitHub
parent 287a292fe7
commit 8357fc8d64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 311 additions and 17 deletions

View File

@ -178,26 +178,38 @@ class MetadataChangeProposalWrapper:
"value": json.dumps(obj["aspect"]["json"]),
}
mcp = MetadataChangeProposalClass.from_obj(obj, tuples=tuples)
mcpc = MetadataChangeProposalClass.from_obj(obj, tuples=tuples)
# We don't know how to deserialize the entity key aspects yet.
if mcp.entityKeyAspect is not None:
return mcp
if mcpc.entityKeyAspect is not None:
return mcpc
# Try to deserialize the aspect.
converted, aspect = _try_from_generic_aspect(mcp.aspectName, mcp.aspect)
return cls.try_from_mcpc(mcpc) or mcpc
@classmethod
def try_from_mcpc(
    cls, mcpc: MetadataChangeProposalClass
) -> Optional["MetadataChangeProposalWrapper"]:
    """Attempt to build a MetadataChangeProposalWrapper from a MetadataChangeProposalClass.

    Unsupported-but-expected inputs (e.g. unknown aspect types or a non-json
    content type) are reported by returning None rather than raising.

    Args:
        mcpc: The raw proposal whose generic aspect should be deserialized.

    Returns:
        A wrapper carrying the deserialized aspect, or None when the generic
        aspect could not be converted.

    Raises:
        Exception: if the generic aspect is invalid, e.g. contains invalid json.
    """
    converted, aspect = _try_from_generic_aspect(mcpc.aspectName, mcpc.aspect)
    if not converted:
        return None

    # Copy every envelope field across unchanged; only the aspect is swapped
    # for its deserialized form.
    return cls(
        entityType=mcpc.entityType,
        entityUrn=mcpc.entityUrn,
        changeType=mcpc.changeType,
        auditHeader=mcpc.auditHeader,
        aspectName=mcpc.aspectName,
        aspect=aspect,
        systemMetadata=mcpc.systemMetadata,
    )
@classmethod
def from_obj_require_wrapper(

View File

@ -1,4 +1,6 @@
from typing import Callable, Iterable, Optional, Set, TypeVar, Union
import logging
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Optional, Set, TypeVar, Union
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import WorkUnit
@ -8,6 +10,9 @@ from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
)
from datahub.metadata.schema_classes import (
BrowsePathEntryClass,
BrowsePathsV2Class,
ContainerClass,
MetadataChangeEventClass,
MetadataChangeProposalClass,
StatusClass,
@ -17,6 +22,8 @@ from datahub.utilities.urns.tag_urn import TagUrn
from datahub.utilities.urns.urn import guess_entity_type
from datahub.utilities.urns.urn_iter import list_urns
logger = logging.getLogger(__name__)
def auto_workunit(
stream: Iterable[Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]]
@ -148,3 +155,50 @@ def auto_materialize_referenced_tags(
entityUrn=urn,
aspect=TagKeyClass(name=tag_urn.get_entity_id()[0]),
).as_workunit()
def auto_browse_path_v2(
    stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
    """Generate BrowsePathsV2 from Container aspects.

    Every incoming workunit is passed through unchanged. Once the stream is
    exhausted, a BrowsePathsV2 workunit is emitted for each urn reachable
    from a root container (a container urn with no Container aspect seen),
    unless the stream already carried a BrowsePathsV2 aspect for that urn.
    """
    # TODO: Generate BrowsePathsV2 from BrowsePaths as well

    seen_containers: Set[str] = set()
    already_has_path: Set[str] = set()
    parent_of: Dict[str, str] = {}
    children_of: Dict[str, List[str]] = defaultdict(list)

    for wu in stream:
        yield wu

        urn = wu.get_urn()
        if guess_entity_type(urn) == "container":
            seen_containers.add(urn)

        for container_aspect in wu.get_aspects_of_type(ContainerClass):
            parent_urn = container_aspect.container
            parent_of[urn] = parent_urn
            children_of[parent_urn].append(urn)

        if wu.get_aspects_of_type(BrowsePathsV2Class):
            already_has_path.add(urn)

    # Maps urn -> list of ancestor urns, outermost first
    ancestors: Dict[str, List[str]] = {}

    # Walk the hierarchy top-down starting at the roots, so a parent's path
    # is always computed before any of its children are visited.
    pending = seen_containers - parent_of.keys()
    while pending:
        current = pending.pop()
        pending.update(children_of[current])

        if current in parent_of:
            parent_urn = parent_of[current]
            ancestors[current] = ancestors[parent_urn] + [parent_urn]
        else:  # root
            ancestors[current] = []

        if current in already_has_path:
            continue

        entries = [
            BrowsePathEntryClass(id=ancestor, urn=ancestor)
            for ancestor in ancestors[current]
        ]
        yield MetadataChangeProposalWrapper(
            entityUrn=current,
            aspect=BrowsePathsV2Class(path=entries),
        ).as_workunit()

View File

@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Iterable, Optional, Union, overload
from typing import Iterable, List, Optional, Type, TypeVar, Union, overload
from deprecated import deprecated
@ -9,7 +9,9 @@ from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
)
from datahub.metadata.schema_classes import UsageAggregationClass
from datahub.metadata.schema_classes import UsageAggregationClass, _Aspect
T_Aspect = TypeVar("T_Aspect", bound=_Aspect)
@dataclass
@ -88,6 +90,27 @@ class MetadataWorkUnit(WorkUnit):
assert self.metadata.entityUrn
return self.metadata.entityUrn
def get_aspects_of_type(self, aspect_cls: Type[T_Aspect]) -> List[T_Aspect]:
    """Return the aspects carried by this workunit that are instances of ``aspect_cls``.

    Raises:
        ValueError: if ``self.metadata`` is none of the handled metadata types.
    """
    metadata = self.metadata
    candidates: list

    if isinstance(metadata, MetadataChangeEvent):
        candidates = metadata.proposedSnapshot.aspects
    elif isinstance(metadata, MetadataChangeProposalWrapper):
        candidates = [metadata.aspect]
    elif isinstance(metadata, MetadataChangeProposal):
        candidates = []
        # Best effort attempt to deserialize MetadataChangeProposalClass;
        # only tried when the aspect name matches the requested class.
        if metadata.aspectName == aspect_cls.ASPECT_NAME:
            try:
                wrapper = MetadataChangeProposalWrapper.try_from_mcpc(metadata)
            except Exception:
                # A payload that cannot be deserialized is treated as "no aspect".
                wrapper = None
            if wrapper:
                candidates = [wrapper.aspect]
    else:
        raise ValueError(f"Unexpected type {type(metadata)}")

    return [candidate for candidate in candidates if isinstance(candidate, aspect_cls)]
def decompose_mce_into_mcps(self) -> Iterable["MetadataWorkUnit"]:
from datahub.emitter.mcp_builder import mcps_from_mce

View File

@ -1,8 +1,13 @@
from typing import List, Union
from typing import Any, Dict, Iterable, List, Union
import datahub.metadata.schema_classes as models
from datahub.emitter.mce_builder import make_container_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import auto_status_aspect, auto_workunit
from datahub.ingestion.api.source_helpers import (
auto_browse_path_v2,
auto_status_aspect,
auto_workunit,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
_base_metadata: List[
@ -75,3 +80,94 @@ def test_auto_status_aspect():
),
]
assert list(auto_status_aspect(initial_wu)) == expected
def _create_container_aspects(d: Dict[str, Any]) -> Iterable[MetadataWorkUnit]:
    """Yield workunits describing the container hierarchy encoded by ``d``.

    Each key yields a Status workunit for its container urn. Each element of
    the key's value (dict keys or list items) yields a Container workunit that
    points at the key's container urn. Dict values are then expanded recursively.
    """
    for parent_name, nested in d.items():
        parent_urn = make_container_urn(parent_name)
        yield MetadataChangeProposalWrapper(
            entityUrn=parent_urn,
            aspect=models.StatusClass(removed=False),
        ).as_workunit()

        child_names = list(nested)
        for child_name in child_names:
            yield MetadataChangeProposalWrapper(
                entityUrn=make_container_urn(child_name),
                aspect=models.ContainerClass(container=parent_urn),
            ).as_workunit()

        if isinstance(nested, dict):
            yield from _create_container_aspects(nested)
def _make_browse_path_entries(path: List[str]) -> List[models.BrowsePathEntryClass]:
    """Turn container names into browse path entries whose id and urn are the container urn."""
    entries: List[models.BrowsePathEntryClass] = []
    for name in path:
        container_urn = make_container_urn(name)
        entries.append(
            models.BrowsePathEntryClass(id=container_urn, urn=container_urn)
        )
    return entries
def _get_browse_paths_from_wu(
    stream: Iterable[MetadataWorkUnit],
) -> Dict[str, List[models.BrowsePathEntryClass]]:
    """Collect browse paths from ``stream``, keyed by the last ``:``-separated urn segment.

    Only the first BrowsePathsV2 aspect of a workunit is used; a later
    workunit with the same key overwrites an earlier one.
    """
    collected = {}
    for wu in stream:
        found = wu.get_aspects_of_type(models.BrowsePathsV2Class)
        if not found:
            continue
        short_name = wu.get_urn().split(":")[-1]
        collected[short_name] = found[0].path
    return collected
def test_auto_browse_path_v2():
    """Browse paths are generated for a nested container hierarchy."""
    hierarchy = {
        "one": {
            "a": {"i": ["1", "2", "3"], "ii": ["4"]},
            "b": {"iii": ["5", "6"]},
        },
        "two": {
            "c": {"iv": [], "v": ["7", "8"]},
        },
        "three": {"d": {}},
        "four": {},
    }

    input_wus = list(auto_status_aspect(_create_container_aspects(hierarchy)))
    status_count = sum(
        len(wu.get_aspects_of_type(models.StatusClass)) for wu in input_wus
    )
    assert status_count == 21  # Sanity check

    output_wus = list(auto_browse_path_v2(input_wus))
    browse_path_count = sum(
        len(wu.get_aspects_of_type(models.BrowsePathsV2Class)) for wu in output_wus
    )
    assert browse_path_count == 21

    paths = _get_browse_paths_from_wu(output_wus)
    # A root container has an empty path; deeper entities list ancestors root-first.
    assert paths["one"] == []
    assert paths["7"] == paths["8"] == _make_browse_path_entries(["two", "c", "v"])
    assert paths["d"] == _make_browse_path_entries(["three"])
    assert paths["i"] == _make_browse_path_entries(["one", "a"])
def test_auto_browse_path_v2_ignores_urns_already_with():
    """An urn that already carries a BrowsePathsV2 aspect keeps it and gets no generated one."""
    hierarchy = {"a": {"b": {"c": {"d": ["e"]}}}}

    custom_path = _make_browse_path_entries(["custom", "path"])
    preexisting_wu = MetadataChangeProposalWrapper(
        entityUrn=make_container_urn("c"),
        aspect=models.BrowsePathsV2Class(path=custom_path),
    ).as_workunit()

    input_wus = list(auto_status_aspect(_create_container_aspects(hierarchy)))
    input_wus.append(preexisting_wu)

    output_wus = list(auto_browse_path_v2(input_wus))
    browse_path_count = sum(
        len(wu.get_aspects_of_type(models.BrowsePathsV2Class)) for wu in output_wus
    )
    assert browse_path_count == 5

    paths = _get_browse_paths_from_wu(output_wus)
    assert paths["a"] == []
    assert paths["c"] == _make_browse_path_entries(["custom", "path"])
    # Descendants of "c" still get paths built from the container hierarchy.
    assert paths["e"] == _make_browse_path_entries(["a", "b", "c", "d"])

View File

@ -0,0 +1,109 @@
import json
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
ChangeTypeClass,
ContainerClass,
DatasetSnapshotClass,
GenericAspectClass,
MetadataChangeEventClass,
MetadataChangeProposalClass,
StatusClass,
UpstreamLineageClass,
)
def test_get_aspects_of_type_mcp():
    """A wrapper-backed workunit returns its aspect only for the matching class."""
    status = StatusClass(False)
    proposal = MetadataChangeProposalWrapper(
        entityUrn="urn:li:container:asdf",
        aspect=status,
    )
    wu = proposal.as_workunit()

    assert wu.get_aspects_of_type(StatusClass) == [status]
    assert wu.get_aspects_of_type(ContainerClass) == []
def test_get_aspects_of_type_mce():
    """An MCE-backed workunit returns all snapshot aspects of the requested class, in order."""
    first_status = StatusClass(False)
    second_status = StatusClass(True)
    lineage = UpstreamLineageClass(upstreams=[])

    snapshot = DatasetSnapshotClass(
        urn="urn:li:dataset:asdf",
        aspects=[first_status, lineage, second_status],
    )
    wu = MetadataWorkUnit(
        id="id",
        mce=MetadataChangeEventClass(proposedSnapshot=snapshot),
    )

    assert wu.get_aspects_of_type(StatusClass) == [first_status, second_status]
    assert wu.get_aspects_of_type(UpstreamLineageClass) == [lineage]
    assert wu.get_aspects_of_type(ContainerClass) == []
def test_get_aspects_of_type_mcpc():
    """A raw-MCP workunit yields its aspect when deserializable, and [] otherwise."""
    aspect = StatusClass(False)
    valid_payload = json.dumps(aspect.to_obj()).encode()

    def make_wu(
        *,
        aspect_name,
        value,
        change_type=ChangeTypeClass.UPSERT,
        content_type="application/json",
    ):
        # Every scenario shares the same envelope; only these four fields vary.
        raw = MetadataChangeProposalClass(
            entityUrn="urn:li:container:asdf",
            entityType="container",
            changeType=change_type,
            aspectName=aspect_name,
            aspect=GenericAspectClass(
                value=value,
                contentType=content_type,
            ),
        )
        return MetadataWorkUnit(id="id", mcp_raw=raw)

    wu = make_wu(aspect_name=StatusClass.ASPECT_NAME, value=valid_payload)
    assert wu.get_aspects_of_type(StatusClass) == [aspect]
    assert wu.get_aspects_of_type(ContainerClass) == []

    # Failure scenarios

    # Aspect name does not match the requested class.
    wu = make_wu(aspect_name="not status", value=valid_payload)
    assert wu.get_aspects_of_type(StatusClass) == []

    # Patch change type with a json-patch content type.
    wu = make_wu(
        aspect_name=StatusClass.ASPECT_NAME,
        value=json.dumps({"not_status": True}).encode(),
        change_type=ChangeTypeClass.PATCH,
        content_type="application/json-patch+json",
    )
    assert wu.get_aspects_of_type(StatusClass) == []

    # Payload has trailing characters after the json document.
    wu = make_wu(
        aspect_name=StatusClass.ASPECT_NAME,
        value=(json.dumps(aspect.to_obj()) + "aaa").encode(),
    )
    assert wu.get_aspects_of_type(StatusClass) == []

    # Payload bytes are latin-1 encoded.
    wu = make_wu(
        aspect_name=StatusClass.ASPECT_NAME,
        value='{"ß": 2}'.encode("latin_1"),
    )
    assert wu.get_aspects_of_type(StatusClass) == []