feat(ingest): Add GenericAspectTransformer (#7994)

Co-authored-by: Adrián Pertíñez <khurzak92@gmail.com>
This commit is contained in:
Amanda Hernando 2023-05-24 22:31:33 +02:00 committed by GitHub
parent c075c5e2df
commit 0e0d8934ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 417 additions and 4 deletions

View File

@ -64,7 +64,10 @@ class BaseTransformer(Transformer, metaclass=ABCMeta):
def _should_process(
self,
record: Union[
MetadataChangeEventClass, MetadataChangeProposalWrapper, ControlRecord
MetadataChangeEventClass,
MetadataChangeProposalWrapper,
MetadataChangeProposalClass,
ControlRecord,
],
) -> bool:
if isinstance(record, ControlRecord):
@ -92,7 +95,9 @@ class BaseTransformer(Transformer, metaclass=ABCMeta):
record_entry["seen"]["mce"] = mce.systemMetadata
self.entity_map[mce.proposedSnapshot.urn] = record_entry
def _record_mcp(self, mcp: MetadataChangeProposalWrapper) -> None:
def _record_mcp(
self, mcp: Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]
) -> None:
assert mcp.entityUrn
record_entry = self.entity_map.get(mcp.entityUrn, {"seen": {}})
if "seen" in record_entry and "mcp" not in record_entry["seen"]:
@ -150,7 +155,7 @@ class BaseTransformer(Transformer, metaclass=ABCMeta):
return envelope
def _transform_or_record_mcp(
def _transform_or_record_mcpw(
self,
envelope: RecordEnvelope[MetadataChangeProposalWrapper],
) -> Optional[RecordEnvelope[MetadataChangeProposalWrapper]]:
@ -187,7 +192,7 @@ class BaseTransformer(Transformer, metaclass=ABCMeta):
elif isinstance(
envelope.record, MetadataChangeProposalWrapper
) and isinstance(self, SingleAspectTransformer):
return_envelope = self._transform_or_record_mcp(envelope)
return_envelope = self._transform_or_record_mcpw(envelope)
if return_envelope is None:
continue
else:

View File

@ -0,0 +1,124 @@
import logging
from abc import ABCMeta, abstractmethod
from typing import Iterable, Optional
from datahub.emitter.mce_builder import Aspect
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import EndOfStream, RecordEnvelope
from datahub.ingestion.transformer.base_transformer import (
BaseTransformer,
SingleAspectTransformer,
)
from datahub.metadata.schema_classes import (
GenericAspectClass,
MetadataChangeEventClass,
MetadataChangeProposalClass,
)
from datahub.utilities.urns.urn import Urn
# Module-level logger named after this module; used below for debug output
# when a transformed record is dropped.
log = logging.getLogger(__name__)
class GenericAspectTransformer(
    BaseTransformer, SingleAspectTransformer, metaclass=ABCMeta
):
    """Base class for transformers that work on custom aspects.

    Custom aspects travel as GenericAspectClass payloads inside
    MetadataChangeProposalClass records, so subclasses implement
    transform_generic_aspect() instead of transform_aspect().
    """

    def __init__(self):
        super().__init__()

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
    ) -> Optional[Aspect]:
        """Placeholder that subclasses should not implement; always returns None."""
        return None

    @abstractmethod
    def transform_generic_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[GenericAspectClass]
    ) -> Optional[GenericAspectClass]:
        """Transform the single custom aspect of an entity.

        This abstract hook exists to enforce the use of GenericAspectClass.
        It is called with the aspect carried by a matching proposal, or with
        ``aspect=None`` at end of stream for entities that still need one.
        Returning None for a matching proposal drops that record.
        """

    def _transform_or_record_mcpc(
        self,
        envelope: RecordEnvelope[MetadataChangeProposalClass],
    ) -> Optional[RecordEnvelope[MetadataChangeProposalClass]]:
        """Transform a proposal carrying the target aspect, otherwise record it.

        Returns the (possibly mutated) envelope, or None when the record ends
        up without an aspect and therefore must not be forwarded.
        """
        proposal = envelope.record
        assert proposal.entityUrn
        assert isinstance(self, SingleAspectTransformer)

        carries_target_aspect = (
            proposal.aspectName == self.aspect_name() and proposal.aspect
        )
        if not carries_target_aspect:
            # Not ours to transform: just remember that the entity was seen.
            self._record_mcp(proposal)
        else:
            new_aspect = self.transform_generic_aspect(
                entity_urn=proposal.entityUrn,
                aspect_name=proposal.aspectName,
                aspect=proposal.aspect,
            )
            self._mark_processed(proposal.entityUrn)
            if new_aspect is None:
                log.debug(
                    f"Dropping record {envelope} as transformation result is None"
                )
            proposal.aspect = new_aspect

        if proposal.aspect is None:
            return None
        return envelope

    def transform(
        self, record_envelopes: Iterable[RecordEnvelope]
    ) -> Iterable[RecordEnvelope]:
        """Transform a stream of record envelopes with custom-aspect support.

        Overrides the BaseTransformer implementation because custom aspects
        must be upserted through MetadataChangeProposalClass rather than the
        MetadataChangeProposalWrapper used by the original method.
        """
        for envelope in record_envelopes:
            incoming = envelope.record
            if not self._should_process(incoming):
                pass
            elif isinstance(incoming, MetadataChangeEventClass):
                self._record_mce(incoming)
            elif isinstance(incoming, MetadataChangeProposalWrapper):
                self._record_mcp(incoming)
            elif isinstance(incoming, MetadataChangeProposalClass):
                kept_envelope = self._transform_or_record_mcpc(envelope)
                if kept_envelope is None:
                    # The transformation removed the aspect; emit nothing.
                    continue
                envelope = kept_envelope
            elif isinstance(incoming, EndOfStream) and isinstance(
                self, SingleAspectTransformer
            ):
                # Give every entity that is still marked as "seen" a chance to
                # receive a freshly generated aspect before the stream closes.
                for urn, state in self.entity_map.items():
                    if "seen" in state:
                        seen = state["seen"]
                        last_seen_mcp = seen.get("mcp")
                        last_seen_mce_system_metadata = seen.get("mce")

                        generated_aspect = self.transform_generic_aspect(
                            entity_urn=urn,
                            aspect_name=self.aspect_name(),
                            aspect=None,
                        )
                        if generated_aspect:
                            # for end of stream records, we modify the workunit-id
                            parsed_urn = Urn.create_from_string(urn)
                            simple_name = "-".join(parsed_urn.get_entity_id())
                            record_metadata = envelope.metadata.copy()
                            record_metadata[
                                "workunit_id"
                            ] = f"txform-{simple_name}-{self.aspect_name()}"

                            generated_record = MetadataChangeProposalClass(
                                entityType=parsed_urn.get_type(),
                                entityUrn=urn,
                                changeType="UPSERT",
                                aspectName=self.aspect_name(),
                                aspect=generated_aspect,
                                systemMetadata=last_seen_mcp.systemMetadata
                                if last_seen_mcp
                                else last_seen_mce_system_metadata,
                            )
                            yield RecordEnvelope(
                                record=generated_record,
                                metadata=record_metadata,
                            )
                    self._mark_processed(urn)
            yield envelope

