datahub/metadata-ingestion/examples/transforms/custom_transform_example.py

69 lines
2.1 KiB
Python
Raw Permalink Normal View History

# see https://docs.datahub.com/docs/metadata-ingestion/transformers for original tutorial
import json
from typing import List, Optional
from datahub.configuration.common import ConfigModel, TransformerSemantics
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.base_transformer import (
BaseTransformer,
SingleAspectTransformer,
)
from datahub.metadata.schema_classes import (
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
)
class AddCustomOwnershipConfig(ConfigModel):
    """Configuration accepted by the custom ownership transformer example."""

    # Path of a JSON file; the transformer opens and parses it when constructed.
    owners_json: str

    # Transformer semantics setting; OVERWRITE unless the recipe says otherwise.
    semantics: TransformerSemantics = TransformerSemantics.OVERWRITE
class AddCustomOwnership(BaseTransformer, SingleAspectTransformer):
    """Transformer that adds a fixed list of owners to ownership aspects.

    The owner URNs are read once, at construction time, from the JSON file
    named by ``config.owners_json``. Each one is wrapped in an ``OwnerClass``
    of type ``DATAOWNER`` and appended to the ownership aspect handed to
    ``transform_aspect``.

    NOTE(review): ``config.semantics`` is declared on the config model but is
    not consulted anywhere in this class -- owners are always appended.
    """

    # context param to generate run metadata such as a run ID
    ctx: PipelineContext
    # parsed transformer configuration (see AddCustomOwnershipConfig)
    config: AddCustomOwnershipConfig

    def __init__(self, config: AddCustomOwnershipConfig, ctx: PipelineContext):
        """Store the config and context, then load the owners to add.

        Args:
            config: Parsed configuration; ``owners_json`` is the file to read.
            ctx: Pipeline context for this run.

        Raises:
            OSError: If the owners file cannot be opened.
            json.JSONDecodeError: If the owners file is not valid JSON.
        """
        super().__init__()
        self.ctx = ctx
        self.config = config

        # Read as UTF-8 explicitly so the result does not depend on the
        # platform's locale default encoding.
        with open(self.config.owners_json, "r", encoding="utf-8") as f:
            raw_owner_urns = json.load(f)

        self.owners: List[OwnerClass] = [
            OwnerClass(owner=owner, type=OwnershipTypeClass.DATAOWNER)
            for owner in raw_owner_urns
        ]

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddCustomOwnership":
        """Build the transformer from a raw config dictionary."""
        config = AddCustomOwnershipConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def entity_types(self) -> List[str]:
        """Return the entity types this transformer applies to."""
        return ["dataset"]

    def aspect_name(self) -> str:
        """Return the name of the aspect this transformer operates on."""
        return "ownership"

    def transform_aspect(  # type: ignore
        self, entity_urn: str, aspect_name: str, aspect: Optional[OwnershipClass]
    ) -> Optional[OwnershipClass]:
        """Append the configured owners to ``aspect``.

        Args:
            entity_urn: URN of the entity being transformed (unused here).
            aspect_name: Name of the aspect being transformed (unused here).
            aspect: Existing ownership aspect, or ``None`` if there is none.

        Returns:
            The incoming aspect, extended in place with the configured owners;
            a new ``OwnershipClass`` holding just those owners when ``aspect``
            is ``None``; or ``aspect`` unchanged when no owners are configured.
        """
        assert aspect is None or isinstance(aspect, OwnershipClass)

        owners_to_add = self.owners
        if not owners_to_add:
            # Nothing to add: hand the aspect back untouched rather than
            # falling through with no result bound.
            return aspect

        # Test for None explicitly instead of relying on the aspect's truthiness.
        ownership = aspect if aspect is not None else OwnershipClass(owners=[])
        ownership.owners.extend(owners_to_add)
        return ownership