datahub/metadata-ingestion/tests/unit/test_transform_dataset.py

602 lines
19 KiB
Python
Raw Normal View History

from typing import Dict, List, Union
from unittest import mock
import pytest
import datahub.emitter.mce_builder as builder
import datahub.metadata.schema_classes as models
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.transformer.add_dataset_browse_path import (
AddDatasetBrowsePathTransformer,
)
from datahub.ingestion.transformer.add_dataset_ownership import (
AddDatasetOwnership,
PatternAddDatasetOwnership,
SimpleAddDatasetOwnership,
)
from datahub.ingestion.transformer.add_dataset_properties import (
AddDatasetProperties,
AddDatasetPropertiesResolverBase,
)
from datahub.ingestion.transformer.add_dataset_tags import (
AddDatasetTags,
SimpleAddDatasetTags,
)
from datahub.ingestion.transformer.mark_dataset_status import MarkDatasetStatus
from datahub.ingestion.transformer.remove_dataset_ownership import (
SimpleRemoveDatasetOwnership,
)
from datahub.metadata.schema_classes import DatasetSnapshotClass
def make_generic_dataset() -> models.MetadataChangeEventClass:
return models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
aspects=[
models.StatusClass(removed=False),
],
),
)
def make_dataset_with_owner() -> models.MetadataChangeEventClass:
return models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)",
aspects=[
models.OwnershipClass(
owners=[
models.OwnerClass(
owner=builder.make_user_urn("fake_owner"),
type=models.OwnershipTypeClass.DATAOWNER,
),
],
lastModified=models.AuditStampClass(
time=1625266033123, actor="urn:li:corpuser:datahub"
),
)
],
),
)
EXISTING_PROPERTIES = {"my_existing_property": "existing property value"}
def make_dataset_with_properties() -> models.MetadataChangeEventClass:
return models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
aspects=[
models.StatusClass(removed=False),
models.DatasetPropertiesClass(customProperties=EXISTING_PROPERTIES),
],
),
)
def test_simple_dataset_ownership_transformation(mock_time):
no_owner_aspect = make_generic_dataset()
with_owner_aspect = make_dataset_with_owner()
not_a_dataset = models.MetadataChangeEventClass(
proposedSnapshot=models.DataJobSnapshotClass(
urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)",
aspects=[
models.DataJobInfoClass(
name="User Deletions",
description="Constructs the fct_users_deleted from logging_events",
type=models.AzkabanJobTypeClass.SQL,
)
],
)
)
inputs = [
no_owner_aspect,
with_owner_aspect,
not_a_dataset,
]
transformer = SimpleAddDatasetOwnership.create(
{
"owner_urns": [
builder.make_user_urn("person1"),
builder.make_user_urn("person2"),
]
},
PipelineContext(run_id="test"),
)
outputs = list(
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
)
assert len(outputs) == len(inputs)
# Check the first entry.
first_ownership_aspect = builder.get_aspect_if_available(
outputs[0].record, models.OwnershipClass
)
assert first_ownership_aspect
assert len(first_ownership_aspect.owners) == 2
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER
for owner in first_ownership_aspect.owners
]
)
# Check the second entry.
second_ownership_aspect = builder.get_aspect_if_available(
outputs[1].record, models.OwnershipClass
)
assert second_ownership_aspect
assert len(second_ownership_aspect.owners) == 3
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER
for owner in first_ownership_aspect.owners
]
)
# Verify that the third entry is unchanged.
assert inputs[2] == outputs[2].record
def test_simple_dataset_ownership_with_type_transformation(mock_time):
input = make_generic_dataset()
transformer = SimpleAddDatasetOwnership.create(
{
"owner_urns": [
builder.make_user_urn("person1"),
],
"ownership_type": "PRODUCER",
},
PipelineContext(run_id="test"),
)
output = list(transformer.transform([RecordEnvelope(input, metadata={})]))
assert len(output) == 1
ownership_aspect = builder.get_aspect_if_available(
output[0].record, models.OwnershipClass
)
assert ownership_aspect
assert len(ownership_aspect.owners) == 1
assert ownership_aspect.owners[0].type == models.OwnershipTypeClass.PRODUCER
def test_simple_dataset_ownership_with_invalid_type_transformation(mock_time):
with pytest.raises(ValueError):
SimpleAddDatasetOwnership.create(
{
"owner_urns": [
builder.make_user_urn("person1"),
],
"ownership_type": "INVALID_TYPE",
},
PipelineContext(run_id="test"),
)
def test_simple_remove_dataset_ownership():
with_owner_aspect = make_dataset_with_owner()
transformer = SimpleRemoveDatasetOwnership.create(
{},
PipelineContext(run_id="test"),
)
outputs = list(
transformer.transform([RecordEnvelope(with_owner_aspect, metadata={})])
)
ownership_aspect = builder.get_aspect_if_available(
outputs[0].record, models.OwnershipClass
)
assert ownership_aspect
assert len(ownership_aspect.owners) == 0
def test_mark_status_dataset():
dataset = make_generic_dataset()
transformer = MarkDatasetStatus.create(
{"removed": True},
PipelineContext(run_id="test"),
)
removed = list(transformer.transform([RecordEnvelope(dataset, metadata={})]))
status_aspect = builder.get_aspect_if_available(
removed[0].record, models.StatusClass
)
assert status_aspect
assert status_aspect.removed is True
transformer = MarkDatasetStatus.create(
{"removed": False},
PipelineContext(run_id="test"),
)
not_removed = list(transformer.transform([RecordEnvelope(dataset, metadata={})]))
status_aspect = builder.get_aspect_if_available(
not_removed[0].record, models.StatusClass
)
assert status_aspect
assert status_aspect.removed is False
def test_add_dataset_browse_paths():
dataset = make_generic_dataset()
transformer = AddDatasetBrowsePathTransformer.create(
{"path_templates": ["/abc"]},
PipelineContext(run_id="test"),
)
transformed = list(transformer.transform([RecordEnvelope(dataset, metadata={})]))
browse_path_aspect = builder.get_aspect_if_available(
transformed[0].record, models.BrowsePathsClass
)
assert browse_path_aspect
assert browse_path_aspect.paths == ["/abc"]
transformer = AddDatasetBrowsePathTransformer.create(
{
"path_templates": [
"/PLATFORM/foo/DATASET_PARTS/ENV",
"/ENV/PLATFORM/bar/DATASET_PARTS/",
]
},
PipelineContext(run_id="test"),
)
transformed = list(transformer.transform([RecordEnvelope(dataset, metadata={})]))
browse_path_aspect = builder.get_aspect_if_available(
transformed[0].record, models.BrowsePathsClass
)
assert browse_path_aspect
assert browse_path_aspect.paths == [
"/abc",
"/bigquery/foo/example1/prod",
"/prod/bigquery/bar/example1/",
]
transformer = AddDatasetBrowsePathTransformer.create(
{
"path_templates": [
"/xyz",
],
"replace_existing": True,
},
PipelineContext(run_id="test"),
)
transformed = list(transformer.transform([RecordEnvelope(dataset, metadata={})]))
browse_path_aspect = builder.get_aspect_if_available(
transformed[0].record, models.BrowsePathsClass
)
assert browse_path_aspect
assert browse_path_aspect.paths == [
"/xyz",
]
def test_simple_dataset_tags_transformation(mock_time):
dataset_mce = make_generic_dataset()
transformer = SimpleAddDatasetTags.create(
{
"tag_urns": [
builder.make_tag_urn("NeedsDocumentation"),
builder.make_tag_urn("Legacy"),
]
},
PipelineContext(run_id="test-tags"),
)
outputs = list(
transformer.transform(
[RecordEnvelope(input, metadata={}) for input in [dataset_mce]]
)
)
assert len(outputs) == 1
# Check that tags were added.
tags_aspect = builder.get_aspect_if_available(
outputs[0].record, models.GlobalTagsClass
)
assert tags_aspect
assert len(tags_aspect.tags) == 2
assert tags_aspect.tags[0].tag == builder.make_tag_urn("NeedsDocumentation")
def dummy_tag_resolver_method(dataset_snapshot):
return []
def test_import_resolver():
transformer = AddDatasetTags.create(
{
"get_tags_to_add": "tests.unit.test_transform_dataset.dummy_tag_resolver_method"
},
PipelineContext(run_id="test-tags"),
)
output = list(
transformer.transform(
[RecordEnvelope(input, metadata={}) for input in [make_generic_dataset()]]
)
)
assert output
def test_pattern_dataset_ownership_transformation(mock_time):
no_owner_aspect = make_generic_dataset()
with_owner_aspect = models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)",
aspects=[
models.OwnershipClass(
owners=[
models.OwnerClass(
owner=builder.make_user_urn("fake_owner"),
type=models.OwnershipTypeClass.DATAOWNER,
),
],
lastModified=models.AuditStampClass(
time=1625266033123, actor="urn:li:corpuser:datahub"
),
)
],
),
)
not_a_dataset = models.MetadataChangeEventClass(
proposedSnapshot=models.DataJobSnapshotClass(
urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)",
aspects=[
models.DataJobInfoClass(
name="User Deletions",
description="Constructs the fct_users_deleted from logging_events",
type=models.AzkabanJobTypeClass.SQL,
)
],
)
)
inputs = [
no_owner_aspect,
with_owner_aspect,
not_a_dataset,
]
transformer = PatternAddDatasetOwnership.create(
{
"owner_pattern": {
"rules": {
".*example1.*": [builder.make_user_urn("person1")],
".*example2.*": [builder.make_user_urn("person2")],
}
},
},
PipelineContext(run_id="test"),
)
outputs = list(
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
)
assert len(outputs) == len(inputs)
# Check the first entry.
first_ownership_aspect = builder.get_aspect_if_available(
outputs[0].record, models.OwnershipClass
)
assert first_ownership_aspect
assert len(first_ownership_aspect.owners) == 1
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER
for owner in first_ownership_aspect.owners
]
)
# Check the second entry.
second_ownership_aspect = builder.get_aspect_if_available(
outputs[1].record, models.OwnershipClass
)
assert second_ownership_aspect
assert len(second_ownership_aspect.owners) == 2
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER
for owner in first_ownership_aspect.owners
]
)
# Verify that the third entry is unchanged.
assert inputs[2] == outputs[2].record
def test_pattern_dataset_ownership_with_type_transformation(mock_time):
input = make_generic_dataset()
transformer = PatternAddDatasetOwnership.create(
{
"owner_pattern": {
"rules": {
".*example1.*": [builder.make_user_urn("person1")],
}
},
"ownership_type": "PRODUCER",
},
PipelineContext(run_id="test"),
)
output = list(transformer.transform([RecordEnvelope(input, metadata={})]))
assert len(output) == 1
ownership_aspect = builder.get_aspect_if_available(
output[0].record, models.OwnershipClass
)
assert ownership_aspect
assert len(ownership_aspect.owners) == 1
assert ownership_aspect.owners[0].type == models.OwnershipTypeClass.PRODUCER
def test_pattern_dataset_ownership_with_invalid_type_transformation(mock_time):
with pytest.raises(ValueError):
PatternAddDatasetOwnership.create(
{
"owner_pattern": {
"rules": {
".*example1.*": [builder.make_user_urn("person1")],
}
},
"ownership_type": "INVALID_TYPE",
},
PipelineContext(run_id="test"),
)
def gen_owners(
owners: List[str],
ownership_type: Union[
str, models.OwnershipTypeClass
] = models.OwnershipTypeClass.DATAOWNER,
) -> models.OwnershipClass:
return models.OwnershipClass(
owners=[models.OwnerClass(owner=owner, type=ownership_type) for owner in owners]
)
def test_ownership_patching_intersect(mock_time):
mock_graph = mock.MagicMock()
server_ownership = gen_owners(["foo", "bar"])
mce_ownership = gen_owners(["baz", "foo"])
mock_graph.get_ownership.return_value = server_ownership
test_ownership = AddDatasetOwnership.get_ownership_to_set(
mock_graph, "test_urn", mce_ownership
)
assert test_ownership and test_ownership.owners
assert "foo" in [o.owner for o in test_ownership.owners]
assert "bar" in [o.owner for o in test_ownership.owners]
assert "baz" in [o.owner for o in test_ownership.owners]
def test_ownership_patching_with_nones(mock_time):
mock_graph = mock.MagicMock()
mce_ownership = gen_owners(["baz", "foo"])
mock_graph.get_ownership.return_value = None
test_ownership = AddDatasetOwnership.get_ownership_to_set(
mock_graph, "test_urn", mce_ownership
)
assert test_ownership and test_ownership.owners
assert "foo" in [o.owner for o in test_ownership.owners]
assert "baz" in [o.owner for o in test_ownership.owners]
server_ownership = gen_owners(["baz", "foo"])
mock_graph.get_ownership.return_value = server_ownership
test_ownership = AddDatasetOwnership.get_ownership_to_set(
mock_graph, "test_urn", None
)
assert not test_ownership
def test_ownership_patching_with_empty_mce_none_server(mock_time):
mock_graph = mock.MagicMock()
mce_ownership = gen_owners([])
mock_graph.get_ownership.return_value = None
test_ownership = AddDatasetOwnership.get_ownership_to_set(
mock_graph, "test_urn", mce_ownership
)
# nothing to add, so we omit writing
assert test_ownership is None
def test_ownership_patching_with_empty_mce_nonempty_server(mock_time):
mock_graph = mock.MagicMock()
server_ownership = gen_owners(["baz", "foo"])
mce_ownership = gen_owners([])
mock_graph.get_ownership.return_value = server_ownership
test_ownership = AddDatasetOwnership.get_ownership_to_set(
mock_graph, "test_urn", mce_ownership
)
# nothing to add, so we omit writing
assert test_ownership is None
def test_ownership_patching_with_different_types_1(mock_time):
mock_graph = mock.MagicMock()
server_ownership = gen_owners(["baz", "foo"], models.OwnershipTypeClass.PRODUCER)
mce_ownership = gen_owners(["foo"], models.OwnershipTypeClass.DATAOWNER)
mock_graph.get_ownership.return_value = server_ownership
test_ownership = AddDatasetOwnership.get_ownership_to_set(
mock_graph, "test_urn", mce_ownership
)
assert test_ownership and test_ownership.owners
# nothing to add, so we omit writing
assert ("foo", models.OwnershipTypeClass.DATAOWNER) in [
(o.owner, o.type) for o in test_ownership.owners
]
assert ("baz", models.OwnershipTypeClass.PRODUCER) in [
(o.owner, o.type) for o in test_ownership.owners
]
def test_ownership_patching_with_different_types_2(mock_time):
mock_graph = mock.MagicMock()
server_ownership = gen_owners(["baz", "foo"], models.OwnershipTypeClass.PRODUCER)
mce_ownership = gen_owners(["foo", "baz"], models.OwnershipTypeClass.DATAOWNER)
mock_graph.get_ownership.return_value = server_ownership
test_ownership = AddDatasetOwnership.get_ownership_to_set(
mock_graph, "test_urn", mce_ownership
)
assert test_ownership and test_ownership.owners
assert len(test_ownership.owners) == 2
# nothing to add, so we omit writing
assert ("foo", models.OwnershipTypeClass.DATAOWNER) in [
(o.owner, o.type) for o in test_ownership.owners
]
assert ("baz", models.OwnershipTypeClass.DATAOWNER) in [
(o.owner, o.type) for o in test_ownership.owners
]
PROPERTIES_TO_ADD = {"my_new_property": "property value"}
class DummyPropertiesResolverClass(AddDatasetPropertiesResolverBase):
def get_properties_to_add(self, current: DatasetSnapshotClass) -> Dict[str, str]:
return PROPERTIES_TO_ADD
def test_add_dataset_properties(mock_time):
dataset_mce = make_dataset_with_properties()
transformer = AddDatasetProperties.create(
{
"add_properties_resolver_class": "tests.unit.test_transform_dataset.DummyPropertiesResolverClass"
},
PipelineContext(run_id="test-properties"),
)
outputs = list(
transformer.transform(
[RecordEnvelope(input, metadata={}) for input in [dataset_mce]]
)
)
assert len(outputs) == 1
custom_properties = builder.get_aspect_if_available(
outputs[0].record, models.DatasetPropertiesClass
)
assert custom_properties is not None
assert custom_properties.customProperties == {
**EXISTING_PROPERTIES,
**PROPERTIES_TO_ADD,
}