View File

@ -0,0 +1,284 @@
import json
import unittest
from typing import Any, List, Optional
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import EndOfStream, PipelineContext, RecordEnvelope
from datahub.ingestion.transformer.generic_aspect_transformer import (
GenericAspectTransformer,
)
from datahub.metadata.schema_classes import (
DataJobSnapshotClass,
DatasetSnapshotClass,
GenericAspectClass,
MetadataChangeEventClass,
MetadataChangeProposalClass,
StatusClass,
)
from datahub.utilities.urns.urn import Urn
def make_mce_dataset(
    entity_urn: str = "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
    aspects: Optional[List[Any]] = None,
) -> MetadataChangeEventClass:
    """Build an MCE whose proposed snapshot is a dataset snapshot.

    When *aspects* is None the snapshot carries a single
    ``StatusClass(removed=False)`` aspect.
    """
    snapshot_aspects = [StatusClass(removed=False)] if aspects is None else aspects
    snapshot = DatasetSnapshotClass(urn=entity_urn, aspects=snapshot_aspects)
    return MetadataChangeEventClass(proposedSnapshot=snapshot)
def make_mce_datajob(
    entity_urn: str = "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)",
    aspects: Optional[List[Any]] = None,
) -> MetadataChangeEventClass:
    """Build an MCE whose proposed snapshot is a data-job snapshot.

    When *aspects* is None the snapshot carries a single
    ``StatusClass(removed=False)`` aspect.
    """
    snapshot_aspects = [StatusClass(removed=False)] if aspects is None else aspects
    snapshot = DataJobSnapshotClass(urn=entity_urn, aspects=snapshot_aspects)
    return MetadataChangeEventClass(proposedSnapshot=snapshot)
# Sentinel distinguishing "aspect argument omitted" from an explicit
# ``aspect=None``, so the default aspect can be created per call.
_MCPW_DEFAULT_ASPECT = object()


