feat(ingest): read-modify-write capabilities during ingestion, adding ownership patch transformer (#3506)

Swaroop Jagadish 2021-11-03 21:39:52 -07:00 committed by GitHub
parent 192f0d33a2
commit ec406c7928
10 changed files with 391 additions and 22 deletions

View File

@@ -1,11 +1,4 @@
# see https://datahubproject.io/docs/metadata-ingestion/transformers for original tutorial
from datahub.configuration.common import ConfigModel
class AddCustomOwnershipConfig(ConfigModel):
    owners_json: str
import json
from typing import Iterable
@@ -13,6 +6,7 @@ import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.transform import Transformer
from datahub.ingestion.transformer.add_dataset_ownership import Semantics
from datahub.metadata.schema_classes import (
    DatasetSnapshotClass,
    MetadataChangeEventClass,
@@ -22,6 +16,11 @@ from datahub.metadata.schema_classes import (
)
class AddCustomOwnershipConfig(ConfigModel):
    owners_json: str
    semantics: Semantics = Semantics.OVERWRITE


class AddCustomOwnership(Transformer):
    """Transformer that adds owners to datasets according to a callback function."""

View File

@@ -2,7 +2,7 @@
import logging
import re
import time
from typing import List, Optional, Type, TypeVar, cast, get_type_hints
from typing import Any, List, Optional, Type, TypeVar, cast, get_type_hints
import typing_inspect
from avrogen.dict_wrapper import DictWrapper
@@ -210,9 +210,33 @@ def get_aspect_if_available(
    return None
def remove_aspect_if_available(
    mce: MetadataChangeEventClass, aspect_type: Type[Aspect]
) -> bool:
    assert can_add_aspect(mce, aspect_type)
    # loose type annotations since we checked before
    aspects: List[Any] = [
        aspect
        for aspect in mce.proposedSnapshot.aspects
        if not isinstance(aspect, aspect_type)
    ]
    removed = len(aspects) != len(mce.proposedSnapshot.aspects)
    mce.proposedSnapshot.aspects = aspects
    return removed
def get_or_add_aspect(mce: MetadataChangeEventClass, default: Aspect) -> Aspect:
    existing = get_aspect_if_available(mce, type(default))
    if existing is not None:
        return existing
    mce.proposedSnapshot.aspects.append(default)  # type: ignore
    return default
def set_aspect(
    mce: MetadataChangeEventClass, aspect: Optional[Aspect], aspect_type: Type[Aspect]
) -> None:
    """Sets the aspect to the provided value, overwriting any previous value that
    may have existed. If the passed-in aspect is None, the existing aspect is removed."""
    remove_aspect_if_available(mce, aspect_type)
    if aspect is not None:
        mce.proposedSnapshot.aspects.append(aspect)  # type: ignore

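For orientation, here is a minimal sketch of how these new `mce_builder` helpers compose; the dataset URN and owner are made-up values for illustration:

```python
import datahub.emitter.mce_builder as builder
from datahub.metadata.schema_classes import (
    DatasetSnapshotClass,
    MetadataChangeEventClass,
    OwnerClass,
    OwnershipClass,
    OwnershipTypeClass,
)

# hypothetical MCE with no ownership aspect attached yet
mce = MetadataChangeEventClass(
    proposedSnapshot=DatasetSnapshotClass(
        urn="urn:li:dataset:(urn:li:dataPlatform:mysql,db.table,PROD)",
        aspects=[],
    )
)

# get_or_add_aspect appends the default aspect when none is present
ownership = builder.get_or_add_aspect(mce, OwnershipClass(owners=[]))
ownership.owners.append(
    OwnerClass(owner=builder.make_user_urn("jdoe"), type=OwnershipTypeClass.DATAOWNER)
)

# set_aspect overwrites any existing value; passing aspect=None removes it
builder.set_aspect(mce, aspect=None, aspect_type=OwnershipClass)
assert builder.get_aspect_if_available(mce, OwnershipClass) is None
```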
View File

@@ -1,6 +1,8 @@
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Generic, TypeVar
from typing import Generic, Optional, TypeVar
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
T = TypeVar("T")
@@ -25,6 +27,12 @@ class WorkUnit(_WorkUnitId, metaclass=ABCMeta):
        pass
@dataclass
class PipelineContext:
    run_id: str
    graph: Optional[DataHubGraph]

    def __init__(
        self, run_id: str, datahub_api: Optional[DatahubClientConfig] = None
    ) -> None:
        self.run_id = run_id
        self.graph = DataHubGraph(datahub_api) if datahub_api is not None else None

View File

@@ -0,0 +1,85 @@
import urllib.parse
from json.decoder import JSONDecodeError
from typing import Dict, Optional, Type, TypeVar
from avrogen.dict_wrapper import DictWrapper
from requests.models import HTTPError
from requests.sessions import Session
from datahub.configuration.common import ConfigModel, OperationalError
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import OwnershipClass
# This bound isn't tight, but it's better than nothing.
Aspect = TypeVar("Aspect", bound=DictWrapper)
class DatahubClientConfig(ConfigModel):
    """Configuration class for holding connectivity to datahub gms"""

    server: str = "http://localhost:8080"
    token: Optional[str]
    timeout_sec: Optional[int]
    extra_headers: Optional[Dict[str, str]]
    max_threads: int = 1


class DataHubGraph(DatahubRestEmitter):
    def __init__(self, config: DatahubClientConfig) -> None:
        self.config = config
        super().__init__(
            gms_server=self.config.server,
            token=self.config.token,
            connect_timeout_sec=self.config.timeout_sec,  # reuse timeout_sec for connect timeout
            read_timeout_sec=self.config.timeout_sec,
            extra_headers=self.config.extra_headers,
        )
        self.test_connection()
        self.g_session = Session()

    def _get_generic(self, url: str) -> Dict:
        try:
            response = self.g_session.get(url)
            response.raise_for_status()
            return response.json()
        except HTTPError as e:
            try:
                info = response.json()
                raise OperationalError(
                    "Unable to get metadata from DataHub", info
                ) from e
            except JSONDecodeError:
                # If we can't parse the JSON, just raise the original error.
                raise OperationalError(
                    "Unable to get metadata from DataHub", {"message": str(e)}
                ) from e

    def get_aspect(
        self,
        entity_urn: str,
        aspect: str,
        aspect_type_name: str,
        aspect_type: Type[Aspect],
    ) -> Optional[Aspect]:
        url = f"{self._gms_server}/aspects/{urllib.parse.quote(entity_urn)}?aspect={aspect}&version=0"
        response = self.g_session.get(url)
        if response.status_code == 404:
            # not found
            return None
        response.raise_for_status()
        response_json = response.json()
        aspect_json = response_json.get("aspect", {}).get(aspect_type_name)
        if aspect_json:
            return aspect_type.from_obj(aspect_json, tuples=True)
        else:
            raise OperationalError(
                f"Failed to find {aspect_type_name} in response {response_json}"
            )

    def get_ownership(self, entity_urn: str) -> Optional[OwnershipClass]:
        return self.get_aspect(
            entity_urn=entity_urn,
            aspect="ownership",
            aspect_type_name="com.linkedin.common.Ownership",
            aspect_type=OwnershipClass,
        )

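As a standalone illustration (the server address and URN below are assumptions, not part of this commit's tests), the new client can be used roughly like this:

```python
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

# the constructor calls test_connection(), so this fails fast if gms is unreachable
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

urn = "urn:li:dataset:(urn:li:dataPlatform:mysql,db.table,PROD)"  # hypothetical
ownership = graph.get_ownership(entity_urn=urn)
if ownership is None:
    print(f"no ownership aspect found for {urn}")
else:
    for owner in ownership.owners:
        print(owner.owner, owner.type)
```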
View File

@@ -16,6 +16,7 @@ from datahub.ingestion.api.sink import Sink, WriteCallback
from datahub.ingestion.api.source import Extractor, Source
from datahub.ingestion.api.transform import Transformer
from datahub.ingestion.extractor.extractor_registry import extractor_registry
from datahub.ingestion.graph.client import DatahubClientConfig
from datahub.ingestion.sink.sink_registry import sink_registry
from datahub.ingestion.source.source_registry import source_registry
from datahub.ingestion.transformer.transform_registry import transform_registry
@@ -36,6 +37,7 @@ class PipelineConfig(ConfigModel):
    sink: DynamicTypedConfig
    transformers: Optional[List[DynamicTypedConfig]]
    run_id: str = "__DEFAULT_RUN_ID"
    datahub_api: Optional[DatahubClientConfig] = None

    @validator("run_id", pre=True, always=True)
    def run_id_should_be_semantic(
@@ -53,6 +55,18 @@ class PipelineConfig(ConfigModel):
        assert v is not None
        return v

    @validator("datahub_api", always=True)
    def datahub_api_should_use_rest_sink_as_default(
        cls, v: Optional[DatahubClientConfig], values: Dict[str, Any], **kwargs: Any
    ) -> Optional[DatahubClientConfig]:
        if v is None:
            if values["sink"].type is not None:
                sink_type = values["sink"].type
                if sink_type == "datahub-rest":
                    sink_config = values["sink"].config
                    v = DatahubClientConfig.parse_obj(sink_config)
        return v
class LoggingCallback(WriteCallback):
    def on_success(
@@ -81,7 +95,9 @@ class Pipeline:
    def __init__(self, config: PipelineConfig):
        self.config = config
        self.ctx = PipelineContext(run_id=self.config.run_id)
        self.ctx = PipelineContext(
            run_id=self.config.run_id, datahub_api=self.config.datahub_api
        )
        source_type = self.config.source.type
        source_class = source_registry.get(source_type)

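The net effect of the new validator: when the pipeline writes to a `datahub-rest` sink, `datahub_api` is derived from the sink config, so transformers get graph access without any extra configuration. A sketch of such a recipe (source details elided):

```yaml
source:
  type: mysql
  config:
    # ... source configs

sink:
  type: datahub-rest
  config:
    server: http://localhost:8080

# no datahub_api block needed: it is inferred from the datahub-rest sink config
```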
View File

@@ -2,14 +2,15 @@ import concurrent.futures
import functools
import logging
from dataclasses import dataclass
from typing import Dict, Optional, Union, cast
from typing import Union, cast
from datahub.configuration.common import ConfigModel, OperationalError
from datahub.configuration.common import OperationalError
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DatahubClientConfig
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
    MetadataChangeEvent,
    MetadataChangeProposal,
@@ -19,14 +20,8 @@ from datahub.metadata.com.linkedin.pegasus2avro.usage import UsageAggregation
logger = logging.getLogger(__name__)
class DatahubRestSinkConfig(ConfigModel):
    """Configuration class for holding connectivity to datahub gms"""

    server: str = "http://localhost:8080"
    token: Optional[str]
    timeout_sec: Optional[int]
    extra_headers: Optional[Dict[str, str]]
    max_threads: int = 1


class DatahubRestSinkConfig(DatahubClientConfig):
    pass
@dataclass

View File

@@ -1,9 +1,17 @@
from enum import Enum
from typing import Callable, List, Optional, Union
from pydantic import validator
import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel, KeyValuePattern
from datahub.configuration.common import (
    ConfigModel,
    ConfigurationError,
    KeyValuePattern,
)
from datahub.configuration.import_resolver import pydantic_resolve_key
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.transformer.dataset_transformer import DatasetTransformer
from datahub.metadata.schema_classes import (
    DatasetSnapshotClass,
@@ -14,6 +22,13 @@ from datahub.metadata.schema_classes import (
)
class Semantics(Enum):
    """Describes semantics for ownership changes"""

    OVERWRITE = "OVERWRITE"  # Apply changes blindly
    PATCH = "PATCH"  # Only apply differences from what exists already on the server
class AddDatasetOwnershipConfig(ConfigModel):
    # Workaround for https://github.com/python/mypy/issues/708.
    # Suggested by https://stackoverflow.com/a/64528725/5004662.
@@ -22,9 +37,16 @@ class AddDatasetOwnershipConfig(ConfigModel):
        Callable[[DatasetSnapshotClass], List[OwnerClass]],
    ]
    default_actor: str = builder.make_user_urn("etl")
    semantics: Semantics = Semantics.OVERWRITE

    _resolve_owner_fn = pydantic_resolve_key("get_owners_to_add")

    @validator("semantics", pre=True)
    def ensure_semantics_is_upper_case(cls, v):
        if isinstance(v, str):
            return v.upper()
        return v


class AddDatasetOwnership(DatasetTransformer):
    """Transformer that adds owners to datasets according to a callback function."""
@@ -35,12 +57,53 @@ class AddDatasetOwnership(DatasetTransformer):
    def __init__(self, config: AddDatasetOwnershipConfig, ctx: PipelineContext):
        self.ctx = ctx
        self.config = config
        if self.config.semantics == Semantics.PATCH and self.ctx.graph is None:
            raise ConfigurationError(
                "With PATCH semantics, AddDatasetOwnership requires a datahub_api to connect to. Consider using the datahub-rest sink or providing a datahub_api configuration in your ingestion recipe"
            )

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetOwnership":
        config = AddDatasetOwnershipConfig.parse_obj(config_dict)
        return cls(config, ctx)

    @staticmethod
    def get_ownership_to_set(
        graph: DataHubGraph, urn: str, mce_ownership: Optional[OwnershipClass]
    ) -> Optional[OwnershipClass]:
        if not mce_ownership or not mce_ownership.owners:
            # nothing to add, no need to consult the server
            return None
        assert mce_ownership
        server_ownership = graph.get_ownership(entity_urn=urn)
        if server_ownership:
            # compute the patch: we only include owners who are not already present
            # on the server; if owner ids match but the ownership type differs, we
            # prefer the transformer's opinion
            owners_to_add: List[OwnerClass] = []
            needs_update = False
            server_owner_ids = [o.owner for o in server_ownership.owners]
            for owner in mce_ownership.owners:
                if owner.owner not in server_owner_ids:
                    owners_to_add.append(owner)
                else:
                    # we need to check if the type matches, and if it doesn't, update it
                    for server_owner in server_ownership.owners:
                        if (
                            owner.owner == server_owner.owner
                            and owner.type != server_owner.type
                        ):
                            server_owner.type = owner.type
                            needs_update = True
            if owners_to_add or needs_update:
                mce_ownership.owners = server_ownership.owners + owners_to_add
                return mce_ownership
            else:
                return None
        else:
            return mce_ownership

    def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass:
        if not isinstance(mce.proposedSnapshot, DatasetSnapshotClass):
            return mce
@@ -54,6 +117,14 @@ class AddDatasetOwnership(DatasetTransformer):
            )
            ownership.owners.extend(owners_to_add)

            if self.config.semantics == Semantics.PATCH:
                assert self.ctx.graph
                patch_ownership = AddDatasetOwnership.get_ownership_to_set(
                    self.ctx.graph, mce.proposedSnapshot.urn, ownership
                )
                builder.set_aspect(
                    mce, aspect=patch_ownership, aspect_type=OwnershipClass
                )

        return mce
@@ -64,6 +135,13 @@ class DatasetOwnershipBaseConfig(ConfigModel):
class SimpleDatasetOwnershipConfig(DatasetOwnershipBaseConfig):
    owner_urns: List[str]
    default_actor: str = builder.make_user_urn("etl")
    semantics: Semantics = Semantics.OVERWRITE

    @validator("semantics", pre=True)
    def upper_case_semantics(cls, v):
        if isinstance(v, str):
            return v.upper()
        return v
class SimpleAddDatasetOwnership(AddDatasetOwnership):
@@ -82,6 +160,7 @@ class SimpleAddDatasetOwnership(AddDatasetOwnership):
        generic_config = AddDatasetOwnershipConfig(
            get_owners_to_add=lambda _: owners,
            default_actor=config.default_actor,
            semantics=config.semantics,
        )
        super().__init__(generic_config, ctx)

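Assuming the transformer is registered under the usual `simple_add_dataset_ownership` name, a recipe fragment exercising the new option might look like this (the owner URN is illustrative; lower-case `patch` is accepted too, thanks to the validator):

```yaml
transformers:
  - type: simple_add_dataset_ownership
    config:
      owner_urns:
        - "urn:li:corpuser:jdoe"
      semantics: PATCH  # merge with server-side owners instead of overwriting
```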
View File

@@ -1,3 +1,6 @@
from typing import List, Union
from unittest import mock
import pytest
import datahub.emitter.mce_builder as builder
@@ -7,6 +10,7 @@ from datahub.ingestion.transformer.add_dataset_browse_path import (
    AddDatasetBrowsePathTransformer,
)
from datahub.ingestion.transformer.add_dataset_ownership import (
    AddDatasetOwnership,
    PatternAddDatasetOwnership,
    SimpleAddDatasetOwnership,
)
@@ -434,3 +438,108 @@ def test_pattern_dataset_ownership_with_invalid_type_transformation(mock_time):
        },
        PipelineContext(run_id="test"),
    )


def gen_owners(
    owners: List[str],
    ownership_type: Union[
        str, models.OwnershipTypeClass
    ] = models.OwnershipTypeClass.DATAOWNER,
) -> models.OwnershipClass:
    return models.OwnershipClass(
        owners=[models.OwnerClass(owner=owner, type=ownership_type) for owner in owners]
    )


def test_ownership_patching_intersect(mock_time):
    mock_graph = mock.MagicMock()
    server_ownership = gen_owners(["foo", "bar"])
    mce_ownership = gen_owners(["baz", "foo"])
    mock_graph.get_ownership.return_value = server_ownership

    test_ownership = AddDatasetOwnership.get_ownership_to_set(
        mock_graph, "test_urn", mce_ownership
    )
    assert test_ownership and test_ownership.owners
    assert "foo" in [o.owner for o in test_ownership.owners]
    assert "bar" in [o.owner for o in test_ownership.owners]
    assert "baz" in [o.owner for o in test_ownership.owners]


def test_ownership_patching_with_nones(mock_time):
    mock_graph = mock.MagicMock()
    mce_ownership = gen_owners(["baz", "foo"])
    mock_graph.get_ownership.return_value = None
    test_ownership = AddDatasetOwnership.get_ownership_to_set(
        mock_graph, "test_urn", mce_ownership
    )
    assert test_ownership and test_ownership.owners
    assert "foo" in [o.owner for o in test_ownership.owners]
    assert "baz" in [o.owner for o in test_ownership.owners]

    server_ownership = gen_owners(["baz", "foo"])
    mock_graph.get_ownership.return_value = server_ownership
    test_ownership = AddDatasetOwnership.get_ownership_to_set(
        mock_graph, "test_urn", None
    )
    assert not test_ownership


def test_ownership_patching_with_empty_mce_none_server(mock_time):
    mock_graph = mock.MagicMock()
    mce_ownership = gen_owners([])
    mock_graph.get_ownership.return_value = None
    test_ownership = AddDatasetOwnership.get_ownership_to_set(
        mock_graph, "test_urn", mce_ownership
    )
    # nothing to add, so we omit writing
    assert test_ownership is None


def test_ownership_patching_with_empty_mce_nonempty_server(mock_time):
    mock_graph = mock.MagicMock()
    server_ownership = gen_owners(["baz", "foo"])
    mce_ownership = gen_owners([])
    mock_graph.get_ownership.return_value = server_ownership
    test_ownership = AddDatasetOwnership.get_ownership_to_set(
        mock_graph, "test_urn", mce_ownership
    )
    # nothing to add, so we omit writing
    assert test_ownership is None


def test_ownership_patching_with_different_types_1(mock_time):
    mock_graph = mock.MagicMock()
    server_ownership = gen_owners(["baz", "foo"], models.OwnershipTypeClass.PRODUCER)
    mce_ownership = gen_owners(["foo"], models.OwnershipTypeClass.DATAOWNER)
    mock_graph.get_ownership.return_value = server_ownership
    test_ownership = AddDatasetOwnership.get_ownership_to_set(
        mock_graph, "test_urn", mce_ownership
    )
    assert test_ownership and test_ownership.owners
    # the transformer's type should win for "foo"; "baz" keeps the server's type
    assert ("foo", models.OwnershipTypeClass.DATAOWNER) in [
        (o.owner, o.type) for o in test_ownership.owners
    ]
    assert ("baz", models.OwnershipTypeClass.PRODUCER) in [
        (o.owner, o.type) for o in test_ownership.owners
    ]


def test_ownership_patching_with_different_types_2(mock_time):
    mock_graph = mock.MagicMock()
    server_ownership = gen_owners(["baz", "foo"], models.OwnershipTypeClass.PRODUCER)
    mce_ownership = gen_owners(["foo", "baz"], models.OwnershipTypeClass.DATAOWNER)
    mock_graph.get_ownership.return_value = server_ownership
    test_ownership = AddDatasetOwnership.get_ownership_to_set(
        mock_graph, "test_urn", mce_ownership
    )
    assert test_ownership and test_ownership.owners
    assert len(test_ownership.owners) == 2
    # both owners should carry the transformer's DATAOWNER type
    assert ("foo", models.OwnershipTypeClass.DATAOWNER) in [
        (o.owner, o.type) for o in test_ownership.owners
    ]
    assert ("baz", models.OwnershipTypeClass.DATAOWNER) in [
        (o.owner, o.type) for o in test_ownership.owners
    ]

View File

@@ -324,6 +324,60 @@ def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass:
    return mce
```
### More Sophistication: Making calls to DataHub during Transformation
In some advanced cases, you might want to check with DataHub before performing a transformation. A good example is retrieving the current set of owners of a dataset before providing a new set during ingestion. To let transformers query the graph, the framework gives them access to it through the context object `ctx`. Connectivity to the graph is instantiated automatically whenever the pipeline uses a REST sink. If you are using the Kafka sink, you can additionally provide access to the graph by configuring it in your pipeline.
Here is an example of a recipe that uses Kafka as the sink, but provides access to the graph by explicitly configuring the `datahub_api`.
```yaml
source:
  type: mysql
  config:
    # ..source configs

sink:
  type: datahub-kafka
  config:
    connection:
      bootstrap: localhost:9092
      schema_registry_url: "http://localhost:8081"

datahub_api:
  server: http://localhost:8080
  # standard configs accepted by the datahub rest client ...
```
#### Advanced Use-Case: Patching Owners
With the above capability, we can now build more powerful transformers that check the server-side state before issuing changes in metadata.
For example, here is how the AddDatasetOwnership transformer can now support PATCH semantics, ensuring that it never deletes owners that are already stored on the server.
```python
def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass:
    if not isinstance(mce.proposedSnapshot, DatasetSnapshotClass):
        return mce
    owners_to_add = self.config.get_owners_to_add(mce.proposedSnapshot)
    if owners_to_add:
        ownership = builder.get_or_add_aspect(
            mce,
            OwnershipClass(
                owners=[],
            ),
        )
        ownership.owners.extend(owners_to_add)

        if self.config.semantics == Semantics.PATCH:
            assert self.ctx.graph
            patch_ownership = AddDatasetOwnership.get_ownership_to_set(
                self.ctx.graph, mce.proposedSnapshot.urn, ownership
            )
            builder.set_aspect(
                mce, aspect=patch_ownership, aspect_type=OwnershipClass
            )
    return mce
```
### Installing the package
Now that we've defined the transformer, we need to make it visible to DataHub. The easiest way to do this is to place it in the same directory as your recipe, in which case the module name is the same as the file name, in this case `custom_transform_example`.
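Assuming the file is named `custom_transform_example.py` and defines `AddCustomOwnership`, a recipe could then reference it like this (the owners file path is a placeholder):

```yaml
transformers:
  - type: "custom_transform_example.AddCustomOwnership"
    config:
      owners_json: "/path/to/owners.json"  # hypothetical path
      semantics: PATCH
```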