feat(ingest): Create Browse Paths V2 under flag (#8120)

Andrew Sikowitz 2023-06-02 15:50:38 -04:00 committed by GitHub
parent fe1ff71318
commit 802c91a0a7
12 changed files with 231 additions and 79 deletions

View File

@@ -8,7 +8,7 @@ import unittest.mock
from dataclasses import Field, dataclass, field
from enum import auto
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple, Iterable
import avro.schema
import click
@@ -27,7 +27,6 @@ from datahub.metadata.schema_classes import (
DatasetSnapshotClass,
ForeignKeyConstraintClass,
GlobalTagsClass,
MetadataChangeEventClass,
OtherSchemaClass,
SchemaFieldClass as SchemaField,
SchemaFieldDataTypeClass,
@@ -35,6 +34,8 @@ from datahub.metadata.schema_classes import (
StringTypeClass,
SubTypesClass,
TagAssociationClass,
BrowsePathsV2Class,
BrowsePathEntryClass,
)
logger = logging.getLogger(__name__)
@@ -316,9 +317,10 @@ def make_entity_docs(entity_display_name: str, graph: RelationshipGraph) -> str:
raise Exception(f"Failed to find information for entity: {entity_name}")
def generate_stitched_record(relnships_graph: RelationshipGraph) -> List[Any]:
def generate_stitched_record(
relnships_graph: RelationshipGraph,
) -> Iterable[MetadataChangeProposalWrapper]:
def strip_types(field_path: str) -> str:
final_path = field_path
final_path = re.sub(r"(\[type=[a-zA-Z]+\]\.)", "", final_path)
final_path = re.sub(r"^\[version=2.0\]\.", "", final_path)
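For orientation, a minimal standalone sketch of what strip_types does to a v2 field path, mirroring the two substitutions above (the example path is illustrative, not taken from this PR):

import re

def strip_types(field_path: str) -> str:
    # Drop every "[type=...]." segment, then the leading "[version=2.0]." prefix.
    final_path = re.sub(r"(\[type=[a-zA-Z]+\]\.)", "", field_path)
    final_path = re.sub(r"^\[version=2.0\]\.", "", final_path)
    return final_path

assert strip_types("[version=2.0].[type=Dataset].[type=string].name") == "name"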
@@ -455,52 +457,41 @@ def generate_stitched_record(relnships_graph: RelationshipGraph) -> List[Any]:
edge_id=f"{entity_display_name}:{fkey.name}:{destination_entity_name}:{strip_types(f_field.fieldPath)}",
)
schemaMetadata = SchemaMetadataClass(
schemaName=f"{entity_name}",
platform=make_data_platform_urn("datahub"),
platformSchema=OtherSchemaClass(rawSchema=rawSchema),
fields=schema_fields,
version=0,
hash="",
foreignKeys=foreign_keys if foreign_keys else None,
dataset_urn = make_dataset_urn(
platform="datahub",
name=entity_display_name,
)
dataset = DatasetSnapshotClass(
urn=make_dataset_urn(
platform="datahub",
name=f"{entity_display_name}",
),
yield from MetadataChangeProposalWrapper.construct_many(
entityUrn=dataset_urn,
aspects=[
schemaMetadata,
SchemaMetadataClass(
schemaName=str(entity_name),
platform=make_data_platform_urn("datahub"),
platformSchema=OtherSchemaClass(rawSchema=rawSchema),
fields=schema_fields,
version=0,
hash="",
foreignKeys=foreign_keys if foreign_keys else None,
),
GlobalTagsClass(
tags=[TagAssociationClass(tag="urn:li:tag:Entity")]
),
BrowsePathsClass([f"/prod/datahub/entities/{entity_display_name}"]),
BrowsePathsV2Class(
[
BrowsePathEntryClass(id="entities"),
BrowsePathEntryClass(id=entity_display_name),
]
),
DatasetPropertiesClass(
description=make_entity_docs(
dataset_urn.split(":")[-1].split(",")[1], relnships_graph
)
),
SubTypesClass(typeNames=["entity"]),
],
)
datasets.append(dataset)
events: List[Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]] = []
for d in datasets:
entity_name = d.urn.split(":")[-1].split(",")[1]
d.aspects.append(
DatasetPropertiesClass(
description=make_entity_docs(entity_name, relnships_graph)
)
)
mce = MetadataChangeEventClass(
proposedSnapshot=d,
)
events.append(mce)
mcp = MetadataChangeProposalWrapper(
entityUrn=d.urn,
aspect=SubTypesClass(typeNames=["entity"]),
)
events.append(mcp)
return events
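The net effect of this hunk: instead of accumulating DatasetSnapshotClass objects and wrapping each in a MetadataChangeEventClass, the generator now yields one MetadataChangeProposalWrapper per aspect via construct_many. A hedged sketch of the new pattern (the urn and aspect values are placeholders, not the generator's real ones):

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
    BrowsePathEntryClass,
    BrowsePathsV2Class,
    SubTypesClass,
)

dataset_urn = make_dataset_urn(platform="datahub", name="example")  # placeholder
for mcp in MetadataChangeProposalWrapper.construct_many(
    entityUrn=dataset_urn,
    aspects=[
        BrowsePathsV2Class(
            path=[
                BrowsePathEntryClass(id="entities"),
                BrowsePathEntryClass(id="example"),
            ]
        ),
        SubTypesClass(typeNames=["entity"]),
    ],
):
    print(mcp.entityUrn, type(mcp.aspect).__name__)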
class EntityRegistry(ConfigModel):
@@ -614,7 +605,7 @@ def generate(
]
relationship_graph = RelationshipGraph()
events = generate_stitched_record(relationship_graph)
mcps = generate_stitched_record(relationship_graph)
shutil.rmtree(f"{generated_docs_dir}/entities", ignore_errors=True)
entity_names = [(x, entity_registry[x]) for x in generated_documentation]
@@ -645,7 +636,7 @@ def generate(
PipelineContext(run_id="generated-metaModel"),
FileSinkConfig(filename=file),
)
for e in events:
for e in mcps:
fileSink.write_record_async(
RecordEnvelope(e, metadata={}), write_callback=NoopWriteCallback()
)
@@ -674,7 +665,7 @@ def generate(
assert server.startswith("http://"), "server address must start with http://"
emitter = DatahubRestEmitter(gms_server=server)
emitter.test_connection()
for e in events:
for e in mcps:
emitter.emit(e)
if dot:
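For reference, the emission loop above feeds a DatahubRestEmitter; a minimal sketch of that path in isolation (the server address is a placeholder):

from datahub.emitter.rest_emitter import DatahubRestEmitter

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")  # placeholder
emitter.test_connection()
for mcp in mcps:  # as produced by generate_stitched_record above
    emitter.emit(mcp)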

View File

@@ -27,6 +27,7 @@ from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
from datahub.ingestion.api.report import Report
from datahub.ingestion.api.source_helpers import (
auto_browse_path_v2,
auto_materialize_referenced_tags,
auto_status_aspect,
auto_workunit_reporter,
@@ -181,10 +182,31 @@ class Source(Closeable, metaclass=ABCMeta):
"""A list of functions that transform the workunits produced by this source.
Run in order; the first processor in the list is applied first. Be careful with ordering when overriding.
"""
browse_path_processor: Optional[MetadataWorkUnitProcessor] = None
if (
self.ctx.pipeline_config
and self.ctx.pipeline_config.flags.generate_browse_path_v2
):
platform = getattr(self, "platform", None) or getattr(
self.get_config(), "platform", None
)
env = getattr(self.get_config(), "env", None)
browse_path_drop_dirs = [
platform,
platform and platform.lower(),
env,
env and env.lower(),
]
browse_path_processor = partial(
auto_browse_path_v2,
[s for s in browse_path_drop_dirs if s is not None],
)
return [
auto_status_aspect,
auto_materialize_referenced_tags,
partial(auto_workunit_reporter, self.get_report()),
browse_path_processor,
]
@staticmethod
@@ -207,6 +229,18 @@ class Source(Closeable, metaclass=ABCMeta):
"get_workunits_internal must be implemented if get_workunits is not overridden."
)
def get_config(self) -> Optional[ConfigModel]:
"""Overridable method to return the config object for this source.
Enables defining workunit processors in this class, rather than per source.
More broadly, this method contributes to the standardization of sources,
promoting more source-generic functionality.
Eventually, we would like to replace this call with a Protocol that requires
a config object to be defined on each source.
"""
return getattr(self, "config", None) or getattr(self, "source_config", None)
@abstractmethod
def get_report(self) -> SourceReport:
pass
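To make the hook concrete: a MetadataWorkUnitProcessor maps a workunit stream to a workunit stream, and the list returned above is presumably applied in order, with None entries (e.g. when generate_browse_path_v2 is off) skipped. A sketch of that composition under those assumptions (the helper name is made up):

from functools import reduce
from typing import Callable, Iterable, List, Optional

from datahub.ingestion.api.workunit import MetadataWorkUnit

MetadataWorkUnitProcessor = Callable[
    [Iterable[MetadataWorkUnit]], Iterable[MetadataWorkUnit]
]

def apply_processors(
    stream: Iterable[MetadataWorkUnit],
    processors: List[Optional[MetadataWorkUnitProcessor]],
) -> Iterable[MetadataWorkUnit]:
    # Fold the stream through each processor; skip processors that are None.
    return reduce(lambda s, p: s if p is None else p(s), processors, stream)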

View File

@@ -7,6 +7,7 @@ from typing import (
Iterable,
List,
Optional,
Sequence,
Set,
TypeVar,
Union,
@@ -16,6 +17,7 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
BrowsePathEntryClass,
BrowsePathsClass,
BrowsePathsV2Class,
ContainerClass,
MetadataChangeEventClass,
@@ -164,12 +166,13 @@ def auto_materialize_referenced_tags(
def auto_browse_path_v2(
drop_dirs: Sequence[str],
stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
"""Generate BrowsePathsV2 from Container aspects."""
# TODO: Generate BrowsePathsV2 from BrowsePaths as well
"""Generate BrowsePathsV2 from Container and BrowsePaths aspects."""
ignore_urns: Set[str] = set()
legacy_browse_paths: Dict[str, List[str]] = defaultdict(list)
container_urns: Set[str] = set()
parent_container_map: Dict[str, str] = {}
children: Dict[str, List[str]] = defaultdict(list)
@@ -181,16 +184,25 @@ def auto_browse_path_v2(
container_urns.add(urn)
container_aspects = wu.get_aspects_of_type(ContainerClass)
for aspect in container_aspects:
parent = aspect.container
for c_aspect in container_aspects:
parent = c_aspect.container
parent_container_map[urn] = parent
children[parent].append(urn)
browse_path_aspects = wu.get_aspects_of_type(BrowsePathsClass)
for b_aspect in browse_path_aspects:
if b_aspect.paths:
path = b_aspect.paths[0] # Only take first path
legacy_browse_paths[urn] = [
p for p in path.strip("/").split("/") if p.strip() not in drop_dirs
]
if wu.get_aspects_of_type(BrowsePathsV2Class):
ignore_urns.add(urn)
paths: Dict[str, List[str]] = {} # Maps urn -> list of urns in path
# Yield browse paths v2 in topological order, starting with root containers
processed_urns = set()
nodes = container_urns - parent_container_map.keys()
while nodes:
node = nodes.pop()
@@ -208,3 +220,14 @@ def auto_browse_path_v2(
path=[BrowsePathEntryClass(id=urn, urn=urn) for urn in paths[node]]
),
).as_workunit()
processed_urns.add(node)
# Yield browse paths v2 based on browse paths v1 (legacy)
# Only done if the entity is not part of a container hierarchy
for urn in legacy_browse_paths.keys() - processed_urns - ignore_urns:
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=BrowsePathsV2Class(
path=[BrowsePathEntryClass(id=p) for p in legacy_browse_paths[urn]]
),
).as_workunit()
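The legacy fallback reduces to splitting the first v1 path on "/" and dropping platform/env segments. A standalone sketch of that derivation (the drop list and path are illustrative):

from typing import List

drop_dirs = ["mysql", "MYSQL", "PROD", "prod"]  # hypothetical platform/env variants

def legacy_to_v2_ids(path: str) -> List[str]:
    # Mirrors the filter above: "/mysql/PROD/db/schema" -> ["db", "schema"]
    return [p for p in path.strip("/").split("/") if p.strip() not in drop_dirs]

assert legacy_to_v2_ids("/mysql/PROD/db/schema") == ["db", "schema"]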

View File

@@ -37,6 +37,19 @@ class FailureLoggingConfig(ConfigModel):
log_config: Optional[FileSinkConfig] = None
class FlagsConfig(ConfigModel):
"""Experimental flags for the ingestion pipeline.
As ingestion flags are an experimental feature, we do not guarantee backwards compatibility.
Use at your own risk!
"""
generate_browse_path_v2: bool = Field(
default=False,
description="Generate BrowsePathsV2 aspects from container hierarchy and existing BrowsePaths aspects.",
)
class PipelineConfig(ConfigModel):
# Once support for discriminated unions gets merged into Pydantic, we can
# simplify this configuration and validation.
@@ -45,6 +58,7 @@ class PipelineConfig(ConfigModel):
source: SourceConfig
sink: DynamicTypedConfig
transformers: Optional[List[DynamicTypedConfig]]
flags: FlagsConfig = Field(default=FlagsConfig())
reporting: List[ReporterConfig] = []
run_id: str = DEFAULT_RUN_ID
datahub_api: Optional[DatahubClientConfig] = None
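As a usage note, the flag sits at the recipe's top level alongside source and sink. A hedged sketch of a pipeline that would exercise it (the source and sink settings are placeholders, not a working recipe):

from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {"type": "mode", "config": {}},  # real settings omitted
        "sink": {"type": "console"},
        "flags": {"generate_browse_path_v2": True},
    }
)
pipeline.run()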

View File

@@ -41,6 +41,7 @@ class SagemakerSource(Source):
- Models, jobs, and lineage between the two (e.g. when jobs output a model or a model is used by a job)
"""
platform = "sagemaker"
source_config: SagemakerSourceConfig
report = SagemakerSourceReport()

View File

@@ -116,6 +116,7 @@ class FeastRepositorySource(Source):
- Column types associated with each entity and feature
"""
platform = "feast"
source_config: FeastRepositorySourceConfig
report: SourceReport
feature_store: FeatureStore
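These one-line annotations matter because the new Source.get_config discovers configuration by attribute probing. A sketch of that lookup (the class names here are hypothetical):

from typing import Optional

from datahub.configuration.common import ConfigModel

class ExampleSourceConfig(ConfigModel):  # hypothetical config
    env: str = "PROD"

class ExampleSource:
    platform = "example"

    def __init__(self) -> None:
        # Stored under "source_config", as in the Feast and SageMaker sources above.
        self.source_config = ExampleSourceConfig()

    def get_config(self) -> Optional[ConfigModel]:
        # Same probing as the base class: "config" first, then "source_config".
        return getattr(self, "config", None) or getattr(self, "source_config", None)

assert isinstance(ExampleSource().get_config(), ExampleSourceConfig)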

View File

@@ -13,7 +13,7 @@ from okta.models import Group, GroupProfile, User, UserProfile, UserStatus
from pydantic import validator
from pydantic.fields import Field
from datahub.configuration.common import ConfigurationError
from datahub.configuration.common import ConfigModel, ConfigurationError
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
@@ -53,7 +53,7 @@ from datahub.metadata.schema_classes import (
logger = logging.getLogger(__name__)
class OktaConfig(StatefulIngestionConfigBase):
class OktaConfig(StatefulIngestionConfigBase, ConfigModel):
# Required: Domain of the Okta deployment. Example: dev-33231928.okta.com
okta_domain: str = Field(
description="The location of your Okta Domain, without a protocol. Can be found in Okta Developer console. e.g. dev-33231928.okta.com",

View File

@@ -160,7 +160,7 @@ class ModeSource(Source):
config: ModeConfig
report: SourceReport
tool = "mode"
platform = "mode"
def __hash__(self):
return id(self)
@@ -200,7 +200,9 @@ class ModeSource(Source):
self, space_name: str, report_info: dict
) -> DashboardSnapshot:
report_token = report_info.get("token", "")
dashboard_urn = builder.make_dashboard_urn(self.tool, report_info.get("id", ""))
dashboard_urn = builder.make_dashboard_urn(
self.platform, report_info.get("id", "")
)
dashboard_snapshot = DashboardSnapshot(
urn=dashboard_urn,
aspects=[],
@@ -304,7 +306,9 @@ class ModeSource(Source):
charts = self._get_charts(report_token, query.get("token", ""))
# build chart urns
for chart in charts:
chart_urn = builder.make_chart_urn(self.tool, chart.get("token", ""))
chart_urn = builder.make_chart_urn(
self.platform, chart.get("token", "")
)
chart_urns.append(chart_urn)
return chart_urns
@@ -580,7 +584,7 @@ class ModeSource(Source):
def construct_chart_from_api_data(
self, chart_data: dict, query: dict, path: str
) -> ChartSnapshot:
chart_urn = builder.make_chart_urn(self.tool, chart_data.get("token", ""))
chart_urn = builder.make_chart_urn(self.platform, chart_data.get("token", ""))
chart_snapshot = ChartSnapshot(
urn=chart_urn,
aspects=[],
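Since self.tool and self.platform are both the string "mode", the generated chart and dashboard urns do not change; the rename only lets the base class's getattr(self, "platform", None) probe find the platform. A minimal illustration (the token is a placeholder):

import datahub.emitter.mce_builder as builder

# Either attribute produced the same urn; only the attribute name changed.
assert builder.make_chart_urn("mode", "abc123") == "urn:li:chart:(mode,abc123)"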

View File

@@ -8,6 +8,7 @@ import requests
from pydantic.class_validators import root_validator, validator
from pydantic.fields import Field
from datahub.configuration import ConfigModel
from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
@@ -69,7 +70,7 @@ chart_type_from_viz_type = {
}
class SupersetConfig(StatefulIngestionConfigBase):
class SupersetConfig(StatefulIngestionConfigBase, ConfigModel):
# See the Superset /security/login endpoint for details
# https://superset.apache.org/docs/rest-api
connect_uri: str = Field(

View File

@@ -332,7 +332,8 @@ class AddStatusRemovedTransformer(Transformer):
class FakeSource(Source):
def __init__(self):
def __init__(self, ctx: PipelineContext):
super().__init__(ctx)
self.source_report = SourceReport()
self.work_units: List[MetadataWorkUnit] = [
MetadataWorkUnit(id="workunit-1", mce=get_initial_mce())
@@ -341,7 +342,7 @@ class FakeSource(Source):
@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
assert not config_dict
return cls()
return cls(ctx)
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return self.work_units
@@ -354,8 +355,8 @@ class FakeSource(Source):
class FakeSourceWithWarnings(FakeSource):
def __init__(self):
super().__init__()
def __init__(self, ctx: PipelineContext):
super().__init__(ctx)
self.source_report.report_warning("test_warning", "warning_text")
def get_report(self) -> SourceReport:
@@ -363,8 +364,8 @@ class FakeSourceWithWarnings(FakeSource):
class FakeSourceWithFailures(FakeSource):
def __init__(self):
super().__init__()
def __init__(self, ctx: PipelineContext):
super().__init__(ctx)
self.source_report.report_failure("test_failure", "failure_text")
def get_report(self) -> SourceReport:

View File

@@ -27,6 +27,7 @@ class FakeSource(Source):
]
def __init__(self, ctx: PipelineContext):
super().__init__(ctx)
self.source_report = SourceReport()
@classmethod

View File

@@ -1,7 +1,7 @@
from typing import Any, Dict, Iterable, List, Union
import datahub.metadata.schema_classes as models
from datahub.emitter.mce_builder import make_container_urn
from datahub.emitter.mce_builder import make_container_urn, make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import (
auto_browse_path_v2,
@@ -100,13 +100,19 @@ def _create_container_aspects(d: Dict[str, Any]) -> Iterable[MetadataWorkUnit]:
yield from _create_container_aspects(v)
def _make_browse_path_entries(path: List[str]) -> List[models.BrowsePathEntryClass]:
def _make_container_browse_path_entries(
path: List[str],
) -> List[models.BrowsePathEntryClass]:
return [
models.BrowsePathEntryClass(id=make_container_urn(s), urn=make_container_urn(s))
for s in path
]
def _make_browse_path_entries(path: List[str]) -> List[models.BrowsePathEntryClass]:
return [models.BrowsePathEntryClass(id=s, urn=None) for s in path]
def _get_browse_paths_from_wu(
stream: Iterable[MetadataWorkUnit],
) -> Dict[str, List[models.BrowsePathEntryClass]]:
@@ -119,7 +125,7 @@ def _get_browse_paths_from_wu(
return paths
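Only the changed lines of _get_browse_paths_from_wu appear in this hunk; the helper presumably keys paths by the tail of the urn (urn.split(":")[-1]), which is why the dataset keys in the assertions below carry a trailing parenthesis. A sketch under that assumption:

urn = "urn:li:dataset:(urn:li:dataPlatform:platform,dataset-1,PROD)"
assert urn.split(":")[-1] == "platform,dataset-1,PROD)"  # note the trailing ")"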
def test_auto_browse_path_v2():
def test_auto_browse_path_v2_by_container_hierarchy():
structure = {
"one": {
"a": {"i": ["1", "2", "3"], "ii": ["4"]},
@@ -137,7 +143,7 @@ def test_auto_browse_path_v2():
sum(len(wu.get_aspects_of_type(models.StatusClass)) for wu in wus) == 21
)
new_wus = list(auto_browse_path_v2(wus))
new_wus = list(auto_browse_path_v2([], wus))
assert (
sum(len(wu.get_aspects_of_type(models.BrowsePathsV2Class)) for wu in new_wus)
== 21
@@ -145,29 +151,104 @@ def test_auto_browse_path_v2():
paths = _get_browse_paths_from_wu(new_wus)
assert paths["one"] == []
assert paths["7"] == paths["8"] == _make_browse_path_entries(["two", "c", "v"])
assert paths["d"] == _make_browse_path_entries(["three"])
assert paths["i"] == _make_browse_path_entries(["one", "a"])
assert (
paths["7"]
== paths["8"]
== _make_container_browse_path_entries(["two", "c", "v"])
)
assert paths["d"] == _make_container_browse_path_entries(["three"])
assert paths["i"] == _make_container_browse_path_entries(["one", "a"])
def test_auto_browse_path_v2_ignores_urns_already_with():
structure = {"a": {"b": {"c": {"d": ["e"]}}}}
mcp = MetadataChangeProposalWrapper(
entityUrn=make_container_urn("c"),
aspect=models.BrowsePathsV2Class(
path=_make_browse_path_entries(["custom", "path"])
mcps = [
*MetadataChangeProposalWrapper.construct_many(
entityUrn=make_container_urn("f"),
aspects=[
models.BrowsePathsClass(paths=["/one/two"]),
models.BrowsePathsV2Class(
path=_make_browse_path_entries(["my", "path"])
),
],
),
)
wus = [*auto_status_aspect(_create_container_aspects(structure)), mcp.as_workunit()]
new_wus = list(auto_browse_path_v2(wus))
MetadataChangeProposalWrapper(
entityUrn=make_container_urn("c"),
aspect=models.BrowsePathsV2Class(
path=_make_container_browse_path_entries(["custom", "path"])
),
),
]
wus = [
*auto_status_aspect(
[
*_create_container_aspects(structure),
*(mcp.as_workunit() for mcp in mcps),
]
)
]
new_wus = list(auto_browse_path_v2([], wus))
assert (
sum(len(wu.get_aspects_of_type(models.BrowsePathsV2Class)) for wu in new_wus)
== 5
== 6
)
paths = _get_browse_paths_from_wu(new_wus)
assert paths["a"] == []
assert paths["c"] == _make_browse_path_entries(["custom", "path"])
assert paths["e"] == _make_browse_path_entries(["a", "b", "c", "d"])
assert paths["c"] == _make_container_browse_path_entries(["custom", "path"])
assert paths["e"] == _make_container_browse_path_entries(["a", "b", "c", "d"])
assert paths["f"] == _make_browse_path_entries(["my", "path"])
def test_auto_browse_path_v2_legacy_browse_path():
platform = "platform"
env = "PROD"
wus = [
MetadataChangeProposalWrapper(
entityUrn=make_dataset_urn(platform, "dataset-1", env),
aspect=models.BrowsePathsClass(["/one/two"]),
).as_workunit(),
MetadataChangeProposalWrapper(
entityUrn=make_dataset_urn(platform, "dataset-2", env),
aspect=models.BrowsePathsClass([f"/{platform}/{env}/something"]),
).as_workunit(),
MetadataChangeProposalWrapper(
entityUrn=make_dataset_urn(platform, "dataset-3", env),
aspect=models.BrowsePathsClass([f"/{platform}/one/two"]),
).as_workunit(),
]
new_wus = list(auto_browse_path_v2(["platform", "PROD", "unused"], wus))
assert len(new_wus) == 6
paths = _get_browse_paths_from_wu(new_wus)
assert (
paths["platform,dataset-1,PROD)"]
== paths["platform,dataset-3,PROD)"]
== _make_browse_path_entries(["one", "two"])
)
assert paths["platform,dataset-2,PROD)"] == _make_browse_path_entries(["something"])
def test_auto_browse_path_v2_container_over_legacy_browse_path():
structure = {"a": {"b": ["c"]}}
wus = list(
auto_status_aspect(
[
*_create_container_aspects(structure),
MetadataChangeProposalWrapper(
entityUrn=make_container_urn("b"),
aspect=models.BrowsePathsClass(paths=["/one/two"]),
).as_workunit(),
]
)
)
new_wus = list(auto_browse_path_v2([], wus))
assert (
sum(len(wu.get_aspects_of_type(models.BrowsePathsV2Class)) for wu in new_wus)
== 3
)
paths = _get_browse_paths_from_wu(new_wus)
assert paths["a"] == []
assert paths["b"] == _make_container_browse_path_entries(["a"])
assert paths["c"] == _make_container_browse_path_entries(["a", "b"])
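Pulling the pieces together, a compact sketch of driving auto_browse_path_v2 directly over a legacy-path workunit (the urn and path are illustrative):

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import auto_browse_path_v2
from datahub.metadata.schema_classes import BrowsePathsClass

wus = [
    MetadataChangeProposalWrapper(
        entityUrn=make_dataset_urn("mysql", "db.table", "PROD"),  # illustrative
        aspect=BrowsePathsClass(paths=["/mysql/PROD/db"]),
    ).as_workunit()
]
# Expect the original workunit plus one BrowsePathsV2 workunit with path ["db"].
for wu in auto_browse_path_v2(["mysql", "PROD"], wus):
    print(wu.id)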