def make_mcpw(
    entity_urn: str = "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
    aspect_name: str = "status",
    aspect: Any = _MCPW_DEFAULT_ASPECT,
) -> MetadataChangeProposalWrapper:
    """Build an UPSERT MetadataChangeProposalWrapper for *entity_urn*.

    The entity type is derived from the URN. When *aspect* is omitted, a
    fresh ``StatusClass(removed=False)`` is created for this call; a default
    evaluated in the signature would be one mutable object shared by every
    proposal the helper builds. An explicit ``aspect=None`` is passed through
    unchanged.
    """
    if aspect is _MCPW_DEFAULT_ASPECT:
        aspect = StatusClass(removed=False)
    return MetadataChangeProposalWrapper(
        entityUrn=entity_urn,
        entityType=Urn.create_from_string(entity_urn).get_type(),
        aspectName=aspect_name,
        changeType="UPSERT",
        aspect=aspect,
    )
# Sentinel distinguishing "aspect argument omitted" from an explicit
# ``aspect=None``, so the default aspect can be created per call.
_MCPC_DEFAULT_ASPECT = object()


def make_mcpc(
    entity_urn: str = "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
    aspect_name: str = "status",
    aspect: Any = _MCPC_DEFAULT_ASPECT,
) -> MetadataChangeProposalClass:
    """Build an UPSERT MetadataChangeProposalClass for *entity_urn*.

    The entity type is derived from the URN. When *aspect* is omitted, a
    fresh ``StatusClass(removed=False)`` is created for this call; a default
    evaluated in the signature would be one mutable object shared by every
    proposal the helper builds. An explicit ``aspect=None`` is passed through
    unchanged.
    """
    if aspect is _MCPC_DEFAULT_ASPECT:
        aspect = StatusClass(removed=False)
    return MetadataChangeProposalClass(
        entityUrn=entity_urn,
        entityType=Urn.create_from_string(entity_urn).get_type(),
        aspectName=aspect_name,
        changeType="UPSERT",
        aspect=aspect,
    )
class DummyGenericAspectTransformer(GenericAspectTransformer):
    """Test transformer for dataset entities and the "customAspect" aspect.

    An existing aspect keeps its value; when no aspect is supplied, a JSON
    payload ``{"customAspect": 10}`` is generated instead.
    """

    def __init__(self):
        super().__init__()

    @classmethod
    def create(
        cls, config_dict: dict, ctx: PipelineContext
    ) -> "DummyGenericAspectTransformer":
        """Factory used by the pipeline; the config and context are ignored."""
        return cls()

    def entity_types(self) -> List[str]:
        """Entity types this transformer applies to."""
        return ["dataset"]

    def aspect_name(self) -> str:
        """Name of the custom aspect this transformer handles."""
        return "customAspect"

    def transform_generic_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[GenericAspectClass]
    ) -> Optional[GenericAspectClass]:
        """Return a JSON generic aspect, reusing the incoming value if present."""
        if aspect:
            payload = aspect.value
        else:
            payload = json.dumps({"customAspect": 10}).encode("utf-8")
        return GenericAspectClass(contentType="application/json", value=payload)
