feat(ingest/transform): extend ownership transformer to other entities (#11700)

This commit is contained in:
Aseem Bansal 2024-10-29 15:28:41 +05:30 committed by GitHub
parent bea253a064
commit 02f0a3dee7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 64 additions and 32 deletions

View File

@ -13,9 +13,7 @@ from datahub.emitter.mce_builder import Aspect
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.transformer.dataset_transformer import (
DatasetOwnershipTransformer,
)
from datahub.ingestion.transformer.dataset_transformer import OwnershipTransformer
from datahub.metadata.schema_classes import (
BrowsePathsV2Class,
MetadataChangeProposalClass,
@ -37,7 +35,7 @@ class AddDatasetOwnershipConfig(TransformerSemanticsConfigModel):
is_container: bool = False
class AddDatasetOwnership(DatasetOwnershipTransformer):
class AddDatasetOwnership(OwnershipTransformer):
"""Transformer that adds owners to datasets according to a callback function."""
ctx: PipelineContext

View File

@ -27,6 +27,22 @@ class DatasetTransformer(BaseTransformer, SingleAspectTransformer, metaclass=ABC
return ["dataset"]
class OwnershipTransformer(
DatasetTransformer, SingleAspectTransformer, metaclass=ABCMeta
):
def aspect_name(self) -> str:
return "ownership"
def entity_types(self) -> List[str]:
return [
"dataset",
"dataJob",
"dataFlow",
"chart",
"dashboard",
]
class TagTransformer(BaseTransformer, SingleAspectTransformer, metaclass=ABCMeta):
"""Transformer that does transform sequentially on each tag."""
@ -47,11 +63,6 @@ class ContainerTransformer(BaseTransformer, SingleAspectTransformer, metaclass=A
return ["container"]
class DatasetOwnershipTransformer(DatasetTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "ownership"
class DatasetDomainTransformer(DatasetTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "domains"

View File

@ -4,9 +4,7 @@ from typing import List, Optional, Set, cast
import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import (
DatasetOwnershipTransformer,
)
from datahub.ingestion.transformer.dataset_transformer import OwnershipTransformer
from datahub.metadata.schema_classes import (
OwnerClass,
OwnershipClass,
@ -20,7 +18,7 @@ class PatternCleanUpOwnershipConfig(ConfigModel):
pattern_for_cleanup: List[str]
class PatternCleanUpOwnership(DatasetOwnershipTransformer):
class PatternCleanUpOwnership(OwnershipTransformer):
"""Transformer that clean the ownership URN."""
ctx: PipelineContext

View File

@ -3,9 +3,7 @@ from typing import Optional, cast
from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import Aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import (
DatasetOwnershipTransformer,
)
from datahub.ingestion.transformer.dataset_transformer import OwnershipTransformer
from datahub.metadata.schema_classes import OwnershipClass
@ -13,7 +11,7 @@ class ClearDatasetOwnershipConfig(ConfigModel):
pass
class SimpleRemoveDatasetOwnership(DatasetOwnershipTransformer):
class SimpleRemoveDatasetOwnership(OwnershipTransformer):
"""Transformer that clears all owners on each dataset."""
def __init__(self, config: ClearDatasetOwnershipConfig, ctx: PipelineContext):

View File

@ -220,7 +220,7 @@ def make_dataset_with_properties() -> models.MetadataChangeEventClass:
)
def test_simple_dataset_ownership_transformation(mock_time):
def test_dataset_ownership_transformation(mock_time):
no_owner_aspect = make_generic_dataset()
with_owner_aspect = make_dataset_with_owner()
@ -254,7 +254,7 @@ def test_simple_dataset_ownership_transformation(mock_time):
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
)
assert len(outputs) == len(inputs) + 1
assert len(outputs) == len(inputs) + 2
# Check the first entry.
first_ownership_aspect = builder.get_aspect_if_available(
@ -287,11 +287,21 @@ def test_simple_dataset_ownership_transformation(mock_time):
]
)
third_ownership_aspect = outputs[4].record.aspect
assert third_ownership_aspect
assert len(third_ownership_aspect.owners) == 2
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None
for owner in second_ownership_aspect.owners
]
)
# Verify that the third entry is unchanged.
assert inputs[2] == outputs[2].record
# Verify that the last entry is EndOfStream
assert inputs[3] == outputs[4].record
assert inputs[-1] == outputs[-1].record
def test_simple_dataset_ownership_with_type_transformation(mock_time):
@ -1003,6 +1013,7 @@ def test_pattern_dataset_ownership_transformation(mock_time):
"rules": {
".*example1.*": [builder.make_user_urn("person1")],
".*example2.*": [builder.make_user_urn("person2")],
".*dag_abc.*": [builder.make_user_urn("person2")],
}
},
"ownership_type": "DATAOWNER",
@ -1014,7 +1025,9 @@ def test_pattern_dataset_ownership_transformation(mock_time):
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
)
assert len(outputs) == len(inputs) + 1 # additional MCP due to the no-owner MCE
assert (
len(outputs) == len(inputs) + 2
) # additional MCP due to the no-owner MCE + datajob
# Check the first entry.
assert inputs[0] == outputs[0].record
@ -1042,6 +1055,16 @@ def test_pattern_dataset_ownership_transformation(mock_time):
]
)
third_ownership_aspect = outputs[4].record.aspect
assert third_ownership_aspect
assert len(third_ownership_aspect.owners) == 1
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER
for owner in third_ownership_aspect.owners
]
)
# Verify that the third entry is unchanged.
assert inputs[2] == outputs[2].record
@ -1122,14 +1145,14 @@ def test_pattern_container_and_dataset_ownership_transformation(
pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore
# No owner aspect for the first dataset
no_owner_aspect = models.MetadataChangeEventClass(
no_owner_aspect_dataset = models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
aspects=[models.StatusClass(removed=False)],
),
)
# Dataset with an existing owner
with_owner_aspect = models.MetadataChangeEventClass(
with_owner_aspect_dataset = models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)",
aspects=[
@ -1148,8 +1171,7 @@ def test_pattern_container_and_dataset_ownership_transformation(
),
)
# Not a dataset, should be ignored
not_a_dataset = models.MetadataChangeEventClass(
datajob = models.MetadataChangeEventClass(
proposedSnapshot=models.DataJobSnapshotClass(
urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)",
aspects=[
@ -1163,9 +1185,9 @@ def test_pattern_container_and_dataset_ownership_transformation(
)
inputs = [
no_owner_aspect,
with_owner_aspect,
not_a_dataset,
no_owner_aspect_dataset,
with_owner_aspect_dataset,
datajob,
EndOfStream(),
]
@ -1176,6 +1198,7 @@ def test_pattern_container_and_dataset_ownership_transformation(
"rules": {
".*example1.*": [builder.make_user_urn("person1")],
".*example2.*": [builder.make_user_urn("person2")],
".*dag_abc.*": [builder.make_user_urn("person3")],
}
},
"ownership_type": "DATAOWNER",
@ -1188,9 +1211,9 @@ def test_pattern_container_and_dataset_ownership_transformation(
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
)
assert len(outputs) == len(inputs) + 3
assert len(outputs) == len(inputs) + 4
# Check the first entry.
# Check that DatasetSnapshotClass has not changed
assert inputs[0] == outputs[0].record
# Check the ownership for the first dataset (example1)
@ -1217,12 +1240,16 @@ def test_pattern_container_and_dataset_ownership_transformation(
]
)
third_ownership_aspect = outputs[4].record.aspect
assert third_ownership_aspect
assert len(third_ownership_aspect.owners) == 1 # new for datajob
# Check container ownerships
for i in range(2):
container_ownership_aspect = outputs[i + 4].record.aspect
container_ownership_aspect = outputs[i + 5].record.aspect
assert container_ownership_aspect
ownership = json.loads(container_ownership_aspect.value.decode("utf-8"))
assert len(ownership) == 2
assert len(ownership) == 3
assert ownership[0]["value"]["owner"] == builder.make_user_urn("person1")
assert ownership[1]["value"]["owner"] == builder.make_user_urn("person2")