mirror of
https://github.com/datahub-project/datahub.git
synced 2025-10-13 18:04:55 +00:00
feat(ingest): add a transformer for adding ownership (#2532)
This commit is contained in:
parent
95782b1acf
commit
2811d23e45
@ -586,8 +586,27 @@ transformers:
|
|||||||
some_property: "some.value"
|
some_property: "some.value"
|
||||||
```
|
```
|
||||||
|
|
||||||
A transformer class needs to inherit from [`Transformer`](./src/datahub/ingestion/api/transform.py)
|
A transformer class needs to inherit from [`Transformer`](./src/datahub/ingestion/api/transform.py).
|
||||||
At the moment there are no built-in transformers.
|
|
||||||
|
### `simple_add_dataset_ownership`
|
||||||
|
|
||||||
|
Adds a set of owners to every dataset.
|
||||||
|
|
||||||
|
```yml
|
||||||
|
transformers:
|
||||||
|
- type: "simple_add_dataset_ownership"
|
||||||
|
config:
|
||||||
|
owner_urns:
|
||||||
|
- "urn:li:corpuser:username1"
|
||||||
|
- "urn:li:corpuser:username2"
|
||||||
|
- "urn:li:corpGroup:groupname"
|
||||||
|
```
|
||||||
|
|
||||||
|
:::tip
|
||||||
|
|
||||||
|
If you'd like to add more complex logic for assigning ownership, you can use the more generic [`AddDatasetOwnership` transformer](./src/datahub/ingestion/transformer/add_dataset_ownership.py), which calls a user-provided function to determine the ownership of each dataset.
|
||||||
|
|
||||||
|
:::
|
||||||
|
|
||||||
## Using as a library
|
## Using as a library
|
||||||
|
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
"""Convenience functions for creating MCEs"""
|
"""Convenience functions for creating MCEs"""
|
||||||
|
|
||||||
import time
|
import time
|
||||||
from typing import List, Union
|
from typing import List, Optional, Type, TypeVar
|
||||||
|
|
||||||
from datahub.metadata.schema_classes import (
|
from datahub.metadata.schema_classes import (
|
||||||
AuditStampClass,
|
AuditStampClass,
|
||||||
@ -15,6 +15,12 @@ from datahub.metadata.schema_classes import (
|
|||||||
DEFAULT_ENV = "PROD"
|
DEFAULT_ENV = "PROD"
|
||||||
DEFAULT_FLOW_CLUSTER = "prod"
|
DEFAULT_FLOW_CLUSTER = "prod"
|
||||||
|
|
||||||
|
T = TypeVar("T")
|
||||||
|
|
||||||
|
|
||||||
|
def get_sys_time() -> int:
|
||||||
|
return int(time.time() * 1000)
|
||||||
|
|
||||||
|
|
||||||
def make_dataset_urn(platform: str, name: str, env: str = DEFAULT_ENV) -> str:
|
def make_dataset_urn(platform: str, name: str, env: str = DEFAULT_ENV) -> str:
|
||||||
return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"
|
return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"
|
||||||
@ -47,14 +53,12 @@ def make_data_job_urn(
|
|||||||
|
|
||||||
|
|
||||||
def make_lineage_mce(
|
def make_lineage_mce(
|
||||||
upstream_urns: Union[str, List[str]],
|
upstream_urns: List[str],
|
||||||
downstream_urn: str,
|
downstream_urn: str,
|
||||||
actor: str = make_user_urn("datahub"),
|
actor: str = make_user_urn("datahub"),
|
||||||
lineage_type: str = DatasetLineageTypeClass.TRANSFORMED,
|
lineage_type: str = DatasetLineageTypeClass.TRANSFORMED,
|
||||||
) -> MetadataChangeEventClass:
|
) -> MetadataChangeEventClass:
|
||||||
sys_time = int(time.time() * 1000)
|
sys_time = get_sys_time()
|
||||||
if not isinstance(upstream_urns, list):
|
|
||||||
upstream_urns = [upstream_urns]
|
|
||||||
|
|
||||||
mce = MetadataChangeEventClass(
|
mce = MetadataChangeEventClass(
|
||||||
proposedSnapshot=DatasetSnapshotClass(
|
proposedSnapshot=DatasetSnapshotClass(
|
||||||
@ -77,3 +81,24 @@ def make_lineage_mce(
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
return mce
|
return mce
|
||||||
|
|
||||||
|
|
||||||
|
def get_aspect_if_available(
|
||||||
|
mce: MetadataChangeEventClass, type: Type[T]
|
||||||
|
) -> Optional[T]:
|
||||||
|
all_aspects = mce.proposedSnapshot.aspects
|
||||||
|
aspects: List[T] = [aspect for aspect in all_aspects if isinstance(aspect, type)]
|
||||||
|
|
||||||
|
if len(aspects) > 1:
|
||||||
|
raise ValueError(f"MCE contains multiple aspects of type {type}: {aspects}")
|
||||||
|
if aspects:
|
||||||
|
return aspects[0]
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_or_add_aspect(mce: MetadataChangeEventClass, default: T) -> T:
|
||||||
|
existing = get_aspect_if_available(mce, type(default))
|
||||||
|
if existing is not None:
|
||||||
|
return existing
|
||||||
|
mce.proposedSnapshot.aspects.append(default)
|
||||||
|
return default
|
||||||
|
@ -0,0 +1,96 @@
|
|||||||
|
from typing import Callable, Iterable, List, Union
|
||||||
|
|
||||||
|
import datahub.emitter.mce_builder as builder
|
||||||
|
from datahub.configuration.common import ConfigModel
|
||||||
|
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
|
||||||
|
from datahub.ingestion.api.transform import Transformer
|
||||||
|
from datahub.metadata.schema_classes import (
|
||||||
|
AuditStampClass,
|
||||||
|
DatasetSnapshotClass,
|
||||||
|
MetadataChangeEventClass,
|
||||||
|
OwnerClass,
|
||||||
|
OwnershipClass,
|
||||||
|
OwnershipTypeClass,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class AddDatasetOwnershipConfig(ConfigModel):
|
||||||
|
# Workaround for https://github.com/python/mypy/issues/708.
|
||||||
|
# Suggested by https://stackoverflow.com/a/64528725/5004662.
|
||||||
|
get_owners_to_add: Union[
|
||||||
|
Callable[[DatasetSnapshotClass], List[OwnerClass]],
|
||||||
|
Callable[[DatasetSnapshotClass], List[OwnerClass]],
|
||||||
|
]
|
||||||
|
default_actor: str = builder.make_user_urn("etl")
|
||||||
|
|
||||||
|
|
||||||
|
class AddDatasetOwnership(Transformer):
|
||||||
|
"""Transformer that adds owners to datasets according to a callback function."""
|
||||||
|
|
||||||
|
ctx: PipelineContext
|
||||||
|
config: AddDatasetOwnershipConfig
|
||||||
|
|
||||||
|
def __init__(self, config: AddDatasetOwnershipConfig, ctx: PipelineContext):
|
||||||
|
self.ctx = ctx
|
||||||
|
self.config = config
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetOwnership":
|
||||||
|
config = AddDatasetOwnershipConfig.parse_obj(config_dict)
|
||||||
|
return cls(config, ctx)
|
||||||
|
|
||||||
|
def transform(
|
||||||
|
self, record_envelopes: Iterable[RecordEnvelope]
|
||||||
|
) -> Iterable[RecordEnvelope]:
|
||||||
|
for envelope in record_envelopes:
|
||||||
|
if isinstance(envelope.record, MetadataChangeEventClass):
|
||||||
|
envelope.record = self.transform_one(envelope.record)
|
||||||
|
yield envelope
|
||||||
|
|
||||||
|
def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass:
|
||||||
|
if not isinstance(mce.proposedSnapshot, DatasetSnapshotClass):
|
||||||
|
return mce
|
||||||
|
|
||||||
|
owners_to_add = self.config.get_owners_to_add(mce.proposedSnapshot)
|
||||||
|
if owners_to_add:
|
||||||
|
ownership = builder.get_or_add_aspect(
|
||||||
|
mce,
|
||||||
|
OwnershipClass(
|
||||||
|
owners=[],
|
||||||
|
lastModified=AuditStampClass(
|
||||||
|
time=builder.get_sys_time(),
|
||||||
|
actor=self.config.default_actor,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
ownership.owners.extend(owners_to_add)
|
||||||
|
|
||||||
|
return mce
|
||||||
|
|
||||||
|
|
||||||
|
class SimpleDatasetOwnershipConfig(ConfigModel):
|
||||||
|
owner_urns: List[str]
|
||||||
|
default_actor: str = builder.make_user_urn("etl")
|
||||||
|
|
||||||
|
|
||||||
|
class SimpleAddDatasetOwnership(AddDatasetOwnership):
|
||||||
|
"""Transformer that adds a specified set of owners to each dataset."""
|
||||||
|
|
||||||
|
def __init__(self, config: SimpleDatasetOwnershipConfig, ctx: PipelineContext):
|
||||||
|
owners = [
|
||||||
|
OwnerClass(owner=owner, type=OwnershipTypeClass.DATAOWNER)
|
||||||
|
for owner in config.owner_urns
|
||||||
|
]
|
||||||
|
|
||||||
|
generic_config = AddDatasetOwnershipConfig(
|
||||||
|
get_owners_to_add=lambda _: owners,
|
||||||
|
default_actor=config.default_actor,
|
||||||
|
)
|
||||||
|
super().__init__(generic_config, ctx)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def create(
|
||||||
|
cls, config_dict: dict, ctx: PipelineContext
|
||||||
|
) -> "SimpleAddDatasetOwnership":
|
||||||
|
config = SimpleDatasetOwnershipConfig.parse_obj(config_dict)
|
||||||
|
return cls(config, ctx)
|
@ -1,4 +1,9 @@
|
|||||||
from datahub.ingestion.api.registry import Registry
|
from datahub.ingestion.api.registry import Registry
|
||||||
from datahub.ingestion.api.transform import Transformer
|
from datahub.ingestion.api.transform import Transformer
|
||||||
|
|
||||||
|
from .add_dataset_ownership import AddDatasetOwnership, SimpleAddDatasetOwnership
|
||||||
|
|
||||||
transform_registry = Registry[Transformer]()
|
transform_registry = Registry[Transformer]()
|
||||||
|
|
||||||
|
transform_registry.register("add_dataset_ownership", AddDatasetOwnership)
|
||||||
|
transform_registry.register("simple_add_dataset_ownership", SimpleAddDatasetOwnership)
|
||||||
|
@ -0,0 +1,87 @@
|
|||||||
|
import datahub.emitter.mce_builder as builder
|
||||||
|
import datahub.metadata.schema_classes as models
|
||||||
|
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
|
||||||
|
from datahub.ingestion.transformer.add_dataset_ownership import (
|
||||||
|
SimpleAddDatasetOwnership,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_simple_dataset_ownership_tranformation(mock_time):
|
||||||
|
no_owner_aspect = models.MetadataChangeEventClass(
|
||||||
|
proposedSnapshot=models.DatasetSnapshotClass(
|
||||||
|
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
|
||||||
|
aspects=[
|
||||||
|
models.StatusClass(removed=False),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
)
|
||||||
|
with_owner_aspect = models.MetadataChangeEventClass(
|
||||||
|
proposedSnapshot=models.DatasetSnapshotClass(
|
||||||
|
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)",
|
||||||
|
aspects=[
|
||||||
|
models.OwnershipClass(
|
||||||
|
owners=[
|
||||||
|
models.OwnerClass(
|
||||||
|
owner=builder.make_user_urn("fake_owner"),
|
||||||
|
type=models.OwnershipTypeClass.DATAOWNER,
|
||||||
|
),
|
||||||
|
],
|
||||||
|
lastModified=models.AuditStampClass(
|
||||||
|
time=builder.get_sys_time(), actor="urn:li:corpuser:datahub"
|
||||||
|
),
|
||||||
|
)
|
||||||
|
],
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
not_a_dataset = models.MetadataChangeEventClass(
|
||||||
|
proposedSnapshot=models.DataJobSnapshotClass(
|
||||||
|
urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)",
|
||||||
|
aspects=[
|
||||||
|
models.DataJobInfoClass(
|
||||||
|
name="User Deletions",
|
||||||
|
description="Constructs the fct_users_deleted from logging_events",
|
||||||
|
type=models.AzkabanJobTypeClass.SQL,
|
||||||
|
)
|
||||||
|
],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
inputs = [
|
||||||
|
no_owner_aspect,
|
||||||
|
with_owner_aspect,
|
||||||
|
not_a_dataset,
|
||||||
|
]
|
||||||
|
|
||||||
|
transformer = SimpleAddDatasetOwnership.create(
|
||||||
|
{
|
||||||
|
"owner_urns": [
|
||||||
|
builder.make_user_urn("person1"),
|
||||||
|
builder.make_user_urn("person2"),
|
||||||
|
]
|
||||||
|
},
|
||||||
|
PipelineContext(run_id="test"),
|
||||||
|
)
|
||||||
|
|
||||||
|
outputs = list(
|
||||||
|
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
|
||||||
|
)
|
||||||
|
|
||||||
|
assert len(outputs) == len(inputs)
|
||||||
|
|
||||||
|
# Check the first entry.
|
||||||
|
first_ownership_aspect = builder.get_aspect_if_available(
|
||||||
|
outputs[0].record, models.OwnershipClass
|
||||||
|
)
|
||||||
|
assert first_ownership_aspect
|
||||||
|
assert len(first_ownership_aspect.owners) == 2
|
||||||
|
|
||||||
|
# Check the second entry.
|
||||||
|
second_ownership_aspect = builder.get_aspect_if_available(
|
||||||
|
outputs[1].record, models.OwnershipClass
|
||||||
|
)
|
||||||
|
assert second_ownership_aspect
|
||||||
|
assert len(second_ownership_aspect.owners) == 3
|
||||||
|
|
||||||
|
# Verify that the third entry is unchanged.
|
||||||
|
assert inputs[2] == outputs[2].record
|
Loading…
x
Reference in New Issue
Block a user