class TestDummyGenericAspectTransformer(unittest.TestCase):
    """Checks that DummyGenericAspectTransformer adds or keeps custom aspects."""

    @staticmethod
    def _run_transformer(records):
        """Wrap *records* in envelopes and collect the transformer's output."""
        envelopes = [RecordEnvelope(record, metadata={}) for record in records]
        return list(DummyGenericAspectTransformer().transform(envelopes))

    @staticmethod
    def _assert_custom_aspect(event, expected_urn, expected_value):
        """Assert *event* is an MCP carrying the expected custom aspect value."""
        assert event.entityUrn == expected_urn
        assert isinstance(event, MetadataChangeProposalClass)
        assert isinstance(event.aspect, GenericAspectClass)
        assert (json.loads(event.aspect.value))["customAspect"] == expected_value

    def test_add_generic_aspect_when_mce_received(self):
        dataset_mce = make_mce_dataset()
        datajob_mce = make_mce_datajob()
        records = [dataset_mce, datajob_mce, EndOfStream()]

        results = self._run_transformer(records)

        assert len(results) == len(records) + 1
        # Both incoming MCEs pass through unchanged.
        assert records[0] == results[0].record
        assert records[1] == results[1].record
        # The dataset from the first entry receives a generated generic aspect.
        self._assert_custom_aspect(
            results[2].record, dataset_mce.proposedSnapshot.urn, 10
        )
        # EndOfStream is the final record.
        assert records[2] == results[3].record

    def test_add_generic_aspect_when_mcpw_received(self):
        dataset_mcpw = make_mcpw()
        datajob_mcpw = make_mcpw(
            entity_urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)"
        )
        records = [dataset_mcpw, datajob_mcpw, EndOfStream()]

        results = self._run_transformer(records)

        assert len(results) == len(records) + 1
        # Both incoming proposals pass through unchanged.
        assert records[0] == results[0].record
        assert records[1] == results[1].record
        # The dataset from the first entry receives a generated generic aspect.
        self._assert_custom_aspect(results[2].record, dataset_mcpw.entityUrn, 10)
        # EndOfStream is the final record.
        assert records[2] == results[3].record

    def test_add_generic_aspect_when_mcpc_received(self):
        dataset_mcpc = make_mcpc()
        datajob_mcpc = make_mcpc(
            entity_urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)"
        )
        records = [dataset_mcpc, datajob_mcpc, EndOfStream()]

        results = self._run_transformer(records)

        assert len(results) == len(records) + 1
        # Both incoming proposals pass through unchanged.
        assert records[0] == results[0].record
        assert records[1] == results[1].record
        # The dataset from the first entry receives a generated generic aspect.
        self._assert_custom_aspect(results[2].record, dataset_mcpc.entityUrn, 10)
        # EndOfStream is the final record.
        assert records[2] == results[3].record

    def test_modify_generic_aspect_when_mcpc_received(self):
        plain_dataset_mcpc = make_mcpc()
        existing_custom_aspect = GenericAspectClass(
            contentType="application/json",
            value=json.dumps({"customAspect": 5}).encode("utf-8"),
        )
        custom_dataset_mcpc = make_mcpc(
            entity_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,example1,PROD)",
            aspect_name="customAspect",
            aspect=existing_custom_aspect,
        )
        datajob_mcpc = make_mcpc(
            entity_urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)"
        )
        records = [
            plain_dataset_mcpc,
            custom_dataset_mcpc,
            datajob_mcpc,
            EndOfStream(),
        ]

        results = self._run_transformer(records)

        assert len(results) == len(records) + 1
        # The first entry passes through unchanged.
        assert records[0] == results[0].record
        # The second entry keeps its original generic aspect value.
        self._assert_custom_aspect(
            results[1].record, custom_dataset_mcpc.entityUrn, 5
        )
        # The third entry passes through unchanged.
        assert records[2] == results[2].record
        # The dataset from the first entry receives a generated generic aspect.
        self._assert_custom_aspect(
            results[3].record, plain_dataset_mcpc.entityUrn, 10
        )
        # EndOfStream is the final record.
        assert records[3] == results[4].record
class DummyRemoveGenericAspectTransformer(GenericAspectTransformer):
    """Test transformer that yields no aspect for any "customAspect" input."""

    def __init__(self):
        super().__init__()

    @classmethod
    def create(
        cls, config_dict: dict, ctx: PipelineContext
    ) -> "DummyRemoveGenericAspectTransformer":
        """Factory used by the pipeline; the config and context are ignored."""
        instance = cls()
        return instance

    def entity_types(self) -> List[str]:
        """Entity types this transformer applies to."""
        return ["dataset"]

    def aspect_name(self) -> str:
        """Name of the custom aspect this transformer handles."""
        return "customAspect"

    def transform_generic_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[GenericAspectClass]
    ) -> Optional[GenericAspectClass]:
        """Always return None, whatever aspect is supplied."""
        return None
class TestDummyRemoveGenericAspectTransformer(unittest.TestCase):
    """Checks that a None transformation result removes the matching record."""

    def test_remove_generic_aspect_when_mcpc_received(self):
        plain_dataset_mcpc = make_mcpc()
        existing_custom_aspect = GenericAspectClass(
            contentType="application/json",
            value=json.dumps({"customAspect": 5}).encode("utf-8"),
        )
        custom_dataset_mcpc = make_mcpc(
            entity_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,example1,PROD)",
            aspect_name="customAspect",
            aspect=existing_custom_aspect,
        )
        datajob_mcpc = make_mcpc(
            entity_urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)"
        )
        records = [
            plain_dataset_mcpc,
            custom_dataset_mcpc,
            datajob_mcpc,
            EndOfStream(),
        ]

        envelopes = [RecordEnvelope(record, metadata={}) for record in records]
        results = list(DummyRemoveGenericAspectTransformer().transform(envelopes))

        # The record carrying the custom aspect is removed from the stream.
        assert len(results) == len(records) - 1
        # The first entry passes through unchanged.
        assert records[0] == results[0].record
        # The third entry passes through unchanged, now in second position.
        assert records[2] == results[1].record
        # EndOfStream is the final record.
        assert records[3] == results[2].record
# Run the unittest test runner when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()