diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md
index 9b09e30c70..b618d75810 100644
--- a/metadata-ingestion/README.md
+++ b/metadata-ingestion/README.md
@@ -586,8 +586,27 @@ transformers:
       some_property: "some.value"
 ```
 
-A transformer class needs to inherit from [`Transformer`](./src/datahub/ingestion/api/transform.py)
-At the moment there are no built-in transformers.
+A transformer class needs to inherit from [`Transformer`](./src/datahub/ingestion/api/transform.py).
+
+### `simple_add_dataset_ownership`
+
+Adds a set of owners to every dataset.
+
+```yml
+transformers:
+  - type: "simple_add_dataset_ownership"
+    config:
+      owner_urns:
+        - "urn:li:corpuser:username1"
+        - "urn:li:corpuser:username2"
+        - "urn:li:corpGroup:groupname"
+```
+
+:::tip
+
+If you'd like to add more complex logic for assigning ownership, you can use the more generic [`AddDatasetOwnership` transformer](./src/datahub/ingestion/transformer/add_dataset_ownership.py), which calls a user-provided function to determine the ownership of each dataset.
+
+:::
 
 ## Using as a library
 
diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py
index 021c17abec..abfa9ce1ff 100644
--- a/metadata-ingestion/src/datahub/emitter/mce_builder.py
+++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -1,7 +1,7 @@
 """Convenience functions for creating MCEs"""
 
 import time
-from typing import List, Union
+from typing import List, Optional, Type, TypeVar
 
 from datahub.metadata.schema_classes import (
     AuditStampClass,
@@ -15,6 +15,12 @@ from datahub.metadata.schema_classes import (
 DEFAULT_ENV = "PROD"
 DEFAULT_FLOW_CLUSTER = "prod"
 
+T = TypeVar("T")
+
+
+def get_sys_time() -> int:
+    return int(time.time() * 1000)
+
 
 def make_dataset_urn(platform: str, name: str, env: str = DEFAULT_ENV) -> str:
     return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"
@@ -47,14 +53,12 @@ def make_data_job_urn(
 
 
 def make_lineage_mce(
-    upstream_urns: Union[str, List[str]],
+    upstream_urns: List[str],
     downstream_urn: str,
     actor: str = make_user_urn("datahub"),
     lineage_type: str = DatasetLineageTypeClass.TRANSFORMED,
 ) -> MetadataChangeEventClass:
-    sys_time = int(time.time() * 1000)
-    if not isinstance(upstream_urns, list):
-        upstream_urns = [upstream_urns]
+    sys_time = get_sys_time()
 
     mce = MetadataChangeEventClass(
         proposedSnapshot=DatasetSnapshotClass(
@@ -77,3 +81,24 @@
         )
     )
     return mce
+
+
+def get_aspect_if_available(
+    mce: MetadataChangeEventClass, type: Type[T]
+) -> Optional[T]:
+    all_aspects = mce.proposedSnapshot.aspects
+    aspects: List[T] = [aspect for aspect in all_aspects if isinstance(aspect, type)]
+
+    if len(aspects) > 1:
+        raise ValueError(f"MCE contains multiple aspects of type {type}: {aspects}")
+    if aspects:
+        return aspects[0]
+    return None
+
+
+def get_or_add_aspect(mce: MetadataChangeEventClass, default: T) -> T:
+    existing = get_aspect_if_available(mce, type(default))
+    if existing is not None:
+        return existing
+    mce.proposedSnapshot.aspects.append(default)
+    return default
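
The two aspect helpers above are the building blocks the ownership transformer uses to modify an MCE in place. A minimal sketch of how they compose; the dataset URN and owner below are invented for illustration:

```python
import datahub.emitter.mce_builder as builder
import datahub.metadata.schema_classes as models

# An MCE that carries no ownership information yet (example values).
mce = models.MetadataChangeEventClass(
    proposedSnapshot=models.DatasetSnapshotClass(
        urn=builder.make_dataset_urn("bigquery", "example_db.example_table"),
        aspects=[models.StatusClass(removed=False)],
    )
)

# get_or_add_aspect returns the existing OwnershipClass aspect if the snapshot
# already has one; otherwise it appends the provided default and returns that.
ownership = builder.get_or_add_aspect(
    mce,
    models.OwnershipClass(
        owners=[],
        lastModified=models.AuditStampClass(
            time=builder.get_sys_time(),
            actor=builder.make_user_urn("datahub"),
        ),
    ),
)
ownership.owners.append(
    models.OwnerClass(
        owner=builder.make_user_urn("jdoe"),
        type=models.OwnershipTypeClass.DATAOWNER,
    )
)

# The aspect that was just attached is now discoverable on the MCE.
assert builder.get_aspect_if_available(mce, models.OwnershipClass) is ownership
```

The `ValueError` raised on duplicates keeps the contract simple: a snapshot is expected to carry at most one aspect of any given type, so "get or add" is unambiguous.
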
diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py
new file mode 100644
index 0000000000..82b8fc28a6
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py
@@ -0,0 +1,96 @@
+from typing import Callable, Iterable, List, Union
+
+import datahub.emitter.mce_builder as builder
+from datahub.configuration.common import ConfigModel
+from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
+from datahub.ingestion.api.transform import Transformer
+from datahub.metadata.schema_classes import (
+    AuditStampClass,
+    DatasetSnapshotClass,
+    MetadataChangeEventClass,
+    OwnerClass,
+    OwnershipClass,
+    OwnershipTypeClass,
+)
+
+
+class AddDatasetOwnershipConfig(ConfigModel):
+    # Workaround for https://github.com/python/mypy/issues/708.
+    # Suggested by https://stackoverflow.com/a/64528725/5004662.
+    get_owners_to_add: Union[
+        Callable[[DatasetSnapshotClass], List[OwnerClass]],
+        Callable[[DatasetSnapshotClass], List[OwnerClass]],
+    ]
+    default_actor: str = builder.make_user_urn("etl")
+
+
+class AddDatasetOwnership(Transformer):
+    """Transformer that adds owners to datasets according to a callback function."""
+
+    ctx: PipelineContext
+    config: AddDatasetOwnershipConfig
+
+    def __init__(self, config: AddDatasetOwnershipConfig, ctx: PipelineContext):
+        self.ctx = ctx
+        self.config = config
+
+    @classmethod
+    def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetOwnership":
+        config = AddDatasetOwnershipConfig.parse_obj(config_dict)
+        return cls(config, ctx)
+
+    def transform(
+        self, record_envelopes: Iterable[RecordEnvelope]
+    ) -> Iterable[RecordEnvelope]:
+        for envelope in record_envelopes:
+            if isinstance(envelope.record, MetadataChangeEventClass):
+                envelope.record = self.transform_one(envelope.record)
+            yield envelope
+
+    def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass:
+        if not isinstance(mce.proposedSnapshot, DatasetSnapshotClass):
+            return mce
+
+        owners_to_add = self.config.get_owners_to_add(mce.proposedSnapshot)
+        if owners_to_add:
+            ownership = builder.get_or_add_aspect(
+                mce,
+                OwnershipClass(
+                    owners=[],
+                    lastModified=AuditStampClass(
+                        time=builder.get_sys_time(),
+                        actor=self.config.default_actor,
+                    ),
+                ),
+            )
+            ownership.owners.extend(owners_to_add)
+
+        return mce
+
+
+class SimpleDatasetOwnershipConfig(ConfigModel):
+    owner_urns: List[str]
+    default_actor: str = builder.make_user_urn("etl")
+
+
+class SimpleAddDatasetOwnership(AddDatasetOwnership):
+    """Transformer that adds a specified set of owners to each dataset."""
+
+    def __init__(self, config: SimpleDatasetOwnershipConfig, ctx: PipelineContext):
+        owners = [
+            OwnerClass(owner=owner, type=OwnershipTypeClass.DATAOWNER)
+            for owner in config.owner_urns
+        ]
+
+        generic_config = AddDatasetOwnershipConfig(
+            get_owners_to_add=lambda _: owners,
+            default_actor=config.default_actor,
+        )
+        super().__init__(generic_config, ctx)
+
+    @classmethod
+    def create(
+        cls, config_dict: dict, ctx: PipelineContext
+    ) -> "SimpleAddDatasetOwnership":
+        config = SimpleDatasetOwnershipConfig.parse_obj(config_dict)
+        return cls(config, ctx)
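
The `:::tip` in the README points at the generic class above: rather than a fixed owner list, `AddDatasetOwnershipConfig.get_owners_to_add` accepts a callback that receives each `DatasetSnapshotClass` and returns the owners to attach. A minimal sketch of wiring one up from code; the routing rule and user names are made up for illustration:

```python
from typing import List

import datahub.emitter.mce_builder as builder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.add_dataset_ownership import (
    AddDatasetOwnership,
    AddDatasetOwnershipConfig,
)
from datahub.metadata.schema_classes import (
    DatasetSnapshotClass,
    OwnerClass,
    OwnershipTypeClass,
)


def resolve_owners(snapshot: DatasetSnapshotClass) -> List[OwnerClass]:
    # Invented rule: datasets whose urn mentions "finance" get a different owner.
    owner_name = "finance_admin" if "finance" in snapshot.urn else "platform_admin"
    return [
        OwnerClass(
            owner=builder.make_user_urn(owner_name),
            type=OwnershipTypeClass.DATAOWNER,
        )
    ]


# Construct the transformer directly, e.g. when assembling a pipeline in code.
transformer = AddDatasetOwnership(
    AddDatasetOwnershipConfig(get_owners_to_add=resolve_owners),
    PipelineContext(run_id="ownership-example"),
)
```

Because the callback is a Python function, the generic form is most useful when a pipeline is assembled in code; YAML recipes are better served by `simple_add_dataset_ownership`, which wraps this class with a fixed owner list (see `SimpleAddDatasetOwnership` above).
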
diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py b/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py
index daf99088c5..10b2c05e83 100644
--- a/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py
+++ b/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py
@@ -1,4 +1,9 @@
 from datahub.ingestion.api.registry import Registry
 from datahub.ingestion.api.transform import Transformer
 
+from .add_dataset_ownership import AddDatasetOwnership, SimpleAddDatasetOwnership
+
 transform_registry = Registry[Transformer]()
+
+transform_registry.register("add_dataset_ownership", AddDatasetOwnership)
+transform_registry.register("simple_add_dataset_ownership", SimpleAddDatasetOwnership)
diff --git a/metadata-ingestion/tests/unit/test_transform_dataset_ownership.py b/metadata-ingestion/tests/unit/test_transform_dataset_ownership.py
new file mode 100644
index 0000000000..51277944ba
--- /dev/null
+++ b/metadata-ingestion/tests/unit/test_transform_dataset_ownership.py
@@ -0,0 +1,87 @@
+import datahub.emitter.mce_builder as builder
+import datahub.metadata.schema_classes as models
+from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
+from datahub.ingestion.transformer.add_dataset_ownership import (
+    SimpleAddDatasetOwnership,
+)
+
+
+def test_simple_dataset_ownership_transformation(mock_time):
+    no_owner_aspect = models.MetadataChangeEventClass(
+        proposedSnapshot=models.DatasetSnapshotClass(
+            urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
+            aspects=[
+                models.StatusClass(removed=False),
+            ],
+        ),
+    )
+    with_owner_aspect = models.MetadataChangeEventClass(
+        proposedSnapshot=models.DatasetSnapshotClass(
+            urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)",
+            aspects=[
+                models.OwnershipClass(
+                    owners=[
+                        models.OwnerClass(
+                            owner=builder.make_user_urn("fake_owner"),
+                            type=models.OwnershipTypeClass.DATAOWNER,
+                        ),
+                    ],
+                    lastModified=models.AuditStampClass(
+                        time=builder.get_sys_time(), actor="urn:li:corpuser:datahub"
+                    ),
+                )
+            ],
+        ),
+    )
+
+    not_a_dataset = models.MetadataChangeEventClass(
+        proposedSnapshot=models.DataJobSnapshotClass(
+            urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)",
+            aspects=[
+                models.DataJobInfoClass(
+                    name="User Deletions",
+                    description="Constructs the fct_users_deleted from logging_events",
+                    type=models.AzkabanJobTypeClass.SQL,
+                )
+            ],
+        )
+    )
+
+    inputs = [
+        no_owner_aspect,
+        with_owner_aspect,
+        not_a_dataset,
+    ]
+
+    transformer = SimpleAddDatasetOwnership.create(
+        {
+            "owner_urns": [
+                builder.make_user_urn("person1"),
+                builder.make_user_urn("person2"),
+            ]
+        },
+        PipelineContext(run_id="test"),
+    )
+
+    outputs = list(
+        transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
+    )
+
+    assert len(outputs) == len(inputs)
+
+    # Check the first entry.
+    first_ownership_aspect = builder.get_aspect_if_available(
+        outputs[0].record, models.OwnershipClass
+    )
+    assert first_ownership_aspect
+    assert len(first_ownership_aspect.owners) == 2
+
+    # Check the second entry.
+    second_ownership_aspect = builder.get_aspect_if_available(
+        outputs[1].record, models.OwnershipClass
+    )
+    assert second_ownership_aspect
+    assert len(second_ownership_aspect.owners) == 3
+
+    # Verify that the third entry is unchanged.
+    assert inputs[2] == outputs[2].record
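
With `simple_add_dataset_ownership` registered under that name, a recipe can reference it directly. A sketch of a complete recipe using the `file` source and `datahub-rest` sink from the same README; the filename, URNs, and server address are placeholders:

```yml
source:
  type: "file"
  config:
    filename: "./path/to/mce_file.json"

transformers:
  - type: "simple_add_dataset_ownership"
    config:
      owner_urns:
        - "urn:li:corpuser:username1"
        - "urn:li:corpGroup:groupname"

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
```

Running this with `datahub ingest -c recipe.yml` attaches the configured owners to every dataset the source emits, while non-dataset records pass through untouched, as the unit test above verifies.
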