# datahub/metadata-ingestion/tests/unit/test_transform_dataset.py

import json
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, MutableSequence, Optional, Type, Union, cast
from unittest import mock
from uuid import uuid4
import pytest
import datahub.emitter.mce_builder as builder
import datahub.metadata.schema_classes as models
import tests.test_helpers.mce_helpers
from datahub.configuration.common import TransformerSemantics
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api import workunit
from datahub.ingestion.api.common import EndOfStream, PipelineContext, RecordEnvelope
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.transformer.add_dataset_browse_path import (
AddDatasetBrowsePathTransformer,
)
from datahub.ingestion.transformer.add_dataset_dataproduct import (
AddDatasetDataProduct,
PatternAddDatasetDataProduct,
SimpleAddDatasetDataProduct,
)
from datahub.ingestion.transformer.add_dataset_ownership import (
AddDatasetOwnership,
PatternAddDatasetOwnership,
SimpleAddDatasetOwnership,
)
from datahub.ingestion.transformer.add_dataset_properties import (
AddDatasetProperties,
AddDatasetPropertiesResolverBase,
SimpleAddDatasetProperties,
)
from datahub.ingestion.transformer.add_dataset_schema_tags import (
PatternAddDatasetSchemaTags,
)
from datahub.ingestion.transformer.add_dataset_schema_terms import (
PatternAddDatasetSchemaTerms,
)
from datahub.ingestion.transformer.add_dataset_tags import (
AddDatasetTags,
PatternAddDatasetTags,
SimpleAddDatasetTags,
)
from datahub.ingestion.transformer.add_dataset_terms import (
PatternAddDatasetTerms,
SimpleAddDatasetTerms,
)
from datahub.ingestion.transformer.base_transformer import (
BaseTransformer,
SingleAspectTransformer,
)
from datahub.ingestion.transformer.dataset_domain import (
PatternAddDatasetDomain,
SimpleAddDatasetDomain,
TransformerOnConflict,
)
from datahub.ingestion.transformer.dataset_domain_based_on_tags import (
DatasetTagDomainMapper,
)
from datahub.ingestion.transformer.dataset_transformer import (
ContainerTransformer,
DatasetTransformer,
TagTransformer,
)
from datahub.ingestion.transformer.extract_dataset_tags import ExtractDatasetTags
from datahub.ingestion.transformer.extract_ownership_from_tags import (
ExtractOwnersFromTagsTransformer,
)
from datahub.ingestion.transformer.mark_dataset_status import MarkDatasetStatus
from datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user import (
PatternCleanupDatasetUsageUser,
)
from datahub.ingestion.transformer.pattern_cleanup_ownership import (
PatternCleanUpOwnership,
)
from datahub.ingestion.transformer.remove_dataset_ownership import (
SimpleRemoveDatasetOwnership,
)
from datahub.ingestion.transformer.replace_external_url import (
ReplaceExternalUrlContainer,
ReplaceExternalUrlDataset,
)
from datahub.ingestion.transformer.tags_to_terms import TagsToTermMapper
from datahub.metadata.schema_classes import (
BrowsePathsClass,
DatasetPropertiesClass,
DatasetUserUsageCountsClass,
GlobalTagsClass,
MetadataChangeEventClass,
OwnershipClass,
OwnershipTypeClass,
StatusClass,
TagAssociationClass,
)
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.urn import Urn
def make_generic_dataset(
entity_urn: str = "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
aspects: Optional[List[Any]] = None,
) -> models.MetadataChangeEventClass:
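    """Build a bare-bones dataset MCE, defaulting to a single Status aspect."""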
if aspects is None:
# Default to a status aspect if none is provided.
aspects = [models.StatusClass(removed=False)]
return models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn=entity_urn,
aspects=aspects,
),
)
def make_generic_dataset_mcp(
entity_urn: str = "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
aspect: Any = models.StatusClass(removed=False),
) -> MetadataChangeProposalWrapper:
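    """Wrap a single aspect for the default test dataset in an MCP."""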
return MetadataChangeProposalWrapper(
entityUrn=entity_urn,
aspect=aspect,
)
def make_generic_container_mcp(
entity_urn: str = "urn:li:container:6338f55439c7ae58243a62c4d6fbffeee",
aspect_name: str = "status",
aspect: Any = None,
) -> MetadataChangeProposalWrapper:
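    """Wrap a single aspect for a test container in an MCP, defaulting to Status."""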
if aspect is None:
aspect = models.StatusClass(removed=False)
return MetadataChangeProposalWrapper(
entityUrn=entity_urn,
entityType=Urn.from_string(entity_urn).entity_type,
aspectName=aspect_name,
changeType="UPSERT",
aspect=aspect,
)
def create_and_run_test_pipeline(
events: List[Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]],
transformers: List[Dict[str, Any]],
path: str,
) -> str:
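    """Feed the given events through a mocked FakeSource, apply the transformers,
    and sink to a uniquely named JSON file whose path is returned."""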
with mock.patch(
"tests.unit.test_source.FakeSource.get_workunits"
) as mock_getworkunits:
mock_getworkunits.return_value = [
(
workunit.MetadataWorkUnit(
id=f"test-workunit-mce-{e.proposedSnapshot.urn}", mce=e
)
if isinstance(e, MetadataChangeEventClass)
else workunit.MetadataWorkUnit(
id=f"test-workunit-mcp-{e.entityUrn}-{e.aspectName}", mcp=e
)
)
for e in events
]
events_file = f"{path}/{str(uuid4())}.json"
pipeline = Pipeline.create(
config_dict={
"source": {
"type": "tests.unit.test_source.FakeSource",
"config": {},
},
"transformers": transformers,
"sink": {"type": "file", "config": {"filename": events_file}},
}
)
pipeline.run()
pipeline.raise_from_status()
return events_file
def make_dataset_with_owner() -> models.MetadataChangeEventClass:
return models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)",
aspects=[
models.OwnershipClass(
owners=[
models.OwnerClass(
owner=builder.make_user_urn("fake_owner"),
type=models.OwnershipTypeClass.DATAOWNER,
),
],
lastModified=models.AuditStampClass(
time=1625266033123, actor="urn:li:corpuser:datahub"
),
)
],
),
)
EXISTING_PROPERTIES = {"my_existing_property": "existing property value"}
def make_dataset_with_properties() -> models.MetadataChangeEventClass:
return models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
aspects=[
models.StatusClass(removed=False),
models.DatasetPropertiesClass(
customProperties=EXISTING_PROPERTIES.copy()
),
],
),
)
def test_dataset_ownership_transformation(mock_time):
no_owner_aspect = make_generic_dataset()
with_owner_aspect = make_dataset_with_owner()
not_a_dataset = models.MetadataChangeEventClass(
proposedSnapshot=models.DataJobSnapshotClass(
urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)",
aspects=[
models.DataJobInfoClass(
name="User Deletions",
description="Constructs the fct_users_deleted from logging_events",
type=models.AzkabanJobTypeClass.SQL,
)
],
)
)
inputs = [no_owner_aspect, with_owner_aspect, not_a_dataset, EndOfStream()]
transformer = SimpleAddDatasetOwnership.create(
{
"owner_urns": [
builder.make_user_urn("person1"),
builder.make_user_urn("person2"),
]
},
PipelineContext(run_id="test"),
)
outputs = list(
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
)
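    # Two extra records are emitted: ownership MCPs for the no-owner dataset and the datajob.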
assert len(outputs) == len(inputs) + 2
# Check the first entry.
first_ownership_aspect = builder.get_aspect_if_available(
outputs[0].record, models.OwnershipClass
)
assert first_ownership_aspect is None
last_event = outputs[3].record
assert isinstance(last_event, MetadataChangeProposalWrapper)
assert isinstance(last_event.aspect, OwnershipClass)
assert len(last_event.aspect.owners) == 2
assert last_event.entityUrn == outputs[0].record.proposedSnapshot.urn
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None
for owner in last_event.aspect.owners
]
)
# Check the second entry.
second_ownership_aspect = builder.get_aspect_if_available(
outputs[1].record, models.OwnershipClass
)
assert second_ownership_aspect
assert len(second_ownership_aspect.owners) == 3
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None
for owner in second_ownership_aspect.owners
]
)
third_ownership_aspect = outputs[4].record.aspect
assert third_ownership_aspect
assert len(third_ownership_aspect.owners) == 2
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None
            for owner in third_ownership_aspect.owners
]
)
# Verify that the third entry is unchanged.
assert inputs[2] == outputs[2].record
# Verify that the last entry is EndOfStream
assert inputs[-1] == outputs[-1].record
def test_simple_dataset_ownership_with_type_transformation(mock_time):
input = make_generic_dataset()
transformer = SimpleAddDatasetOwnership.create(
{
"owner_urns": [
builder.make_user_urn("person1"),
],
"ownership_type": "PRODUCER",
},
PipelineContext(run_id="test"),
)
output = list(
transformer.transform(
[
RecordEnvelope(input, metadata={}),
RecordEnvelope(EndOfStream(), metadata={}),
]
)
)
assert len(output) == 3
# original MCE is unchanged
assert input == output[0].record
ownership_aspect = output[1].record.aspect
assert isinstance(ownership_aspect, OwnershipClass)
assert len(ownership_aspect.owners) == 1
assert ownership_aspect.owners[0].type == models.OwnershipTypeClass.PRODUCER
def test_simple_dataset_ownership_with_type_urn_transformation(mock_time):
input = make_generic_dataset()
transformer = SimpleAddDatasetOwnership.create(
{
"owner_urns": [
builder.make_user_urn("person1"),
],
"ownership_type": "urn:li:ownershipType:__system__technical_owner",
},
PipelineContext(run_id="test"),
)
output = list(
transformer.transform(
[
RecordEnvelope(input, metadata={}),
RecordEnvelope(EndOfStream(), metadata={}),
]
)
)
assert len(output) == 3
# original MCE is unchanged
assert input == output[0].record
ownership_aspect = output[1].record.aspect
assert isinstance(ownership_aspect, OwnershipClass)
assert len(ownership_aspect.owners) == 1
assert ownership_aspect.owners[0].type == OwnershipTypeClass.CUSTOM
assert (
ownership_aspect.owners[0].typeUrn
== "urn:li:ownershipType:__system__technical_owner"
)
def _test_extract_tags(in_urn: str, regex_str: str, out_tag: str) -> None:
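    """Run ExtractDatasetTags with the given regex over a dataset urn and check the single extracted tag."""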
input = make_generic_dataset(entity_urn=in_urn)
transformer = ExtractDatasetTags.create(
{
"extract_tags_from": "urn",
"extract_tags_regex": regex_str,
"semantics": "overwrite",
},
PipelineContext(run_id="test"),
)
output = list(
transformer.transform(
[
RecordEnvelope(input, metadata={}),
RecordEnvelope(EndOfStream(), metadata={}),
]
)
)
assert len(output) == 3
assert output[0].record == input
tags_aspect = output[1].record.aspect
assert isinstance(tags_aspect, GlobalTagsClass)
assert len(tags_aspect.tags) == 1
assert tags_aspect.tags[0].tag == out_tag
def test_extract_dataset_tags(mock_time):
_test_extract_tags(
in_urn="urn:li:dataset:(urn:li:dataPlatform:kafka,clusterid.part1-part2-part3_part4,PROD)",
regex_str="(.*)",
out_tag="urn:li:tag:clusterid.part1-part2-part3_part4",
)
_test_extract_tags(
in_urn="urn:li:dataset:(urn:li:dataPlatform:kafka,clusterid.USA-ops-team_table1,PROD)",
regex_str=".([^._]*)_",
out_tag="urn:li:tag:USA-ops-team",
)
_test_extract_tags(
in_urn="urn:li:dataset:(urn:li:dataPlatform:kafka,clusterid.Canada-marketing_table1,PROD)",
regex_str=".([^._]*)_",
out_tag="urn:li:tag:Canada-marketing",
)
_test_extract_tags(
in_urn="urn:li:dataset:(urn:li:dataPlatform:elasticsearch,abcdef-prefix_datahub_usage_event-000027,PROD)",
regex_str="([^._]*)_",
out_tag="urn:li:tag:abcdef-prefix",
)
def test_simple_dataset_ownership_with_invalid_type_transformation(mock_time):
with pytest.raises(ValueError):
SimpleAddDatasetOwnership.create(
{
"owner_urns": [
builder.make_user_urn("person1"),
],
"ownership_type": "INVALID_TYPE",
},
PipelineContext(run_id="test"),
)
def test_simple_remove_dataset_ownership():
with_owner_aspect = make_dataset_with_owner()
transformer = SimpleRemoveDatasetOwnership.create(
{},
PipelineContext(run_id="test"),
)
outputs = list(
transformer.transform([RecordEnvelope(with_owner_aspect, metadata={})])
)
ownership_aspect = builder.get_aspect_if_available(
outputs[0].record, models.OwnershipClass
)
assert ownership_aspect
assert len(ownership_aspect.owners) == 0
def test_mark_status_dataset(tmp_path):
dataset = make_generic_dataset()
transformer = MarkDatasetStatus.create(
{"removed": True},
PipelineContext(run_id="test"),
)
removed = list(
transformer.transform(
[
RecordEnvelope(dataset, metadata={}),
]
)
)
assert len(removed) == 1
status_aspect = builder.get_aspect_if_available(
removed[0].record, models.StatusClass
)
assert status_aspect
assert status_aspect.removed is True
transformer = MarkDatasetStatus.create(
{"removed": False},
PipelineContext(run_id="test"),
)
not_removed = list(
transformer.transform(
[
RecordEnvelope(dataset, metadata={}),
]
)
)
assert len(not_removed) == 1
status_aspect = builder.get_aspect_if_available(
not_removed[0].record, models.StatusClass
)
assert status_aspect
assert status_aspect.removed is False
mcp = make_generic_dataset_mcp(
aspect=DatasetPropertiesClass(description="Test dataset"),
)
events_file = create_and_run_test_pipeline(
events=[mcp],
transformers=[{"type": "mark_dataset_status", "config": {"removed": True}}],
path=tmp_path,
)
# assert dataset properties aspect was preserved
assert (
tests.test_helpers.mce_helpers.assert_for_each_entity(
entity_type="dataset",
aspect_name="datasetProperties",
aspect_field_matcher={"description": "Test dataset"},
file=events_file,
)
== 1
)
# assert Status aspect was generated
assert (
tests.test_helpers.mce_helpers.assert_for_each_entity(
entity_type="dataset",
aspect_name="status",
aspect_field_matcher={"removed": True},
file=events_file,
)
== 1
)
# MCE only
test_aspect = DatasetPropertiesClass(description="Test dataset")
events_file = create_and_run_test_pipeline(
events=[make_generic_dataset(aspects=[test_aspect])],
transformers=[{"type": "mark_dataset_status", "config": {"removed": True}}],
path=tmp_path,
)
# assert dataset properties aspect was preserved
assert (
tests.test_helpers.mce_helpers.assert_entity_mce_aspect(
entity_urn=mcp.entityUrn or "",
aspect=test_aspect,
aspect_type=DatasetPropertiesClass,
file=events_file,
)
== 1
)
# assert Status aspect was generated
assert (
tests.test_helpers.mce_helpers.assert_for_each_entity(
entity_type="dataset",
aspect_name="status",
aspect_field_matcher={"removed": True},
file=events_file,
)
== 1
)
# MCE (non-matching) + MCP (matching)
test_aspect = DatasetPropertiesClass(description="Test dataset")
events_file = create_and_run_test_pipeline(
events=[
make_generic_dataset(aspects=[test_aspect]),
make_generic_dataset_mcp(),
],
transformers=[{"type": "mark_dataset_status", "config": {"removed": True}}],
path=tmp_path,
)
# assert dataset properties aspect was preserved
assert (
tests.test_helpers.mce_helpers.assert_entity_mce_aspect(
entity_urn=mcp.entityUrn or "",
aspect=test_aspect,
aspect_type=DatasetPropertiesClass,
file=events_file,
)
== 1
)
# assert Status aspect was generated
assert (
tests.test_helpers.mce_helpers.assert_for_each_entity(
entity_type="dataset",
aspect_name="status",
aspect_field_matcher={"removed": True},
file=events_file,
)
== 1
)
# MCE (matching) + MCP (non-matching)
test_status_aspect = StatusClass(removed=False)
events_file = create_and_run_test_pipeline(
events=[
make_generic_dataset(aspects=[test_status_aspect]),
make_generic_dataset_mcp(
aspect=DatasetPropertiesClass(description="test dataset"),
),
],
transformers=[{"type": "mark_dataset_status", "config": {"removed": True}}],
path=tmp_path,
)
# assert MCE was transformed
assert (
tests.test_helpers.mce_helpers.assert_entity_mce_aspect(
entity_urn=mcp.entityUrn or "",
aspect=StatusClass(removed=True),
aspect_type=StatusClass,
file=events_file,
)
== 1
)
# assert MCP aspect was preserved
assert (
tests.test_helpers.mce_helpers.assert_for_each_entity(
entity_type="dataset",
aspect_name="datasetProperties",
aspect_field_matcher={"description": "test dataset"},
file=events_file,
)
== 1
)
# MCE (non-matching) + MCP (non-matching)
test_mcp_aspect = GlobalTagsClass(tags=[TagAssociationClass(tag="urn:li:tag:test")])
test_dataset_props_aspect = DatasetPropertiesClass(description="Test dataset")
events_file = create_and_run_test_pipeline(
events=[
make_generic_dataset(aspects=[test_dataset_props_aspect]),
make_generic_dataset_mcp(aspect=test_mcp_aspect),
],
transformers=[{"type": "mark_dataset_status", "config": {"removed": True}}],
path=tmp_path,
)
# assert MCE was preserved
assert (
tests.test_helpers.mce_helpers.assert_entity_mce_aspect(
entity_urn=mcp.entityUrn or "",
aspect=test_dataset_props_aspect,
aspect_type=DatasetPropertiesClass,
file=events_file,
)
== 1
)
# assert MCP aspect was preserved
assert (
tests.test_helpers.mce_helpers.assert_for_each_entity(
entity_type="dataset",
aspect_name="globalTags",
aspect_field_matcher={"tags": [{"tag": "urn:li:tag:test"}]},
file=events_file,
)
== 1
)
# assert MCP Status aspect was generated
assert (
tests.test_helpers.mce_helpers.assert_for_each_entity(
entity_type="dataset",
aspect_name="status",
aspect_field_matcher={"removed": True},
file=events_file,
)
== 1
)
def test_extract_owners_from_tags():
def _test_owner(
tag: str,
config: Dict,
expected_owner: str,
expected_owner_type: Optional[str] = None,
expected_owner_type_urn: Optional[str] = None,
) -> None:
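        """Apply ExtractOwnersFromTagsTransformer to a dataset carrying `tag` and verify the emitted owner."""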
dataset = make_generic_dataset(
aspects=[
models.GlobalTagsClass(
tags=[TagAssociationClass(tag=builder.make_tag_urn(tag))]
)
]
)
transformer = ExtractOwnersFromTagsTransformer.create(
config,
PipelineContext(run_id="test"),
)
record_envelops: List[RecordEnvelope] = list(
transformer.transform(
[
RecordEnvelope(dataset, metadata={}),
RecordEnvelope(record=EndOfStream(), metadata={}),
]
)
)
assert len(record_envelops) == 3
mcp: MetadataChangeProposalWrapper = record_envelops[1].record
owners_aspect = cast(OwnershipClass, mcp.aspect)
owners = owners_aspect.owners
owner = owners[0]
assert expected_owner_type is not None
assert owner.type == expected_owner_type
assert owner.owner == expected_owner
assert owner.typeUrn == expected_owner_type_urn
_test_owner(
tag="owner:foo",
config={
"tag_prefix": "owner:",
},
expected_owner="urn:li:corpuser:foo",
expected_owner_type=OwnershipTypeClass.TECHNICAL_OWNER,
)
_test_owner(
tag="abcdef-owner:foo",
config={
"tag_prefix": ".*owner:",
},
expected_owner="urn:li:corpuser:foo",
expected_owner_type=OwnershipTypeClass.TECHNICAL_OWNER,
)
_test_owner(
tag="owner:foo",
config={
"tag_prefix": "owner:",
"is_user": False,
},
expected_owner="urn:li:corpGroup:foo",
expected_owner_type=OwnershipTypeClass.TECHNICAL_OWNER,
)
_test_owner(
tag="owner:foo",
config={
"tag_prefix": "owner:",
"email_domain": "example.com",
},
expected_owner="urn:li:corpuser:foo@example.com",
expected_owner_type=OwnershipTypeClass.TECHNICAL_OWNER,
)
_test_owner(
tag="owner:foo",
config={
"tag_prefix": "owner:",
"email_domain": "example.com",
"owner_type": "TECHNICAL_OWNER",
},
expected_owner="urn:li:corpuser:foo@example.com",
expected_owner_type=OwnershipTypeClass.TECHNICAL_OWNER,
)
_test_owner(
tag="owner:foo",
config={
"tag_prefix": "owner:",
"email_domain": "example.com",
"owner_type": "AUTHOR",
"owner_type_urn": "urn:li:ownershipType:ad8557d6-dcb9-4d2a-83fc-b7d0d54f3e0f",
},
expected_owner="urn:li:corpuser:foo@example.com",
expected_owner_type=OwnershipTypeClass.CUSTOM,
expected_owner_type_urn="urn:li:ownershipType:ad8557d6-dcb9-4d2a-83fc-b7d0d54f3e0f",
)
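    # The expected output implies multi-character mappings ("__", "--") are applied
    # before the single-character ones, so "abc--xyz-email_com" maps to "abc-xyz@email.com".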
_test_owner(
tag="data__producer__owner__email:abc--xyz-email_com",
config={
"tag_pattern": "(.*)_owner_email:",
"tag_character_mapping": {
"_": ".",
"-": "@",
"__": "_",
"--": "-",
},
"extract_owner_type_from_tag_pattern": True,
},
expected_owner="urn:li:corpuser:abc-xyz@email.com",
expected_owner_type=OwnershipTypeClass.CUSTOM,
expected_owner_type_urn="urn:li:ownershipType:data_producer",
)
def test_add_dataset_browse_paths():
dataset = make_generic_dataset()
transformer = AddDatasetBrowsePathTransformer.create(
{"path_templates": ["/abc"]},
PipelineContext(run_id="test"),
)
transformed = list(
transformer.transform(
[
RecordEnvelope(dataset, metadata={}),
RecordEnvelope(EndOfStream(), metadata={}),
]
)
)
browse_path_aspect = transformed[1].record.aspect
assert browse_path_aspect
assert browse_path_aspect.paths == ["/abc"]
# use an mce with a pre-existing browse path
dataset_mce = make_generic_dataset(
aspects=[StatusClass(removed=False), browse_path_aspect]
)
transformer = AddDatasetBrowsePathTransformer.create(
{
"path_templates": [
"/PLATFORM/foo/DATASET_PARTS/ENV",
"/ENV/PLATFORM/bar/DATASET_PARTS/",
]
},
PipelineContext(run_id="test"),
)
transformed = list(
transformer.transform(
[
RecordEnvelope(dataset_mce, metadata={}),
RecordEnvelope(EndOfStream(), metadata={}),
]
)
)
assert len(transformed) == 2
browse_path_aspect = builder.get_aspect_if_available(
transformed[0].record, BrowsePathsClass
)
assert browse_path_aspect
assert browse_path_aspect.paths == [
"/abc",
"/bigquery/foo/example1/prod",
"/prod/bigquery/bar/example1/",
]
transformer = AddDatasetBrowsePathTransformer.create(
{
"path_templates": [
"/xyz",
],
"replace_existing": True,
},
PipelineContext(run_id="test"),
)
transformed = list(
transformer.transform(
[
RecordEnvelope(dataset_mce, metadata={}),
RecordEnvelope(EndOfStream(), metadata={}),
]
)
)
assert len(transformed) == 2
browse_path_aspect = builder.get_aspect_if_available(
transformed[0].record, BrowsePathsClass
)
assert browse_path_aspect
assert browse_path_aspect.paths == [
"/xyz",
]
def test_simple_dataset_tags_transformation(mock_time):
dataset_mce = make_generic_dataset()
transformer = SimpleAddDatasetTags.create(
{
"tag_urns": [
builder.make_tag_urn("NeedsDocumentation"),
builder.make_tag_urn("Legacy"),
]
},
PipelineContext(run_id="test-tags"),
)
outputs = list(
transformer.transform(
[
RecordEnvelope(input, metadata={})
for input in [dataset_mce, EndOfStream()]
]
)
)
assert len(outputs) == 5
# Check that tags were added.
    tags_aspect = outputs[1].record.aspect
    assert tags_aspect
    assert len(tags_aspect.tags) == 2
    assert tags_aspect.tags[0].tag == builder.make_tag_urn("NeedsDocumentation")
# Check new tag entity should be there
assert outputs[2].record.aspectName == "tagKey"
assert outputs[2].record.aspect.name == "NeedsDocumentation"
assert outputs[2].record.entityUrn == builder.make_tag_urn("NeedsDocumentation")
assert outputs[3].record.aspectName == "tagKey"
assert outputs[3].record.aspect.name == "Legacy"
assert outputs[3].record.entityUrn == builder.make_tag_urn("Legacy")
assert isinstance(outputs[4].record, EndOfStream)
def dummy_tag_resolver_method(dataset_snapshot):
return []
def test_pattern_dataset_tags_transformation(mock_time):
dataset_mce = make_generic_dataset()
transformer = PatternAddDatasetTags.create(
{
"tag_pattern": {
"rules": {
".*example1.*": [
builder.make_tag_urn("Private"),
builder.make_tag_urn("Legacy"),
],
                    ".*example2.*": [builder.make_tag_urn("Needs Documentation")],
}
},
},
PipelineContext(run_id="test-tags"),
)
outputs = list(
transformer.transform(
[
RecordEnvelope(input, metadata={})
for input in [dataset_mce, EndOfStream()]
]
)
)
assert len(outputs) == 5
tags_aspect = outputs[1].record.aspect
assert tags_aspect
assert len(tags_aspect.tags) == 2
assert tags_aspect.tags[0].tag == builder.make_tag_urn("Private")
    assert builder.make_tag_urn("Needs Documentation") not in [
        tag.tag for tag in tags_aspect.tags
    ]
def test_add_dataset_tags_transformation():
transformer = AddDatasetTags.create(
{
"get_tags_to_add": "tests.unit.test_transform_dataset.dummy_tag_resolver_method"
},
PipelineContext(run_id="test-tags"),
)
output = list(
transformer.transform(
[RecordEnvelope(input, metadata={}) for input in [make_generic_dataset()]]
)
)
assert output
def test_pattern_dataset_ownership_transformation(mock_time):
no_owner_aspect = make_generic_dataset()
with_owner_aspect = models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)",
aspects=[
models.OwnershipClass(
owners=[
models.OwnerClass(
owner=builder.make_user_urn("fake_owner"),
type=models.OwnershipTypeClass.DATAOWNER,
),
],
lastModified=models.AuditStampClass(
time=1625266033123, actor="urn:li:corpuser:datahub"
),
)
],
),
)
not_a_dataset = models.MetadataChangeEventClass(
proposedSnapshot=models.DataJobSnapshotClass(
urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)",
aspects=[
models.DataJobInfoClass(
name="User Deletions",
description="Constructs the fct_users_deleted from logging_events",
type=models.AzkabanJobTypeClass.SQL,
)
],
)
)
inputs = [no_owner_aspect, with_owner_aspect, not_a_dataset, EndOfStream()]
transformer = PatternAddDatasetOwnership.create(
{
"owner_pattern": {
"rules": {
".*example1.*": [builder.make_user_urn("person1")],
".*example2.*": [builder.make_user_urn("person2")],
".*dag_abc.*": [builder.make_user_urn("person2")],
}
},
"ownership_type": "DATAOWNER",
},
PipelineContext(run_id="test"),
)
outputs = list(
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
)
assert (
len(outputs) == len(inputs) + 2
    )  # additional MCPs for the no-owner MCE and the datajob
# Check the first entry.
assert inputs[0] == outputs[0].record
first_ownership_aspect = outputs[3].record.aspect
assert first_ownership_aspect
assert len(first_ownership_aspect.owners) == 1
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER
for owner in first_ownership_aspect.owners
]
)
# Check the second entry.
second_ownership_aspect = builder.get_aspect_if_available(
outputs[1].record, models.OwnershipClass
)
assert second_ownership_aspect
assert len(second_ownership_aspect.owners) == 2
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER
for owner in second_ownership_aspect.owners
]
)
third_ownership_aspect = outputs[4].record.aspect
assert third_ownership_aspect
assert len(third_ownership_aspect.owners) == 1
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER
for owner in third_ownership_aspect.owners
]
)
# Verify that the third entry is unchanged.
assert inputs[2] == outputs[2].record
# Verify that the last entry is unchanged (EOS)
assert inputs[-1] == outputs[-1].record
def test_pattern_dataset_ownership_with_type_transformation(mock_time):
input = make_generic_dataset()
transformer = PatternAddDatasetOwnership.create(
{
"owner_pattern": {
"rules": {
".*example1.*": [builder.make_user_urn("person1")],
}
},
"ownership_type": "PRODUCER",
},
PipelineContext(run_id="test"),
)
output = list(
transformer.transform(
[
RecordEnvelope(input, metadata={}),
RecordEnvelope(EndOfStream(), metadata={}),
]
)
)
assert len(output) == 3
ownership_aspect = output[1].record.aspect
assert ownership_aspect
assert len(ownership_aspect.owners) == 1
assert ownership_aspect.owners[0].type == models.OwnershipTypeClass.PRODUCER
def test_pattern_dataset_ownership_with_invalid_type_transformation(mock_time):
with pytest.raises(ValueError):
PatternAddDatasetOwnership.create(
{
"owner_pattern": {
"rules": {
".*example1.*": [builder.make_user_urn("person1")],
}
},
"ownership_type": "INVALID_TYPE",
},
PipelineContext(run_id="test"),
)
def test_pattern_container_and_dataset_ownership_transformation(
mock_time, mock_datahub_graph_instance
):
def fake_get_aspect(
entity_urn: str,
aspect_type: Type[models.BrowsePathsV2Class],
version: int = 0,
) -> Optional[models.BrowsePathsV2Class]:
return models.BrowsePathsV2Class(
path=[
models.BrowsePathEntryClass(
id="container_1", urn="urn:li:container:container_1"
),
models.BrowsePathEntryClass(
id="container_2", urn="urn:li:container:container_2"
),
]
)
pipeline_context = PipelineContext(
run_id="test_pattern_container_and_dataset_ownership_transformation"
)
pipeline_context.graph = mock_datahub_graph_instance
pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore
# No owner aspect for the first dataset
no_owner_aspect_dataset = models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
aspects=[models.StatusClass(removed=False)],
),
)
# Dataset with an existing owner
with_owner_aspect_dataset = models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)",
aspects=[
models.OwnershipClass(
owners=[
models.OwnerClass(
owner=builder.make_user_urn("fake_owner"),
type=models.OwnershipTypeClass.DATAOWNER,
),
],
lastModified=models.AuditStampClass(
time=1625266033123, actor="urn:li:corpuser:datahub"
),
)
],
),
)
datajob = models.MetadataChangeEventClass(
proposedSnapshot=models.DataJobSnapshotClass(
urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)",
aspects=[
models.DataJobInfoClass(
name="User Deletions",
description="Constructs the fct_users_deleted from logging_events",
type=models.AzkabanJobTypeClass.SQL,
)
],
)
)
inputs = [
no_owner_aspect_dataset,
with_owner_aspect_dataset,
datajob,
EndOfStream(),
]
# Initialize the transformer with container support
transformer = PatternAddDatasetOwnership.create(
{
"owner_pattern": {
"rules": {
".*example1.*": [builder.make_user_urn("person1")],
".*example2.*": [builder.make_user_urn("person2")],
".*dag_abc.*": [builder.make_user_urn("person3")],
}
},
"ownership_type": "DATAOWNER",
"is_container": True, # Enable container ownership handling
},
pipeline_context,
)
outputs = list(
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
)
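    # +4: ownership MCPs for the no-owner dataset and the datajob, plus patch MCPs for the two containers.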
assert len(outputs) == len(inputs) + 4
# Check that DatasetSnapshotClass has not changed
assert inputs[0] == outputs[0].record
# Check the ownership for the first dataset (example1)
first_ownership_aspect = outputs[3].record.aspect
assert first_ownership_aspect
assert len(first_ownership_aspect.owners) == 1
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER
for owner in first_ownership_aspect.owners
]
)
# Check the ownership for the second dataset (example2)
second_ownership_aspect = builder.get_aspect_if_available(
outputs[1].record, models.OwnershipClass
)
assert second_ownership_aspect
assert len(second_ownership_aspect.owners) == 2 # One existing + one new
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER
for owner in second_ownership_aspect.owners
]
)
third_ownership_aspect = outputs[4].record.aspect
assert third_ownership_aspect
assert len(third_ownership_aspect.owners) == 1 # new for datajob
# Check container ownerships
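    # The container ownership MCPs carry a serialized payload (bytes in `.value`),
    # so decode the JSON to inspect the patch entries' owner urns.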
for i in range(2):
container_ownership_aspect = outputs[i + 5].record.aspect
assert container_ownership_aspect
ownership = json.loads(container_ownership_aspect.value.decode("utf-8"))
assert len(ownership) == 3
assert ownership[0]["value"]["owner"] == builder.make_user_urn("person1")
assert ownership[1]["value"]["owner"] == builder.make_user_urn("person2")
# Verify that the third input (not a dataset) is unchanged
assert inputs[2] == outputs[2].record
def test_pattern_container_and_dataset_ownership_with_no_container(
mock_time, mock_datahub_graph_instance
):
def fake_get_aspect(
entity_urn: str,
aspect_type: Type[models.BrowsePathsV2Class],
version: int = 0,
) -> Optional[models.BrowsePathsV2Class]:
return None
pipeline_context = PipelineContext(
run_id="test_pattern_container_and_dataset_ownership_with_no_container"
)
pipeline_context.graph = mock_datahub_graph_instance
pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore
# No owner aspect for the first dataset
no_owner_aspect = models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
aspects=[
models.StatusClass(removed=False),
models.BrowsePathsV2Class(
path=[
models.BrowsePathEntryClass(
id="container_1", urn="urn:li:container:container_1"
),
models.BrowsePathEntryClass(
id="container_2", urn="urn:li:container:container_2"
),
]
),
],
),
)
# Dataset with an existing owner
with_owner_aspect = models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)",
aspects=[
models.OwnershipClass(
owners=[
models.OwnerClass(
owner=builder.make_user_urn("fake_owner"),
type=models.OwnershipTypeClass.DATAOWNER,
),
],
lastModified=models.AuditStampClass(
time=1625266033123, actor="urn:li:corpuser:datahub"
),
),
models.BrowsePathsV2Class(
path=[
models.BrowsePathEntryClass(
id="container_1", urn="urn:li:container:container_1"
),
models.BrowsePathEntryClass(
id="container_2", urn="urn:li:container:container_2"
),
]
),
],
),
)
inputs = [
no_owner_aspect,
with_owner_aspect,
EndOfStream(),
]
# Initialize the transformer with container support
transformer = PatternAddDatasetOwnership.create(
{
"owner_pattern": {
"rules": {
".*example1.*": [builder.make_user_urn("person1")],
".*example2.*": [builder.make_user_urn("person2")],
}
},
"ownership_type": "DATAOWNER",
"is_container": True, # Enable container ownership handling
},
pipeline_context,
)
outputs = list(
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
)
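    # +1: only the no-owner dataset needs a new ownership MCP; no containers were resolved from the graph.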
assert len(outputs) == len(inputs) + 1
# Check the ownership for the first dataset (example1)
first_ownership_aspect = outputs[2].record.aspect
assert first_ownership_aspect
assert len(first_ownership_aspect.owners) == 1
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER
for owner in first_ownership_aspect.owners
]
)
# Check the ownership for the second dataset (example2)
second_ownership_aspect = builder.get_aspect_if_available(
outputs[1].record, models.OwnershipClass
)
assert second_ownership_aspect
assert len(second_ownership_aspect.owners) == 2 # One existing + one new
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER
for owner in second_ownership_aspect.owners
]
)
def test_pattern_container_and_dataset_ownership_with_no_match(
mock_time, mock_datahub_graph_instance
):
def fake_get_aspect(
entity_urn: str,
aspect_type: Type[models.BrowsePathsV2Class],
version: int = 0,
) -> models.BrowsePathsV2Class:
return models.BrowsePathsV2Class(
path=[
models.BrowsePathEntryClass(
id="container_1", urn="urn:li:container:container_1"
)
]
)
pipeline_context = PipelineContext(
run_id="test_pattern_container_and_dataset_ownership_with_no_match"
)
pipeline_context.graph = mock_datahub_graph_instance
pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore
# No owner aspect for the first dataset
no_owner_aspect = models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
aspects=[
models.StatusClass(removed=False),
],
),
)
# Dataset with an existing owner
with_owner_aspect = models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)",
aspects=[
models.OwnershipClass(
owners=[
models.OwnerClass(
owner=builder.make_user_urn("fake_owner"),
type=models.OwnershipTypeClass.DATAOWNER,
),
],
lastModified=models.AuditStampClass(
time=1625266033123, actor="urn:li:corpuser:datahub"
),
)
],
),
)
inputs = [
no_owner_aspect,
with_owner_aspect,
EndOfStream(),
]
# Initialize the transformer with container support
transformer = PatternAddDatasetOwnership.create(
{
"owner_pattern": {
"rules": {
".*example3.*": [builder.make_user_urn("person1")],
".*example4.*": [builder.make_user_urn("person2")],
}
},
"ownership_type": "DATAOWNER",
"is_container": True, # Enable container ownership handling
},
pipeline_context,
)
outputs = list(
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
)
assert len(outputs) == len(inputs) + 1
# Check the ownership for the first dataset (example1)
first_ownership_aspect = outputs[2].record.aspect
assert first_ownership_aspect
    first_owner_urns = [owner.owner for owner in first_ownership_aspect.owners]
    assert builder.make_user_urn("person1") not in first_owner_urns
    assert builder.make_user_urn("person2") not in first_owner_urns
# Check the ownership for the second dataset (example2)
second_ownership_aspect = builder.get_aspect_if_available(
outputs[1].record, models.OwnershipClass
)
assert second_ownership_aspect
assert len(second_ownership_aspect.owners) == 1
    second_owner_urns = [owner.owner for owner in second_ownership_aspect.owners]
    assert builder.make_user_urn("person1") not in second_owner_urns
    assert builder.make_user_urn("person2") not in second_owner_urns
assert (
builder.make_user_urn("fake_owner") == second_ownership_aspect.owners[0].owner
)
def gen_owners(
owners: List[str],
ownership_type: Union[
str, models.OwnershipTypeClass
] = models.OwnershipTypeClass.DATAOWNER,
) -> models.OwnershipClass:
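    """Build an OwnershipClass assigning the given type (DATAOWNER by default) to each owner urn."""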
return models.OwnershipClass(
owners=[models.OwnerClass(owner=owner, type=ownership_type) for owner in owners]
)
def test_ownership_patching_intersect(mock_time):
mock_graph = mock.MagicMock()
server_ownership = gen_owners(["foo", "bar"])
mce_ownership = gen_owners(["baz", "foo"])
mock_graph.get_ownership.return_value = server_ownership
test_ownership = AddDatasetOwnership._merge_with_server_ownership(
mock_graph, "test_urn", mce_ownership
)
assert test_ownership and test_ownership.owners
assert "foo" in [o.owner for o in test_ownership.owners]
assert "bar" in [o.owner for o in test_ownership.owners]
assert "baz" in [o.owner for o in test_ownership.owners]
def test_ownership_patching_with_nones(mock_time):
mock_graph = mock.MagicMock()
mce_ownership = gen_owners(["baz", "foo"])
mock_graph.get_ownership.return_value = None
test_ownership = AddDatasetOwnership._merge_with_server_ownership(
mock_graph, "test_urn", mce_ownership
)
assert test_ownership and test_ownership.owners
assert "foo" in [o.owner for o in test_ownership.owners]
assert "baz" in [o.owner for o in test_ownership.owners]
server_ownership = gen_owners(["baz", "foo"])
mock_graph.get_ownership.return_value = server_ownership
test_ownership = AddDatasetOwnership._merge_with_server_ownership(
mock_graph, "test_urn", None
)
assert not test_ownership
def test_ownership_patching_with_empty_mce_none_server(mock_time):
mock_graph = mock.MagicMock()
mce_ownership = gen_owners([])
mock_graph.get_ownership.return_value = None
test_ownership = AddDatasetOwnership._merge_with_server_ownership(
mock_graph, "test_urn", mce_ownership
)
# nothing to add, so we omit writing
assert test_ownership is None
def test_ownership_patching_with_empty_mce_nonempty_server(mock_time):
mock_graph = mock.MagicMock()
server_ownership = gen_owners(["baz", "foo"])
mce_ownership = gen_owners([])
mock_graph.get_ownership.return_value = server_ownership
test_ownership = AddDatasetOwnership._merge_with_server_ownership(
mock_graph, "test_urn", mce_ownership
)
# nothing to add, so we omit writing
assert test_ownership is None
def test_ownership_patching_with_different_types_1(mock_time):
mock_graph = mock.MagicMock()
server_ownership = gen_owners(["baz", "foo"], models.OwnershipTypeClass.PRODUCER)
mce_ownership = gen_owners(["foo"], models.OwnershipTypeClass.DATAOWNER)
mock_graph.get_ownership.return_value = server_ownership
test_ownership = AddDatasetOwnership._merge_with_server_ownership(
mock_graph, "test_urn", mce_ownership
)
assert test_ownership and test_ownership.owners
# nothing to add, so we omit writing
assert ("foo", models.OwnershipTypeClass.DATAOWNER) in [
(o.owner, o.type) for o in test_ownership.owners
]
assert ("baz", models.OwnershipTypeClass.PRODUCER) in [
(o.owner, o.type) for o in test_ownership.owners
]
def test_ownership_patching_with_different_types_2(mock_time):
mock_graph = mock.MagicMock()
server_ownership = gen_owners(["baz", "foo"], models.OwnershipTypeClass.PRODUCER)
mce_ownership = gen_owners(["foo", "baz"], models.OwnershipTypeClass.DATAOWNER)
mock_graph.get_ownership.return_value = server_ownership
test_ownership = AddDatasetOwnership._merge_with_server_ownership(
mock_graph, "test_urn", mce_ownership
)
assert test_ownership and test_ownership.owners
assert len(test_ownership.owners) == 2
# nothing to add, so we omit writing
assert ("foo", models.OwnershipTypeClass.DATAOWNER) in [
(o.owner, o.type) for o in test_ownership.owners
]
assert ("baz", models.OwnershipTypeClass.DATAOWNER) in [
(o.owner, o.type) for o in test_ownership.owners
]
PROPERTIES_TO_ADD = {"my_new_property": "property value"}
class DummyPropertiesResolverClass(AddDatasetPropertiesResolverBase):
def get_properties_to_add(self, entity_urn: str) -> Dict[str, str]:
return PROPERTIES_TO_ADD
def test_add_dataset_properties(mock_time):
dataset_mce = make_dataset_with_properties()
transformer = AddDatasetProperties.create(
{
"add_properties_resolver_class": "tests.unit.test_transform_dataset.DummyPropertiesResolverClass"
},
PipelineContext(run_id="test-properties"),
)
outputs = list(
transformer.transform(
[RecordEnvelope(input, metadata={}) for input in [dataset_mce]]
)
)
assert len(outputs) == 1
custom_properties = builder.get_aspect_if_available(
outputs[0].record, models.DatasetPropertiesClass
)
assert custom_properties is not None
assert custom_properties.customProperties == {
**EXISTING_PROPERTIES,
**PROPERTIES_TO_ADD,
}
def run_simple_add_dataset_properties_transformer_semantics(
semantics: TransformerSemantics,
new_properties: dict,
server_properties: dict,
mock_datahub_graph_instance: DataHubGraph,
) -> List[RecordEnvelope]:
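    """Run SimpleAddDatasetProperties with the given semantics against a graph that
    fakes `server_properties` as the server-side datasetProperties."""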
pipeline_context = PipelineContext(run_id="test_pattern_dataset_schema_terms")
pipeline_context.graph = mock_datahub_graph_instance
# fake the server response
def fake_dataset_properties(entity_urn: str) -> models.DatasetPropertiesClass:
return DatasetPropertiesClass(customProperties=server_properties)
pipeline_context.graph.get_dataset_properties = fake_dataset_properties # type: ignore
output = run_dataset_transformer_pipeline(
transformer_type=SimpleAddDatasetProperties,
pipeline_context=pipeline_context,
aspect=models.DatasetPropertiesClass(
customProperties=EXISTING_PROPERTIES.copy()
),
config={
"semantics": semantics,
"properties": new_properties,
},
)
return output
def test_simple_add_dataset_properties_overwrite(mock_datahub_graph_instance):
new_properties = {"new-simple-property": "new-value"}
server_properties = {"p1": "value1"}
output = run_simple_add_dataset_properties_transformer_semantics(
semantics=TransformerSemantics.OVERWRITE,
new_properties=new_properties,
server_properties=server_properties,
mock_datahub_graph_instance=mock_datahub_graph_instance,
)
assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
custom_properties_aspect: models.DatasetPropertiesClass = cast(
models.DatasetPropertiesClass, output[0].record.aspect
)
assert custom_properties_aspect.customProperties == {
**EXISTING_PROPERTIES,
**new_properties,
}
def test_simple_add_dataset_properties_patch(mock_datahub_graph_instance):
new_properties = {"new-simple-property": "new-value"}
server_properties = {"p1": "value1"}
output = run_simple_add_dataset_properties_transformer_semantics(
semantics=TransformerSemantics.PATCH,
new_properties=new_properties,
server_properties=server_properties,
mock_datahub_graph_instance=mock_datahub_graph_instance,
)
assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
custom_properties_aspect: models.DatasetPropertiesClass = cast(
models.DatasetPropertiesClass, output[0].record.aspect
)
assert custom_properties_aspect.customProperties == {
**EXISTING_PROPERTIES,
**new_properties,
**server_properties,
}
def test_simple_add_dataset_properties(mock_time):
new_properties = {"new-simple-property": "new-value"}
outputs = run_dataset_transformer_pipeline(
transformer_type=SimpleAddDatasetProperties,
aspect=models.DatasetPropertiesClass(
customProperties=EXISTING_PROPERTIES.copy()
),
config={
"properties": new_properties,
},
)
assert len(outputs) == 2
assert outputs[0].record
assert outputs[0].record.aspect
custom_properties_aspect: models.DatasetPropertiesClass = cast(
models.DatasetPropertiesClass, outputs[0].record.aspect
)
assert custom_properties_aspect.customProperties == {
**EXISTING_PROPERTIES,
**new_properties,
}
def test_simple_add_dataset_properties_replace_existing(mock_time):
new_properties = {"new-simple-property": "new-value"}
outputs = run_dataset_transformer_pipeline(
transformer_type=SimpleAddDatasetProperties,
aspect=models.DatasetPropertiesClass(
customProperties=EXISTING_PROPERTIES.copy()
),
config={
"replace_existing": True,
"properties": new_properties,
},
)
assert len(outputs) == 2
assert outputs[0].record
assert outputs[0].record.aspect
custom_properties_aspect: models.DatasetPropertiesClass = cast(
models.DatasetPropertiesClass, outputs[0].record.aspect
)
assert custom_properties_aspect.customProperties == {
**new_properties,
}
def test_simple_dataset_terms_transformation(mock_time):
dataset_mce = make_generic_dataset()
transformer = SimpleAddDatasetTerms.create(
{
"term_urns": [
builder.make_term_urn("Test"),
builder.make_term_urn("Needs Review"),
]
},
PipelineContext(run_id="test-terms"),
)
outputs = list(
transformer.transform(
[
RecordEnvelope(input, metadata={})
for input in [dataset_mce, EndOfStream()]
]
)
)
assert len(outputs) == 3
# Check that glossary terms were added.
terms_aspect = outputs[1].record.aspect
assert terms_aspect
assert len(terms_aspect.terms) == 2
assert terms_aspect.terms[0].urn == builder.make_term_urn("Test")
def test_pattern_dataset_terms_transformation(mock_time):
dataset_mce = make_generic_dataset()
transformer = PatternAddDatasetTerms.create(
{
"term_pattern": {
"rules": {
".*example1.*": [
builder.make_term_urn("AccountBalance"),
builder.make_term_urn("Email"),
],
".*example2.*": [builder.make_term_urn("Address")],
}
},
},
PipelineContext(run_id="test-terms"),
)
outputs = list(
transformer.transform(
[
RecordEnvelope(input, metadata={})
for input in [dataset_mce, EndOfStream()]
]
)
)
assert len(outputs) == 3
# Check that glossary terms were added.
terms_aspect = outputs[1].record.aspect
assert terms_aspect
assert len(terms_aspect.terms) == 2
assert terms_aspect.terms[0].urn == builder.make_term_urn("AccountBalance")
assert builder.make_term_urn("AccountBalance") not in terms_aspect.terms
def test_mcp_add_tags_missing(mock_time):
dataset_mcp = make_generic_dataset_mcp()
transformer = SimpleAddDatasetTags.create(
{
"tag_urns": [
builder.make_tag_urn("NeedsDocumentation"),
builder.make_tag_urn("Legacy"),
]
},
PipelineContext(run_id="test-tags"),
)
input_stream: List[RecordEnvelope] = [
RecordEnvelope(input, metadata={}) for input in [dataset_mcp]
]
input_stream.append(RecordEnvelope(record=EndOfStream(), metadata={}))
outputs = list(transformer.transform(input_stream))
assert len(outputs) == 5
assert outputs[0].record == dataset_mcp
    # Check that tags were added; the new tags aspect is the second record
tags_aspect = outputs[1].record.aspect
assert tags_aspect
assert len(tags_aspect.tags) == 2
assert tags_aspect.tags[0].tag == builder.make_tag_urn("NeedsDocumentation")
assert isinstance(outputs[-1].record, EndOfStream)
def test_mcp_add_tags_existing(mock_time):
dataset_mcp = make_generic_dataset_mcp(
aspect=GlobalTagsClass(
tags=[TagAssociationClass(tag=builder.make_tag_urn("Test"))]
),
)
transformer = SimpleAddDatasetTags.create(
{
"tag_urns": [
builder.make_tag_urn("NeedsDocumentation"),
builder.make_tag_urn("Legacy"),
]
},
PipelineContext(run_id="test-tags"),
)
input_stream: List[RecordEnvelope] = [
RecordEnvelope(input, metadata={}) for input in [dataset_mcp]
]
input_stream.append(RecordEnvelope(record=EndOfStream(), metadata={}))
outputs = list(transformer.transform(input_stream))
assert len(outputs) == 4
    # Check that tags were added in place; the transformed MCP is the first record
tags_aspect = outputs[0].record.aspect
assert tags_aspect
assert len(tags_aspect.tags) == 3
assert tags_aspect.tags[0].tag == builder.make_tag_urn("Test")
assert tags_aspect.tags[1].tag == builder.make_tag_urn("NeedsDocumentation")
assert tags_aspect.tags[2].tag == builder.make_tag_urn("Legacy")
# Check tag entities got added
assert outputs[1].record.entityType == "tag"
assert outputs[1].record.entityUrn == builder.make_tag_urn("NeedsDocumentation")
assert outputs[2].record.entityType == "tag"
assert outputs[2].record.entityUrn == builder.make_tag_urn("Legacy")
assert isinstance(outputs[-1].record, EndOfStream)
def test_mcp_multiple_transformers(mock_time, tmp_path):
events_file = f"{tmp_path}/multi_transformer_test.json"
pipeline = Pipeline.create(
config_dict={
"source": {
"type": "tests.unit.test_source.FakeSource",
"config": {},
},
"transformers": [
{
"type": "set_dataset_browse_path",
"config": {
"path_templates": ["/ENV/PLATFORM/EsComments/DATASET_PARTS"]
},
},
{
"type": "simple_add_dataset_tags",
"config": {"tag_urns": ["urn:li:tag:EsComments"]},
},
],
"sink": {"type": "file", "config": {"filename": events_file}},
}
)
pipeline.run()
pipeline.raise_from_status()
urn_pattern = "^" + re.escape(
"urn:li:dataset:(urn:li:dataPlatform:elasticsearch,fooIndex,PROD)"
)
assert (
tests.test_helpers.mce_helpers.assert_mcp_entity_urn(
filter="ALL",
entity_type="dataset",
regex_pattern=urn_pattern,
file=events_file,
)
== 3
)
# check on status aspect
assert (
tests.test_helpers.mce_helpers.assert_for_each_entity(
entity_type="dataset",
aspect_name="status",
aspect_field_matcher={"removed": False},
file=events_file,
)
== 1
)
# check on globalTags aspect
assert (
tests.test_helpers.mce_helpers.assert_for_each_entity(
entity_type="dataset",
aspect_name="globalTags",
aspect_field_matcher={"tags": [{"tag": "urn:li:tag:EsComments"}]},
file=events_file,
)
== 1
)
# check on globalTags aspect
assert (
tests.test_helpers.mce_helpers.assert_for_each_entity(
entity_type="dataset",
aspect_name="browsePaths",
aspect_field_matcher={"paths": ["/prod/elasticsearch/EsComments/fooIndex"]},
file=events_file,
)
== 1
)
def test_mcp_multiple_transformers_replace(mock_time, tmp_path):
mcps: MutableSequence[
Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]
] = [
MetadataChangeProposalWrapper(
entityUrn=str(
DatasetUrn.create_from_ids(
platform_id="elasticsearch",
table_name=f"fooBarIndex{i}",
env="PROD",
)
),
aspect=GlobalTagsClass(tags=[TagAssociationClass(tag="urn:li:tag:Test")]),
)
for i in range(0, 10)
]
mcps.extend(
[
MetadataChangeProposalWrapper(
entityUrn=str(
DatasetUrn.create_from_ids(
platform_id="elasticsearch",
table_name=f"fooBarIndex{i}",
env="PROD",
)
),
aspect=DatasetPropertiesClass(description="test dataset"),
)
for i in range(0, 10)
]
)
# shuffle the mcps
import random
random.shuffle(mcps)
events_file = create_and_run_test_pipeline(
events=list(mcps),
transformers=[
{
"type": "set_dataset_browse_path",
"config": {
"path_templates": ["/ENV/PLATFORM/EsComments/DATASET_PARTS"]
},
},
{
"type": "simple_add_dataset_tags",
"config": {"tag_urns": ["urn:li:tag:EsComments"]},
},
],
path=tmp_path,
)
urn_pattern = "^" + re.escape(
"urn:li:dataset:(urn:li:dataPlatform:elasticsearch,fooBarIndex"
)
    # there should be 30 MCPs
assert (
tests.test_helpers.mce_helpers.assert_mcp_entity_urn(
filter="ALL",
entity_type="dataset",
regex_pattern=urn_pattern,
file=events_file,
)
== 30
)
# 10 globalTags aspects with new tag attached
assert (
tests.test_helpers.mce_helpers.assert_for_each_entity(
entity_type="dataset",
aspect_name="globalTags",
aspect_field_matcher={
"tags": [{"tag": "urn:li:tag:Test"}, {"tag": "urn:li:tag:EsComments"}]
},
file=events_file,
)
== 10
)
# check on browsePaths aspect
for i in range(0, 10):
assert (
tests.test_helpers.mce_helpers.assert_entity_mcp_aspect(
entity_urn=str(
DatasetUrn.create_from_ids(
platform_id="elasticsearch",
table_name=f"fooBarIndex{i}",
env="PROD",
)
),
aspect_name="browsePaths",
aspect_field_matcher={
"paths": [f"/prod/elasticsearch/EsComments/fooBarIndex{i}"]
},
file=events_file,
)
== 1
)
class SuppressingTransformer(BaseTransformer, SingleAspectTransformer):
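    """Test transformer that returns None from transform_aspect, suppressing the datasetProperties aspect."""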
@classmethod
def create(
cls, config_dict: dict, ctx: PipelineContext
) -> "SuppressingTransformer":
return SuppressingTransformer()
def entity_types(self) -> List[str]:
return super().entity_types()
def aspect_name(self) -> str:
return "datasetProperties"
def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[builder.Aspect]
) -> Optional[builder.Aspect]:
return None
def test_suppression_works():
    dataset_mce = make_generic_dataset()
    dataset_mcp = make_generic_dataset_mcp(
        aspect=DatasetPropertiesClass(description="suppressable description"),
    )
)
transformer = SuppressingTransformer.create(
{},
PipelineContext(run_id="test-suppress-transformer"),
)
outputs = list(
transformer.transform(
[
RecordEnvelope(input, metadata={})
for input in [dataset_mce, dataset_mcp, EndOfStream()]
]
)
)
assert len(outputs) == 2 # MCP will be dropped
def test_pattern_dataset_schema_terms_transformation(mock_time):
dataset_mce = make_generic_dataset(
aspects=[
models.SchemaMetadataClass(
schemaName="customer", # not used
platform=builder.make_data_platform_urn(
"hive"
), # important <- platform must be an urn
version=0,
# when the source system has a notion of versioning of schemas, insert this in, otherwise leave as 0
hash="",
# when the source system has a notion of unique schemas identified via hash, include a hash, else leave it as empty string
platformSchema=models.OtherSchemaClass(
rawSchema="__insert raw schema here__"
),
fields=[
models.SchemaFieldClass(
fieldPath="address",
type=models.SchemaFieldDataTypeClass(
type=models.StringTypeClass()
),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
models.SchemaFieldClass(
fieldPath="first_name",
type=models.SchemaFieldDataTypeClass(
type=models.StringTypeClass()
),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
models.SchemaFieldClass(
fieldPath="last_name",
type=models.SchemaFieldDataTypeClass(
type=models.StringTypeClass()
),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
],
)
]
)
transformer = PatternAddDatasetSchemaTerms.create(
{
"term_pattern": {
"rules": {
".*first_name.*": [
builder.make_term_urn("Name"),
builder.make_term_urn("FirstName"),
],
".*last_name.*": [
builder.make_term_urn("Name"),
builder.make_term_urn("LastName"),
],
}
},
},
PipelineContext(run_id="test-schema-terms"),
)
outputs = list(
transformer.transform(
[
RecordEnvelope(input, metadata={})
for input in [dataset_mce, EndOfStream()]
]
)
)
assert len(outputs) == 2
# Check that glossary terms were added.
schema_aspect = outputs[0].record.proposedSnapshot.aspects[0]
assert schema_aspect
assert schema_aspect.fields[0].fieldPath == "address"
assert schema_aspect.fields[0].glossaryTerms is None
assert schema_aspect.fields[1].fieldPath == "first_name"
assert schema_aspect.fields[1].glossaryTerms.terms[0].urn == builder.make_term_urn(
"Name"
)
assert schema_aspect.fields[1].glossaryTerms.terms[1].urn == builder.make_term_urn(
"FirstName"
)
assert schema_aspect.fields[2].fieldPath == "last_name"
assert schema_aspect.fields[2].glossaryTerms.terms[0].urn == builder.make_term_urn(
"Name"
)
assert schema_aspect.fields[2].glossaryTerms.terms[1].urn == builder.make_term_urn(
"LastName"
)
def test_pattern_dataset_schema_tags_transformation(mock_time):
dataset_mce = make_generic_dataset(
aspects=[
models.SchemaMetadataClass(
schemaName="customer", # not used
platform=builder.make_data_platform_urn(
"hive"
), # important <- platform must be an urn
version=0,
# when the source system has a notion of versioning of schemas, insert this in, otherwise leave as 0
hash="",
# when the source system has a notion of unique schemas identified via hash, include a hash, else leave it as empty string
platformSchema=models.OtherSchemaClass(
rawSchema="__insert raw schema here__"
),
fields=[
models.SchemaFieldClass(
fieldPath="address",
type=models.SchemaFieldDataTypeClass(
type=models.StringTypeClass()
),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
models.SchemaFieldClass(
fieldPath="first_name",
type=models.SchemaFieldDataTypeClass(
type=models.StringTypeClass()
),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
models.SchemaFieldClass(
fieldPath="last_name",
type=models.SchemaFieldDataTypeClass(
type=models.StringTypeClass()
),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
],
)
]
)
transformer = PatternAddDatasetSchemaTags.create(
{
"tag_pattern": {
"rules": {
".*first_name.*": [
builder.make_tag_urn("Name"),
builder.make_tag_urn("FirstName"),
],
".*last_name.*": [
builder.make_tag_urn("Name"),
builder.make_tag_urn("LastName"),
],
}
},
},
PipelineContext(run_id="test-schema-tags"),
)
outputs = list(
transformer.transform(
[
RecordEnvelope(input, metadata={})
for input in [dataset_mce, EndOfStream()]
]
)
)
assert len(outputs) == 2
    # Check that the schema field tags were added.
schema_aspect = outputs[0].record.proposedSnapshot.aspects[0]
assert schema_aspect
assert schema_aspect.fields[0].fieldPath == "address"
assert schema_aspect.fields[0].globalTags is None
assert schema_aspect.fields[1].fieldPath == "first_name"
assert schema_aspect.fields[1].globalTags.tags[0].tag == builder.make_tag_urn(
"Name"
)
assert schema_aspect.fields[1].globalTags.tags[1].tag == builder.make_tag_urn(
"FirstName"
)
assert schema_aspect.fields[2].fieldPath == "last_name"
assert schema_aspect.fields[2].globalTags.tags[0].tag == builder.make_tag_urn(
"Name"
)
assert schema_aspect.fields[2].globalTags.tags[1].tag == builder.make_tag_urn(
"LastName"
)
def run_dataset_transformer_pipeline(
transformer_type: Type[Union[DatasetTransformer, TagTransformer]],
aspect: Optional[builder.Aspect],
config: dict,
pipeline_context: Optional[PipelineContext] = None,
use_mce: bool = False,
) -> List[RecordEnvelope]:
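    """Shared driver: instantiate ``transformer_type`` from ``config`` and run it
    over a single generic dataset record followed by ``EndOfStream``.

    With ``use_mce=True`` the record is an empty dataset MCE snapshot; otherwise
    ``aspect`` is required, must match the transformer's ``aspect_name()``, and
    is wrapped in a generic dataset MCP. Returns every emitted
    ``RecordEnvelope``, including the end-of-stream marker.
    """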
if pipeline_context is None:
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
transformer: DatasetTransformer = cast(
DatasetTransformer, transformer_type.create(config, pipeline_context)
)
dataset: Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]
if use_mce:
dataset = MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
aspects=[],
)
)
else:
assert aspect
assert transformer.aspect_name() == aspect.ASPECT_NAME
dataset = make_generic_dataset_mcp(aspect=aspect)
outputs = list(
transformer.transform(
[RecordEnvelope(input, metadata={}) for input in [dataset, EndOfStream()]]
)
)
return outputs
def run_container_transformer_pipeline(
transformer_type: Type[ContainerTransformer],
aspect: Optional[builder.Aspect],
config: dict,
pipeline_context: Optional[PipelineContext] = None,
use_mce: bool = False,
) -> List[RecordEnvelope]:
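    """Container analogue of ``run_dataset_transformer_pipeline``: wraps
    ``aspect`` in a generic container MCP (or uses an empty MCE snapshot with
    ``use_mce=True``) and returns every ``RecordEnvelope`` emitted by
    ``transformer_type``.
    """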
if pipeline_context is None:
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
transformer: ContainerTransformer = cast(
ContainerTransformer, transformer_type.create(config, pipeline_context)
)
container: Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]
if use_mce:
container = MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:container:6338f55439c7ae58243a62c4d6fbffde",
aspects=[],
)
)
else:
assert aspect
container = make_generic_container_mcp(
aspect=aspect, aspect_name=transformer.aspect_name()
)
outputs = list(
transformer.transform(
[RecordEnvelope(input, metadata={}) for input in [container, EndOfStream()]]
)
)
return outputs
def test_simple_add_dataset_domain_aspect_name(mock_datahub_graph_instance):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_simple_add_dataset_domain"
)
pipeline_context.graph = mock_datahub_graph_instance
transformer = SimpleAddDatasetDomain.create({"domains": []}, pipeline_context)
assert transformer.aspect_name() == models.DomainsClass.ASPECT_NAME
def test_simple_add_dataset_domain(mock_datahub_graph_instance):
acryl_domain = builder.make_domain_urn("acryl.io")
datahub_domain = builder.make_domain_urn("datahubproject.io")
pipeline_context: PipelineContext = PipelineContext(
run_id="test_simple_add_dataset_domain"
)
pipeline_context.graph = mock_datahub_graph_instance
output = run_dataset_transformer_pipeline(
transformer_type=SimpleAddDatasetDomain,
aspect=models.DomainsClass(domains=[datahub_domain]),
config={"domains": [acryl_domain]},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 2
assert datahub_domain in transformed_aspect.domains
assert acryl_domain in transformed_aspect.domains
def test_simple_add_dataset_domain_mce_support(mock_datahub_graph_instance):
acryl_domain = builder.make_domain_urn("acryl.io")
datahub_domain = builder.make_domain_urn("datahubproject.io")
pipeline_context: PipelineContext = PipelineContext(
run_id="test_simple_add_dataset_domain"
)
pipeline_context.graph = mock_datahub_graph_instance
output = run_dataset_transformer_pipeline(
transformer_type=SimpleAddDatasetDomain,
aspect=None,
config={"domains": [datahub_domain, acryl_domain]},
pipeline_context=pipeline_context,
use_mce=True,
)
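    # The MCE passes through untouched; the configured domains are emitted as a
    # separate MCP ahead of the EndOfStream marker, hence three envelopes.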
assert len(output) == 3
assert isinstance(output[0].record, MetadataChangeEventClass)
assert isinstance(output[0].record.proposedSnapshot, models.DatasetSnapshotClass)
assert len(output[0].record.proposedSnapshot.aspects) == 0
assert isinstance(output[1].record, MetadataChangeProposalWrapper)
assert output[1].record.aspect is not None
assert isinstance(output[1].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[1].record.aspect)
assert len(transformed_aspect.domains) == 2
assert datahub_domain in transformed_aspect.domains
assert acryl_domain in transformed_aspect.domains
def test_simple_add_dataset_domain_replace_existing(mock_datahub_graph_instance):
acryl_domain = builder.make_domain_urn("acryl.io")
datahub_domain = builder.make_domain_urn("datahubproject.io")
pipeline_context: PipelineContext = PipelineContext(
run_id="test_simple_add_dataset_domain"
)
pipeline_context.graph = mock_datahub_graph_instance
output = run_dataset_transformer_pipeline(
transformer_type=SimpleAddDatasetDomain,
aspect=models.DomainsClass(domains=[datahub_domain]),
config={"replace_existing": True, "domains": [acryl_domain]},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 1
assert datahub_domain not in transformed_aspect.domains
assert acryl_domain in transformed_aspect.domains
def test_simple_add_dataset_domain_semantics_overwrite(mock_datahub_graph_instance):
acryl_domain = builder.make_domain_urn("acryl.io")
datahub_domain = builder.make_domain_urn("datahubproject.io")
server_domain = builder.make_domain_urn("test.io")
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
# Return fake aspect to simulate server behaviour
def fake_get_domain(entity_urn: str) -> models.DomainsClass:
return models.DomainsClass(domains=[server_domain])
pipeline_context.graph.get_domain = fake_get_domain # type: ignore
output = run_dataset_transformer_pipeline(
transformer_type=SimpleAddDatasetDomain,
aspect=models.DomainsClass(domains=[datahub_domain]),
config={
"semantics": TransformerSemantics.OVERWRITE,
"domains": [acryl_domain],
},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 2
assert datahub_domain in transformed_aspect.domains
assert acryl_domain in transformed_aspect.domains
assert server_domain not in transformed_aspect.domains
def test_simple_add_dataset_domain_semantics_patch(
pytestconfig, tmp_path, mock_time, mock_datahub_graph_instance
):
acryl_domain = builder.make_domain_urn("acryl.io")
datahub_domain = builder.make_domain_urn("datahubproject.io")
server_domain = builder.make_domain_urn("test.io")
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
# Return fake aspect to simulate server behaviour
def fake_get_domain(entity_urn: str) -> models.DomainsClass:
return models.DomainsClass(domains=[server_domain])
pipeline_context.graph.get_domain = fake_get_domain # type: ignore
output = run_dataset_transformer_pipeline(
transformer_type=SimpleAddDatasetDomain,
aspect=models.DomainsClass(domains=[datahub_domain]),
config={
"replace_existing": False,
"semantics": TransformerSemantics.PATCH,
"domains": [acryl_domain],
},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 3
assert datahub_domain in transformed_aspect.domains
assert acryl_domain in transformed_aspect.domains
assert server_domain in transformed_aspect.domains
def test_simple_add_dataset_domain_on_conflict_do_nothing(
pytestconfig, tmp_path, mock_time, mock_datahub_graph_instance
):
acryl_domain = builder.make_domain_urn("acryl.io")
datahub_domain = builder.make_domain_urn("datahubproject.io")
server_domain = builder.make_domain_urn("test.io")
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
# Return fake aspect to simulate server behaviour
def fake_get_domain(entity_urn: str) -> models.DomainsClass:
return models.DomainsClass(domains=[server_domain])
pipeline_context.graph.get_domain = fake_get_domain # type: ignore
output = run_dataset_transformer_pipeline(
transformer_type=SimpleAddDatasetDomain,
aspect=models.DomainsClass(domains=[datahub_domain]),
config={
"replace_existing": False,
"semantics": TransformerSemantics.PATCH,
"domains": [acryl_domain],
"on_conflict": TransformerOnConflict.DO_NOTHING,
},
pipeline_context=pipeline_context,
)
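    # The server already reports a domain, so on_conflict=DO_NOTHING drops the
    # dataset MCP entirely; only the EndOfStream marker remains.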
assert len(output) == 1
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, EndOfStream)
def test_simple_add_dataset_domain_on_conflict_do_nothing_no_conflict(
pytestconfig, tmp_path, mock_time, mock_datahub_graph_instance
):
acryl_domain = builder.make_domain_urn("acryl.io")
datahub_domain = builder.make_domain_urn("datahubproject.io")
irrelevant_domain = builder.make_domain_urn("test.io")
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
# Return fake aspect to simulate server behaviour
def fake_get_domain(entity_urn: str) -> models.DomainsClass:
return models.DomainsClass(domains=[])
pipeline_context.graph.get_domain = fake_get_domain # type: ignore
output = run_dataset_transformer_pipeline(
transformer_type=SimpleAddDatasetDomain,
aspect=models.DomainsClass(domains=[datahub_domain]),
config={
"replace_existing": False,
"semantics": TransformerSemantics.PATCH,
"domains": [acryl_domain],
"on_conflict": TransformerOnConflict.DO_NOTHING,
},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 2
assert datahub_domain in transformed_aspect.domains
assert acryl_domain in transformed_aspect.domains
assert irrelevant_domain not in transformed_aspect.domains
def test_pattern_add_dataset_domain_aspect_name(mock_datahub_graph_instance):
pipeline_context: PipelineContext = PipelineContext(
        run_id="test_pattern_add_dataset_domain"
)
pipeline_context.graph = mock_datahub_graph_instance
transformer = PatternAddDatasetDomain.create(
{"domain_pattern": {"rules": {}}}, pipeline_context
)
assert transformer.aspect_name() == models.DomainsClass.ASPECT_NAME
def test_pattern_add_dataset_domain_match(mock_datahub_graph_instance):
acryl_domain = builder.make_domain_urn("acryl.io")
datahub_domain = builder.make_domain_urn("datahubproject.io")
pattern = "urn:li:dataset:\\(urn:li:dataPlatform:bigquery,.*"
pipeline_context: PipelineContext = PipelineContext(
        run_id="test_pattern_add_dataset_domain"
)
pipeline_context.graph = mock_datahub_graph_instance
output = run_dataset_transformer_pipeline(
transformer_type=PatternAddDatasetDomain,
aspect=models.DomainsClass(domains=[datahub_domain]),
config={
"domain_pattern": {"rules": {pattern: [acryl_domain]}},
},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 2
assert datahub_domain in transformed_aspect.domains
assert acryl_domain in transformed_aspect.domains
def test_pattern_add_dataset_domain_no_match(mock_datahub_graph_instance):
acryl_domain = builder.make_domain_urn("acryl.io")
datahub_domain = builder.make_domain_urn("datahubproject.io")
pattern = "urn:li:dataset:\\(urn:li:dataPlatform:invalid,.*"
pipeline_context: PipelineContext = PipelineContext(
        run_id="test_pattern_add_dataset_domain"
)
pipeline_context.graph = mock_datahub_graph_instance
output = run_dataset_transformer_pipeline(
transformer_type=PatternAddDatasetDomain,
aspect=models.DomainsClass(domains=[datahub_domain]),
config={
"domain_pattern": {"rules": {pattern: [acryl_domain]}},
},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 1
assert datahub_domain in transformed_aspect.domains
assert acryl_domain not in transformed_aspect.domains
def test_pattern_add_dataset_domain_replace_existing_match(mock_datahub_graph_instance):
acryl_domain = builder.make_domain_urn("acryl.io")
datahub_domain = builder.make_domain_urn("datahubproject.io")
pattern = "urn:li:dataset:\\(urn:li:dataPlatform:bigquery,.*"
pipeline_context: PipelineContext = PipelineContext(
        run_id="test_pattern_add_dataset_domain"
)
pipeline_context.graph = mock_datahub_graph_instance
output = run_dataset_transformer_pipeline(
transformer_type=PatternAddDatasetDomain,
aspect=models.DomainsClass(domains=[datahub_domain]),
config={
"replace_existing": True,
"domain_pattern": {"rules": {pattern: [acryl_domain]}},
},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 1
assert datahub_domain not in transformed_aspect.domains
assert acryl_domain in transformed_aspect.domains
def test_pattern_add_dataset_domain_replace_existing_no_match(
mock_datahub_graph_instance,
):
acryl_domain = builder.make_domain_urn("acryl.io")
datahub_domain = builder.make_domain_urn("datahubproject.io")
pattern = "urn:li:dataset:\\(urn:li:dataPlatform:invalid,.*"
pipeline_context: PipelineContext = PipelineContext(
        run_id="test_pattern_add_dataset_domain"
)
pipeline_context.graph = mock_datahub_graph_instance
output = run_dataset_transformer_pipeline(
transformer_type=PatternAddDatasetDomain,
aspect=models.DomainsClass(domains=[datahub_domain]),
config={
"replace_existing": True,
"domain_pattern": {"rules": {pattern: [acryl_domain]}},
},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 0
def test_pattern_add_dataset_domain_semantics_overwrite(mock_datahub_graph_instance):
acryl_domain = builder.make_domain_urn("acryl.io")
datahub_domain = builder.make_domain_urn("datahubproject.io")
server_domain = builder.make_domain_urn("test.io")
pattern = "urn:li:dataset:\\(urn:li:dataPlatform:bigquery,.*"
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
# Return fake aspect to simulate server behaviour
def fake_get_domain(entity_urn: str) -> models.DomainsClass:
return models.DomainsClass(domains=[server_domain])
pipeline_context.graph.get_domain = fake_get_domain # type: ignore
output = run_dataset_transformer_pipeline(
transformer_type=PatternAddDatasetDomain,
aspect=models.DomainsClass(domains=[datahub_domain]),
config={
"semantics": TransformerSemantics.OVERWRITE,
"domain_pattern": {"rules": {pattern: [acryl_domain]}},
},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 2
assert datahub_domain in transformed_aspect.domains
assert acryl_domain in transformed_aspect.domains
assert server_domain not in transformed_aspect.domains
def test_pattern_add_dataset_domain_semantics_patch(
pytestconfig, tmp_path, mock_time, mock_datahub_graph_instance
):
acryl_domain = builder.make_domain_urn("acryl.io")
datahub_domain = builder.make_domain_urn("datahubproject.io")
server_domain = builder.make_domain_urn("test.io")
pattern = "urn:li:dataset:\\(urn:li:dataPlatform:bigquery,.*"
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
# Return fake aspect to simulate server behaviour
def fake_get_domain(entity_urn: str) -> models.DomainsClass:
return models.DomainsClass(domains=[server_domain])
pipeline_context.graph.get_domain = fake_get_domain # type: ignore
output = run_dataset_transformer_pipeline(
transformer_type=PatternAddDatasetDomain,
aspect=models.DomainsClass(domains=[datahub_domain]),
config={
"replace_existing": False,
"semantics": TransformerSemantics.PATCH,
"domain_pattern": {"rules": {pattern: [acryl_domain]}},
},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 3
assert datahub_domain in transformed_aspect.domains
assert acryl_domain in transformed_aspect.domains
assert server_domain in transformed_aspect.domains
def test_simple_dataset_ownership_transformer_semantics_patch(
mock_datahub_graph_instance,
):
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
server_owner: str = builder.make_owner_urn(
"mohd@acryl.io", owner_type=builder.OwnerType.USER
)
owner1: str = builder.make_owner_urn(
"john@acryl.io", owner_type=builder.OwnerType.USER
)
owner2: str = builder.make_owner_urn(
"pedro@acryl.io", owner_type=builder.OwnerType.USER
)
# Return fake aspect to simulate server behaviour
def fake_ownership_class(entity_urn: str) -> models.OwnershipClass:
return models.OwnershipClass(
owners=[
models.OwnerClass(
owner=server_owner, type=models.OwnershipTypeClass.DATAOWNER
)
]
)
pipeline_context.graph.get_ownership = fake_ownership_class # type: ignore
output = run_dataset_transformer_pipeline(
transformer_type=SimpleAddDatasetOwnership,
aspect=models.OwnershipClass(
owners=[
models.OwnerClass(owner=owner1, type=models.OwnershipTypeClass.PRODUCER)
]
),
config={
"replace_existing": False,
"semantics": TransformerSemantics.PATCH,
"owner_urns": [owner2],
"ownership_type": "DATAOWNER",
},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.OwnershipClass)
transformed_aspect: models.OwnershipClass = cast(
models.OwnershipClass, output[0].record.aspect
)
assert len(transformed_aspect.owners) == 3
owner_urns: List[str] = [
owner_class.owner for owner_class in transformed_aspect.owners
]
assert owner1 in owner_urns
assert owner2 in owner_urns
assert server_owner in owner_urns
def test_pattern_container_and_dataset_domain_transformation(
mock_datahub_graph_instance,
):
datahub_domain = builder.make_domain_urn("datahubproject.io")
acryl_domain = builder.make_domain_urn("acryl_domain")
server_domain = builder.make_domain_urn("server_domain")
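    # Patch get_aspect so the graph reports a browsePathsV2 with two parent
    # containers; with is_container=True the transformer also emits domain MCPs
    # for each of them.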
def fake_get_aspect(
entity_urn: str,
aspect_type: Type[models.BrowsePathsV2Class],
version: int = 0,
) -> models.BrowsePathsV2Class:
return models.BrowsePathsV2Class(
path=[
models.BrowsePathEntryClass(
id="container_1", urn="urn:li:container:container_1"
),
models.BrowsePathEntryClass(
id="container_2", urn="urn:li:container:container_2"
),
]
)
pipeline_context = PipelineContext(
run_id="test_pattern_container_and_dataset_domain_transformation"
)
pipeline_context.graph = mock_datahub_graph_instance
pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore
with_domain_aspect = make_generic_dataset_mcp(
aspect=models.DomainsClass(domains=[datahub_domain])
)
no_domain_aspect = make_generic_dataset_mcp(
entity_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)"
)
# Not a dataset, should be ignored
not_a_dataset = models.MetadataChangeEventClass(
proposedSnapshot=models.DataJobSnapshotClass(
urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)",
aspects=[
models.DataJobInfoClass(
name="User Deletions",
description="Constructs the fct_users_deleted from logging_events",
type=models.AzkabanJobTypeClass.SQL,
)
],
)
)
inputs = [
with_domain_aspect,
no_domain_aspect,
not_a_dataset,
EndOfStream(),
]
# Initialize the transformer with container support for domains
transformer = PatternAddDatasetDomain.create(
{
"domain_pattern": {
"rules": {
".*example1.*": [acryl_domain, server_domain],
".*example2.*": [server_domain],
}
},
"is_container": True, # Enable container domain handling
},
pipeline_context,
)
outputs = list(
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
)
    assert (
        len(outputs) == len(inputs) + 3
    )  # one new domains MCP for the dataset without domains, plus one MCP per container
first_domain_aspect = outputs[0].record.aspect
assert first_domain_aspect
assert len(first_domain_aspect.domains) == 3
assert all(
domain in first_domain_aspect.domains
for domain in [datahub_domain, acryl_domain, server_domain]
)
second_domain_aspect = outputs[3].record.aspect
assert second_domain_aspect
assert len(second_domain_aspect.domains) == 1
assert server_domain in second_domain_aspect.domains
# Verify that the third input (not a dataset) is unchanged
assert inputs[2] == outputs[2].record
    # Verify that container_1 and container_2 each carry both matched domains
container_1 = outputs[4].record.aspect
assert len(container_1.domains) == 2
assert acryl_domain in container_1.domains
assert server_domain in container_1.domains
container_2 = outputs[5].record.aspect
assert len(container_2.domains) == 2
assert acryl_domain in container_2.domains
assert server_domain in container_2.domains
def test_pattern_container_and_dataset_domain_transformation_with_no_container(
mock_datahub_graph_instance,
):
datahub_domain = builder.make_domain_urn("datahubproject.io")
acryl_domain = builder.make_domain_urn("acryl_domain")
server_domain = builder.make_domain_urn("server_domain")
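    # Patch get_aspect to report no browse paths, so no container MCPs are emitted.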
def fake_get_aspect(
entity_urn: str,
aspect_type: Type[models.BrowsePathsV2Class],
version: int = 0,
) -> Optional[models.BrowsePathsV2Class]:
return None
pipeline_context = PipelineContext(
run_id="test_pattern_container_and_dataset_domain_transformation_with_no_container"
)
pipeline_context.graph = mock_datahub_graph_instance
pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore
with_domain_aspect = make_generic_dataset_mcp(
aspect=models.DomainsClass(domains=[datahub_domain])
)
no_domain_aspect = make_generic_dataset_mcp(
entity_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)"
)
inputs = [
with_domain_aspect,
no_domain_aspect,
EndOfStream(),
]
# Initialize the transformer with container support for domains
transformer = PatternAddDatasetDomain.create(
{
"domain_pattern": {
"rules": {
".*example1.*": [acryl_domain, server_domain],
".*example2.*": [server_domain],
}
},
"is_container": True, # Enable container domain handling
},
pipeline_context,
)
outputs = list(
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
)
assert len(outputs) == len(inputs) + 1
first_domain_aspect = outputs[0].record.aspect
assert first_domain_aspect
assert len(first_domain_aspect.domains) == 3
assert all(
domain in first_domain_aspect.domains
for domain in [datahub_domain, acryl_domain, server_domain]
)
second_domain_aspect = outputs[2].record.aspect
assert second_domain_aspect
assert len(second_domain_aspect.domains) == 1
assert server_domain in second_domain_aspect.domains
def test_pattern_add_container_dataset_domain_no_match(mock_datahub_graph_instance):
acryl_domain = builder.make_domain_urn("acryl.io")
datahub_domain = builder.make_domain_urn("datahubproject.io")
pattern = "urn:li:dataset:\\(urn:li:dataPlatform:invalid,.*"
pipeline_context: PipelineContext = PipelineContext(
        run_id="test_pattern_add_container_dataset_domain"
)
pipeline_context.graph = mock_datahub_graph_instance
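    # The graph reports one parent container, but the pattern matches no dataset,
    # so no domains are added and no container MCPs are emitted.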
def fake_get_aspect(
entity_urn: str,
aspect_type: Type[models.BrowsePathsV2Class],
version: int = 0,
) -> models.BrowsePathsV2Class:
return models.BrowsePathsV2Class(
path=[
models.BrowsePathEntryClass(
id="container_1", urn="urn:li:container:container_1"
)
]
)
pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore
output = run_dataset_transformer_pipeline(
transformer_type=PatternAddDatasetDomain,
aspect=models.DomainsClass(domains=[datahub_domain]),
config={
"replace_existing": True,
"domain_pattern": {"rules": {pattern: [acryl_domain]}},
"is_container": True,
},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 0
def run_pattern_dataset_schema_terms_transformation_semantics(
semantics: TransformerSemantics,
mock_datahub_graph_instance: DataHubGraph,
) -> List[RecordEnvelope]:
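    """Run ``PatternAddDatasetSchemaTerms`` under the given ``semantics``.

    The graph client is patched so the server reports a schema whose
    ``first_name`` and ``mobile_number`` fields already carry the ``pii`` term,
    while the ingested aspect has ``address``, ``first_name`` and ``last_name``
    fields without terms. PATCH should merge the server-side fields and terms;
    OVERWRITE should discard them.
    """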
pipeline_context = PipelineContext(run_id="test_pattern_dataset_schema_terms")
pipeline_context.graph = mock_datahub_graph_instance
# fake the server response
def fake_schema_metadata(entity_urn: str) -> models.SchemaMetadataClass:
return models.SchemaMetadataClass(
schemaName="customer", # not used
platform=builder.make_data_platform_urn(
"hive"
), # important <- platform must be an urn
version=0,
# when the source system has a notion of versioning of schemas, insert this in, otherwise leave as 0
hash="",
# when the source system has a notion of unique schemas identified via hash, include a hash, else leave it as empty string
platformSchema=models.OtherSchemaClass(
rawSchema="__insert raw schema here__"
),
fields=[
models.SchemaFieldClass(
fieldPath="first_name",
glossaryTerms=models.GlossaryTermsClass(
terms=[
models.GlossaryTermAssociationClass(
urn=builder.make_term_urn("pii")
)
],
auditStamp=models.AuditStampClass._construct_with_defaults(),
),
type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
models.SchemaFieldClass(
fieldPath="mobile_number",
glossaryTerms=models.GlossaryTermsClass(
terms=[
models.GlossaryTermAssociationClass(
urn=builder.make_term_urn("pii")
)
],
auditStamp=models.AuditStampClass._construct_with_defaults(),
),
type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
],
)
pipeline_context.graph.get_schema_metadata = fake_schema_metadata # type: ignore
output = run_dataset_transformer_pipeline(
transformer_type=PatternAddDatasetSchemaTerms,
pipeline_context=pipeline_context,
config={
"semantics": semantics,
"term_pattern": {
"rules": {
".*first_name.*": [
builder.make_term_urn("Name"),
builder.make_term_urn("FirstName"),
],
".*last_name.*": [
builder.make_term_urn("Name"),
builder.make_term_urn("LastName"),
],
}
},
},
aspect=models.SchemaMetadataClass(
schemaName="customer", # not used
platform=builder.make_data_platform_urn(
"hive"
), # important <- platform must be an urn
version=0,
# when the source system has a notion of versioning of schemas, insert this in, otherwise leave as 0
hash="",
# when the source system has a notion of unique schemas identified via hash, include a hash, else leave it as empty string
platformSchema=models.OtherSchemaClass(
rawSchema="__insert raw schema here__"
),
fields=[
models.SchemaFieldClass(
fieldPath="address",
type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
models.SchemaFieldClass(
fieldPath="first_name",
type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
models.SchemaFieldClass(
fieldPath="last_name",
type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
],
),
)
return output
def test_pattern_dataset_schema_terms_transformation_patch(
mock_time, mock_datahub_graph_instance
):
output = run_pattern_dataset_schema_terms_transformation_semantics(
TransformerSemantics.PATCH, mock_datahub_graph_instance
)
    assert len(output) == 2
    # Check that glossary terms were added.
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.SchemaMetadataClass)
transform_aspect = cast(models.SchemaMetadataClass, output[0].record.aspect)
field_path_vs_field: Dict[str, models.SchemaFieldClass] = {
field.fieldPath: field for field in transform_aspect.fields
}
assert (
field_path_vs_field.get("mobile_number") is not None
) # server field should be preserved during patch
assert field_path_vs_field["first_name"].glossaryTerms is not None
assert len(field_path_vs_field["first_name"].glossaryTerms.terms) == 3
glossary_terms_urn = [
term.urn for term in field_path_vs_field["first_name"].glossaryTerms.terms
]
assert builder.make_term_urn("pii") in glossary_terms_urn
assert builder.make_term_urn("FirstName") in glossary_terms_urn
assert builder.make_term_urn("Name") in glossary_terms_urn
def test_pattern_dataset_schema_terms_transformation_overwrite(
mock_time, mock_datahub_graph_instance
):
output = run_pattern_dataset_schema_terms_transformation_semantics(
TransformerSemantics.OVERWRITE, mock_datahub_graph_instance
)
    assert len(output) == 2
    # Check that glossary terms were added.
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.SchemaMetadataClass)
transform_aspect = cast(models.SchemaMetadataClass, output[0].record.aspect)
field_path_vs_field: Dict[str, models.SchemaFieldClass] = {
field.fieldPath: field for field in transform_aspect.fields
}
assert (
field_path_vs_field.get("mobile_number") is None
) # server field should not be preserved during overwrite
assert field_path_vs_field["first_name"].glossaryTerms is not None
assert len(field_path_vs_field["first_name"].glossaryTerms.terms) == 2
glossary_terms_urn = [
term.urn for term in field_path_vs_field["first_name"].glossaryTerms.terms
]
assert builder.make_term_urn("pii") not in glossary_terms_urn
assert builder.make_term_urn("FirstName") in glossary_terms_urn
assert builder.make_term_urn("Name") in glossary_terms_urn
def run_pattern_dataset_schema_tags_transformation_semantics(
semantics: TransformerSemantics,
mock_datahub_graph_instance: DataHubGraph,
) -> List[RecordEnvelope]:
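    """Run ``PatternAddDatasetSchemaTags`` under the given ``semantics``.

    The graph client is patched so the server reports a schema whose
    ``first_name`` and ``mobile_number`` fields already carry the ``pii`` tag,
    while the ingested aspect has ``address``, ``first_name`` and ``last_name``
    fields without tags. PATCH should merge the server-side fields and tags;
    OVERWRITE should discard them.
    """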
    pipeline_context = PipelineContext(run_id="test_pattern_dataset_schema_tags")
pipeline_context.graph = mock_datahub_graph_instance
# fake the server response
def fake_schema_metadata(entity_urn: str) -> models.SchemaMetadataClass:
return models.SchemaMetadataClass(
schemaName="customer", # not used
platform=builder.make_data_platform_urn(
"hive"
), # important <- platform must be an urn
version=0,
# when the source system has a notion of versioning of schemas, insert this in, otherwise leave as 0
hash="",
# when the source system has a notion of unique schemas identified via hash, include a hash, else leave it as empty string
platformSchema=models.OtherSchemaClass(
rawSchema="__insert raw schema here__"
),
fields=[
models.SchemaFieldClass(
fieldPath="first_name",
globalTags=models.GlobalTagsClass(
tags=[
models.TagAssociationClass(tag=builder.make_tag_urn("pii"))
],
),
type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
models.SchemaFieldClass(
fieldPath="mobile_number",
globalTags=models.GlobalTagsClass(
tags=[
models.TagAssociationClass(tag=builder.make_tag_urn("pii"))
],
),
type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
],
)
pipeline_context.graph.get_schema_metadata = fake_schema_metadata # type: ignore
output = run_dataset_transformer_pipeline(
transformer_type=PatternAddDatasetSchemaTags,
pipeline_context=pipeline_context,
config={
"semantics": semantics,
"tag_pattern": {
"rules": {
".*first_name.*": [
builder.make_tag_urn("Name"),
builder.make_tag_urn("FirstName"),
],
".*last_name.*": [
builder.make_tag_urn("Name"),
builder.make_tag_urn("LastName"),
],
}
},
},
aspect=models.SchemaMetadataClass(
schemaName="customer", # not used
platform=builder.make_data_platform_urn(
"hive"
), # important <- platform must be an urn
version=0,
# when the source system has a notion of versioning of schemas, insert this in, otherwise leave as 0
hash="",
# when the source system has a notion of unique schemas identified via hash, include a hash, else leave it as empty string
platformSchema=models.OtherSchemaClass(
rawSchema="__insert raw schema here__"
),
fields=[
models.SchemaFieldClass(
fieldPath="address",
type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
models.SchemaFieldClass(
fieldPath="first_name",
type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
models.SchemaFieldClass(
fieldPath="last_name",
type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
],
),
)
return output
def test_pattern_dataset_schema_tags_transformation_overwrite(
mock_time, mock_datahub_graph_instance
):
output = run_pattern_dataset_schema_tags_transformation_semantics(
TransformerSemantics.OVERWRITE, mock_datahub_graph_instance
)
    assert len(output) == 2
    # Check that global tags were added.
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.SchemaMetadataClass)
transform_aspect = cast(models.SchemaMetadataClass, output[0].record.aspect)
field_path_vs_field: Dict[str, models.SchemaFieldClass] = {
field.fieldPath: field for field in transform_aspect.fields
}
assert (
field_path_vs_field.get("mobile_number") is None
) # server field should not be preserved during overwrite
assert field_path_vs_field["first_name"].globalTags is not None
assert len(field_path_vs_field["first_name"].globalTags.tags) == 2
global_tags_urn = [
tag.tag for tag in field_path_vs_field["first_name"].globalTags.tags
]
assert builder.make_tag_urn("pii") not in global_tags_urn
assert builder.make_tag_urn("FirstName") in global_tags_urn
assert builder.make_tag_urn("Name") in global_tags_urn
def test_pattern_dataset_schema_tags_transformation_patch(
mock_time, mock_datahub_graph_instance
):
output = run_pattern_dataset_schema_tags_transformation_semantics(
TransformerSemantics.PATCH, mock_datahub_graph_instance
)
    assert len(output) == 2
    # Check that global tags were added.
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.SchemaMetadataClass)
transform_aspect = cast(models.SchemaMetadataClass, output[0].record.aspect)
field_path_vs_field: Dict[str, models.SchemaFieldClass] = {
field.fieldPath: field for field in transform_aspect.fields
}
assert (
field_path_vs_field.get("mobile_number") is not None
) # server field should be preserved during patch
assert field_path_vs_field["first_name"].globalTags is not None
assert len(field_path_vs_field["first_name"].globalTags.tags) == 3
global_tags_urn = [
tag.tag for tag in field_path_vs_field["first_name"].globalTags.tags
]
assert builder.make_tag_urn("pii") in global_tags_urn
assert builder.make_tag_urn("FirstName") in global_tags_urn
assert builder.make_tag_urn("Name") in global_tags_urn
def test_simple_dataset_data_product_transformation(mock_time):
transformer = SimpleAddDatasetDataProduct.create(
{
"dataset_to_data_product_urns": {
builder.make_dataset_urn(
"bigquery", "example1"
): "urn:li:dataProduct:first",
builder.make_dataset_urn(
"bigquery", "example2"
): "urn:li:dataProduct:second",
builder.make_dataset_urn(
"bigquery", "example3"
): "urn:li:dataProduct:first",
}
},
PipelineContext(run_id="test-dataproduct"),
)
outputs = list(
transformer.transform(
[
RecordEnvelope(input, metadata={})
for input in [
make_generic_dataset(
entity_urn=builder.make_dataset_urn("bigquery", "example1")
),
make_generic_dataset(
entity_urn=builder.make_dataset_urn("bigquery", "example2")
),
make_generic_dataset(
entity_urn=builder.make_dataset_urn("bigquery", "example3")
),
EndOfStream(),
]
]
)
)
assert len(outputs) == 6
    # Check that the new data product entity is present
assert outputs[3].record.entityUrn == "urn:li:dataProduct:first"
assert outputs[3].record.aspectName == "dataProductProperties"
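    # The data product aspect arrives as raw JSON bytes (a patch-style operation
    # list); decode it to inspect the destination URNs.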
first_data_product_aspect = json.loads(
outputs[3].record.aspect.value.decode("utf-8")
)
assert [item["value"]["destinationUrn"] for item in first_data_product_aspect] == [
builder.make_dataset_urn("bigquery", "example1"),
builder.make_dataset_urn("bigquery", "example3"),
]
second_data_product_aspect = json.loads(
outputs[4].record.aspect.value.decode("utf-8")
)
assert [item["value"]["destinationUrn"] for item in second_data_product_aspect] == [
builder.make_dataset_urn("bigquery", "example2")
]
assert isinstance(outputs[5].record, EndOfStream)
def test_pattern_dataset_data_product_transformation(mock_time):
transformer = PatternAddDatasetDataProduct.create(
{
"dataset_to_data_product_urns_pattern": {
"rules": {
".*example1.*": "urn:li:dataProduct:first",
".*": "urn:li:dataProduct:second",
}
},
},
PipelineContext(run_id="test-dataproducts"),
)
outputs = list(
transformer.transform(
[
RecordEnvelope(input, metadata={})
for input in [
make_generic_dataset(
entity_urn=builder.make_dataset_urn("bigquery", "example1")
),
make_generic_dataset(
entity_urn=builder.make_dataset_urn("bigquery", "example2")
),
make_generic_dataset(
entity_urn=builder.make_dataset_urn("bigquery", "example3")
),
EndOfStream(),
]
]
)
)
assert len(outputs) == 6
    # Check that the new data product entity is present
assert outputs[3].record.entityUrn == "urn:li:dataProduct:first"
assert outputs[3].record.aspectName == "dataProductProperties"
first_data_product_aspect = json.loads(
outputs[3].record.aspect.value.decode("utf-8")
)
assert [item["value"]["destinationUrn"] for item in first_data_product_aspect] == [
builder.make_dataset_urn("bigquery", "example1")
]
second_data_product_aspect = json.loads(
outputs[4].record.aspect.value.decode("utf-8")
)
assert [item["value"]["destinationUrn"] for item in second_data_product_aspect] == [
builder.make_dataset_urn("bigquery", "example2"),
builder.make_dataset_urn("bigquery", "example3"),
]
assert isinstance(outputs[5].record, EndOfStream)
def dummy_data_product_resolver_method(dataset_urn):
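    """Resolver referenced by dotted path in ``get_data_product_to_add``: maps
    the bigquery ``example1`` dataset URN to ``urn:li:dataProduct:first`` and
    returns ``None`` for any other dataset.
    """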
dataset_to_data_product_map = {
builder.make_dataset_urn("bigquery", "example1"): "urn:li:dataProduct:first"
}
return dataset_to_data_product_map.get(dataset_urn)
def test_add_dataset_data_product_transformation():
transformer = AddDatasetDataProduct.create(
{
"get_data_product_to_add": "tests.unit.test_transform_dataset.dummy_data_product_resolver_method"
},
PipelineContext(run_id="test-dataproduct"),
)
outputs = list(
transformer.transform(
[
RecordEnvelope(input, metadata={})
for input in [make_generic_dataset(), EndOfStream()]
]
)
)
    # Check that the new data product entity is present
assert outputs[1].record.entityUrn == "urn:li:dataProduct:first"
assert outputs[1].record.aspectName == "dataProductProperties"
first_data_product_aspect = json.loads(
outputs[1].record.aspect.value.decode("utf-8")
)
assert [item["value"]["destinationUrn"] for item in first_data_product_aspect] == [
builder.make_dataset_urn("bigquery", "example1")
]
def _test_clean_owner_urns(
in_pipeline_context: Any,
in_owners: List[str],
config: List[Union[re.Pattern, str]],
cleaned_owner_urn: List[str],
) -> None:
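    """Shared driver for the ``PatternCleanUpOwnership`` tests.

    Feeds ``in_owners`` through the transformer with ``pattern_for_cleanup`` set
    to ``config`` (the graph is patched to return the same owners) and asserts
    that the cleaned owner URNs equal ``cleaned_owner_urn``.
    """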
# Return fake aspect to simulate server behaviour
def fake_ownership_class(entity_urn: str) -> models.OwnershipClass:
return models.OwnershipClass(
owners=[
models.OwnerClass(owner=owner, type=models.OwnershipTypeClass.DATAOWNER)
for owner in in_owners
]
)
in_pipeline_context.graph.get_ownership = fake_ownership_class # type: ignore
output = run_dataset_transformer_pipeline(
transformer_type=PatternCleanUpOwnership,
aspect=models.OwnershipClass(
owners=[
models.OwnerClass(owner=owner, type=models.OwnershipTypeClass.DATAOWNER)
for owner in in_owners
]
),
config={"pattern_for_cleanup": config},
pipeline_context=in_pipeline_context,
)
assert len(output) == 2
ownership_aspect = output[0].record.aspect
assert isinstance(ownership_aspect, OwnershipClass)
assert len(ownership_aspect.owners) == len(in_owners)
out_owners = [owner.owner for owner in ownership_aspect.owners]
assert set(out_owners) == set(cleaned_owner_urn)
def test_clean_owner_urn_transformation_remove_fixed_string(
mock_datahub_graph_instance,
):
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
user_emails = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
]
in_owner_urns: List[str] = []
for user in user_emails:
in_owner_urns.append(
builder.make_owner_urn(user, owner_type=builder.OwnerType.USER)
)
# remove 'ABCDEF:'
config: List[Union[re.Pattern, str]] = ["ABCDEF:"]
expected_user_emails: List[str] = [
"email_id@example.com",
"123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
]
expected_owner_urns: List[str] = []
for user in expected_user_emails:
expected_owner_urns.append(
builder.make_owner_urn(user, owner_type=builder.OwnerType.USER)
)
_test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns)
def test_clean_owner_urn_transformation_remove_multiple_values(
mock_datahub_graph_instance,
):
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
user_emails = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
]
in_owner_urns: List[str] = []
for user in user_emails:
in_owner_urns.append(
builder.make_owner_urn(user, owner_type=builder.OwnerType.USER)
)
# remove multiple values
config: List[Union[re.Pattern, str]] = ["ABCDEF:", "email"]
expected_user_emails: List[str] = [
"_id@example.com",
"123_id@example.com",
"_id@example.co.in",
"_id@example.co.uk",
"_test:XYZ@example.com",
"_id:id1@example.com",
"_id:id2@example.com",
]
expected_owner_urns: List[str] = []
for user in expected_user_emails:
expected_owner_urns.append(
builder.make_owner_urn(user, owner_type=builder.OwnerType.USER)
)
_test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns)
def test_clean_owner_urn_transformation_remove_values_using_regex(
mock_datahub_graph_instance,
):
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
user_emails = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
]
in_owner_urns: List[str] = []
for user in user_emails:
in_owner_urns.append(
builder.make_owner_urn(user, owner_type=builder.OwnerType.USER)
)
# remove words after `_` using RegEx i.e. `id`, `test`
config: List[Union[re.Pattern, str]] = [r"(?<=_)(\w+)"]
expected_user_emails: List[str] = [
"ABCDEF:email_@example.com",
"ABCDEF:123email_@example.com",
"email_@example.co.in",
"email_@example.co.uk",
"email_:XYZ@example.com",
"email_:id1@example.com",
"email_:id2@example.com",
]
expected_owner_urns: List[str] = []
for user in expected_user_emails:
expected_owner_urns.append(
builder.make_owner_urn(user, owner_type=builder.OwnerType.USER)
)
_test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns)
def test_clean_owner_urn_transformation_remove_digits(mock_datahub_graph_instance):
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
user_emails = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
]
in_owner_urns: List[str] = []
for user in user_emails:
in_owner_urns.append(
builder.make_owner_urn(user, owner_type=builder.OwnerType.USER)
)
# remove digits
config: List[Union[re.Pattern, str]] = [r"\d+"]
expected_user_emails: List[str] = [
"ABCDEF:email_id@example.com",
"ABCDEF:email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id@example.com",
"email_id:id@example.com",
]
expected_owner_urns: List[str] = []
for user in expected_user_emails:
expected_owner_urns.append(
builder.make_owner_urn(user, owner_type=builder.OwnerType.USER)
)
_test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns)
def test_clean_owner_urn_transformation_remove_pattern(mock_datahub_graph_instance):
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
user_emails = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
]
in_owner_urns: List[str] = []
for user in user_emails:
in_owner_urns.append(
builder.make_owner_urn(user, owner_type=builder.OwnerType.USER)
)
# remove `example.*`
config: List[Union[re.Pattern, str]] = [r"@example\.\S*"]
expected_user_emails: List[str] = [
"ABCDEF:email_id",
"ABCDEF:123email_id",
"email_id",
"email_id",
"email_test:XYZ",
"email_id:id1",
"email_id:id2",
]
expected_owner_urns: List[str] = []
for user in expected_user_emails:
expected_owner_urns.append(
builder.make_owner_urn(user, owner_type=builder.OwnerType.USER)
)
_test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns)
def test_clean_owner_urn_transformation_remove_word_in_capital_letters(
mock_datahub_graph_instance,
):
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
user_emails = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
"email_test:XYabZ@example.com",
]
in_owner_urns: List[str] = []
for user in user_emails:
in_owner_urns.append(
builder.make_owner_urn(user, owner_type=builder.OwnerType.USER)
)
# if string between `:` and `@` is in CAPITAL then remove it
config: List[Union[re.Pattern, str]] = ["(?<=:)[A-Z]+(?=@)"]
expected_user_emails: List[str] = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
"email_test:XYabZ@example.com",
]
expected_owner_urns: List[str] = []
for user in expected_user_emails:
expected_owner_urns.append(
builder.make_owner_urn(user, owner_type=builder.OwnerType.USER)
)
_test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns)
def test_clean_owner_urn_transformation_remove_pattern_with_alphanumeric_value(
mock_datahub_graph_instance,
):
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
user_emails = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
]
in_owner_urns: List[str] = []
for user in user_emails:
in_owner_urns.append(
builder.make_owner_urn(user, owner_type=builder.OwnerType.USER)
)
# remove any pattern having `id` followed by any digits
config: List[Union[re.Pattern, str]] = [r"id\d+"]
expected_user_emails: List[str] = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:@example.com",
"email_id:@example.com",
]
expected_owner_urns: List[str] = []
for user in expected_user_emails:
expected_owner_urns.append(
builder.make_owner_urn(user, owner_type=builder.OwnerType.USER)
)
_test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns)
def test_clean_owner_urn_transformation_should_not_remove_system_identifier(
mock_datahub_graph_instance,
):
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
user_emails = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
]
in_owner_urns: List[str] = []
for user in user_emails:
in_owner_urns.append(
builder.make_owner_urn(user, owner_type=builder.OwnerType.USER)
)
# should not remove system identifier
config: List[Union[re.Pattern, str]] = ["urn:li:corpuser:"]
_test_clean_owner_urns(pipeline_context, in_owner_urns, config, in_owner_urns)
def test_clean_owner_group_urn_transformation_remove_fixed_string(
mock_datahub_graph_instance,
):
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
group_ids = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
]
in_owner_urns: List[str] = []
for group in group_ids:
in_owner_urns.append(
builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP)
)
# remove 'ABCDEF:'
config: List[Union[re.Pattern, str]] = ["ABCDEF:"]
expected_group_ids: List[str] = [
"email_id@example.com",
"123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
]
expected_owner_urns: List[str] = []
for group in expected_group_ids:
expected_owner_urns.append(
builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP)
)
_test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns)
def test_clean_owner_group_urn_transformation_remove_multiple_values(
mock_datahub_graph_instance,
):
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
group_ids = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
]
in_owner_urns: List[str] = []
for group in group_ids:
in_owner_urns.append(
builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP)
)
# remove multiple values
config: List[Union[re.Pattern, str]] = ["ABCDEF:", "email"]
expected_group_ids: List[str] = [
"_id@example.com",
"123_id@example.com",
"_id@example.co.in",
"_id@example.co.uk",
"_test:XYZ@example.com",
"_id:id1@example.com",
"_id:id2@example.com",
]
expected_owner_urns: List[str] = []
for group in expected_group_ids:
expected_owner_urns.append(
builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP)
)
_test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns)
def test_clean_owner_group_urn_transformation_remove_values_using_regex(
mock_datahub_graph_instance,
):
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
group_ids = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
]
in_owner_urns: List[str] = []
for group in group_ids:
in_owner_urns.append(
builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP)
)
# remove words after `_` using RegEx i.e. `id`, `test`
config: List[Union[re.Pattern, str]] = [r"(?<=_)(\w+)"]
expected_group_ids: List[str] = [
"ABCDEF:email_@example.com",
"ABCDEF:123email_@example.com",
"email_@example.co.in",
"email_@example.co.uk",
"email_:XYZ@example.com",
"email_:id1@example.com",
"email_:id2@example.com",
]
expected_owner_urns: List[str] = []
for group in expected_group_ids:
expected_owner_urns.append(
builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP)
)
_test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns)
def test_clean_owner_group_urn_transformation_remove_digits(
mock_datahub_graph_instance,
):
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
group_ids = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
]
in_owner_urns: List[str] = []
for group in group_ids:
in_owner_urns.append(
builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP)
)
# remove digits
config: List[Union[re.Pattern, str]] = [r"\d+"]
expected_group_ids: List[str] = [
"ABCDEF:email_id@example.com",
"ABCDEF:email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id@example.com",
"email_id:id@example.com",
]
expected_owner_urns: List[str] = []
for group in expected_group_ids:
expected_owner_urns.append(
builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP)
)
    _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns)


def test_clean_owner_group_urn_transformation_remove_pattern(
mock_datahub_graph_instance,
):
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
group_ids = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
]
in_owner_urns: List[str] = []
for group in group_ids:
in_owner_urns.append(
builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP)
)
# remove `example.*`
config: List[Union[re.Pattern, str]] = [r"@example\.\S*"]
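    # The pattern consumes "@example." plus everything after it, leaving only the local part of each id.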
expected_group_ids: List[str] = [
"ABCDEF:email_id",
"ABCDEF:123email_id",
"email_id",
"email_id",
"email_test:XYZ",
"email_id:id1",
"email_id:id2",
]
expected_owner_urns: List[str] = []
for group in expected_group_ids:
expected_owner_urns.append(
builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP)
)
    _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns)


def test_clean_owner_group_urn_transformation_remove_word_in_capital_letters(
mock_datahub_graph_instance,
):
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
group_ids = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
"email_test:XYabZ@example.com",
]
in_owner_urns: List[str] = []
for group in group_ids:
in_owner_urns.append(
builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP)
)
# if string between `:` and `@` is in CAPITAL then remove it
config: List[Union[re.Pattern, str]] = ["(?<=:)[A-Z]+(?=@)"]
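    # Both lookarounds are zero-width, so ":" and "@" are kept; only the all-caps run "XYZ" matches,
    # while the mixed-case "XYabZ" is left alone.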
expected_group_ids: List[str] = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
"email_test:XYabZ@example.com",
]
expected_owner_urns: List[str] = []
for group in expected_group_ids:
expected_owner_urns.append(
builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP)
)
    _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns)


def test_clean_owner_group_urn_transformation_remove_pattern_with_alphanumeric_value(
mock_datahub_graph_instance,
):
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
group_ids = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
]
in_owner_urns: List[str] = []
for group in group_ids:
in_owner_urns.append(
builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP)
)
# remove any pattern having `id` followed by any digits
config: List[Union[re.Pattern, str]] = [r"id\d+"]
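    # A bare "id" is untouched; only "id" followed by digits ("id1", "id2") is removed.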
expected_group_ids: List[str] = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:@example.com",
"email_id:@example.com",
]
expected_owner_urns: List[str] = []
for group in expected_group_ids:
expected_owner_urns.append(
builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP)
)
    _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns)


def test_clean_owner_group_urn_transformation_should_not_remove_system_identifier(
mock_datahub_graph_instance,
):
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
group_ids = [
"ABCDEF:email_id@example.com",
"ABCDEF:123email_id@example.com",
"email_id@example.co.in",
"email_id@example.co.uk",
"email_test:XYZ@example.com",
"email_id:id1@example.com",
"email_id:id2@example.com",
]
in_owner_urns: List[str] = []
for group in group_ids:
in_owner_urns.append(
builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP)
)
# should not remove system identifier
config: List[Union[re.Pattern, str]] = ["urn:li:corpGroup:"]
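    # The URN scheme prefix must never be stripped, so passing it as a pattern should leave every URN unchanged.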
    _test_clean_owner_urns(pipeline_context, in_owner_urns, config, in_owner_urns)


def test_replace_external_url_word_replace(
mock_datahub_graph_instance,
):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_replace_external_url"
)
pipeline_context.graph = mock_datahub_graph_instance
output = run_dataset_transformer_pipeline(
transformer_type=ReplaceExternalUrlDataset,
aspect=models.DatasetPropertiesClass(
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
customProperties=EXISTING_PROPERTIES.copy(),
),
config={"input_pattern": "datahub", "replacement": "starhub"},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
assert (
output[0].record.aspect.externalUrl
== "https://github.com/starhub/looker-demo/blob/master/foo.view.lkml"
    )


def test_replace_external_regex_replace_1(
mock_datahub_graph_instance,
):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_replace_external_url"
)
pipeline_context.graph = mock_datahub_graph_instance
output = run_dataset_transformer_pipeline(
transformer_type=ReplaceExternalUrlDataset,
aspect=models.DatasetPropertiesClass(
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
customProperties=EXISTING_PROPERTIES.copy(),
),
config={"input_pattern": r"datahub/.*/", "replacement": "starhub/test/"},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
assert (
output[0].record.aspect.externalUrl
== "https://github.com/starhub/test/foo.view.lkml"
    )


def test_replace_external_regex_replace_2(
mock_datahub_graph_instance,
):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_replace_external_url"
)
pipeline_context.graph = mock_datahub_graph_instance
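    # \b\w*hub\b matches whole words ending in "hub" ("github", "datahub"), so both become "test".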
output = run_dataset_transformer_pipeline(
transformer_type=ReplaceExternalUrlDataset,
aspect=models.DatasetPropertiesClass(
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
customProperties=EXISTING_PROPERTIES.copy(),
),
config={"input_pattern": r"\b\w*hub\b", "replacement": "test"},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
assert (
output[0].record.aspect.externalUrl
== "https://test.com/test/looker-demo/blob/master/foo.view.lkml"
    )


def test_pattern_cleanup_usage_statistics_user_1(
mock_datahub_graph_instance,
):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_pattern_cleanup_usage_statistics_user"
)
pipeline_context.graph = mock_datahub_graph_instance
TS_1 = datetime(year=2023, month=1, day=1, tzinfo=timezone.utc)
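    # The literal "IAM:" prefix is stripped from the user id of each userCounts entry; emails are untouched.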
output = run_dataset_transformer_pipeline(
transformer_type=PatternCleanupDatasetUsageUser,
aspect=models.DatasetUsageStatisticsClass(
timestampMillis=int(TS_1.timestamp() * 1000),
userCounts=[
DatasetUserUsageCountsClass(
user=builder.make_user_urn("IAM:user1"),
count=1,
userEmail="user1@exaple.com",
),
DatasetUserUsageCountsClass(
user=builder.make_user_urn("user2"),
count=2,
userEmail="user2@exaple.com",
),
],
),
config={"pattern_for_cleanup": ["IAM:"]},
pipeline_context=pipeline_context,
)
expectedUsageStatistics = models.DatasetUsageStatisticsClass(
timestampMillis=int(TS_1.timestamp() * 1000),
userCounts=[
DatasetUserUsageCountsClass(
user=builder.make_user_urn("user1"),
count=1,
userEmail="user1@exaple.com",
),
DatasetUserUsageCountsClass(
user=builder.make_user_urn("user2"),
count=2,
userEmail="user2@exaple.com",
),
],
)
assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
assert len(output[0].record.aspect.userCounts) == 2
    assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts


def test_pattern_cleanup_usage_statistics_user_2(
mock_datahub_graph_instance,
):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_pattern_cleanup_usage_statistics_user"
)
pipeline_context.graph = mock_datahub_graph_instance
TS_1 = datetime(year=2023, month=1, day=1, tzinfo=timezone.utc)
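    # Removing the literal "_user" turns "test_user_1" into "test_1".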
output = run_dataset_transformer_pipeline(
transformer_type=PatternCleanupDatasetUsageUser,
aspect=models.DatasetUsageStatisticsClass(
timestampMillis=int(TS_1.timestamp() * 1000),
userCounts=[
DatasetUserUsageCountsClass(
user=builder.make_user_urn("test_user_1"),
count=1,
userEmail="user1@exaple.com",
),
DatasetUserUsageCountsClass(
user=builder.make_user_urn("test_user_2"),
count=2,
userEmail="user2@exaple.com",
),
],
),
config={"pattern_for_cleanup": ["_user"]},
pipeline_context=pipeline_context,
)
expectedUsageStatistics = models.DatasetUsageStatisticsClass(
timestampMillis=int(TS_1.timestamp() * 1000),
userCounts=[
DatasetUserUsageCountsClass(
user=builder.make_user_urn("test_1"),
count=1,
userEmail="user1@exaple.com",
),
DatasetUserUsageCountsClass(
user=builder.make_user_urn("test_2"),
count=2,
userEmail="user2@exaple.com",
),
],
)
assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
assert len(output[0].record.aspect.userCounts) == 2
    assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts


def test_pattern_cleanup_usage_statistics_user_3(
mock_datahub_graph_instance,
):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_pattern_cleanup_usage_statistics_user"
)
pipeline_context.graph = mock_datahub_graph_instance
TS_1 = datetime(year=2023, month=1, day=1, tzinfo=timezone.utc)
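    # The regex "_user_\d+" removes the suffix together with its digits: "abc_user_1" becomes "abc".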
output = run_dataset_transformer_pipeline(
transformer_type=PatternCleanupDatasetUsageUser,
aspect=models.DatasetUsageStatisticsClass(
timestampMillis=int(TS_1.timestamp() * 1000),
userCounts=[
DatasetUserUsageCountsClass(
user=builder.make_user_urn("abc_user_1"),
count=1,
userEmail="user1@exaple.com",
),
DatasetUserUsageCountsClass(
user=builder.make_user_urn("xyz_user_2"),
count=2,
userEmail="user2@exaple.com",
),
],
),
config={"pattern_for_cleanup": [r"_user_\d+"]},
pipeline_context=pipeline_context,
)
expectedUsageStatistics = models.DatasetUsageStatisticsClass(
timestampMillis=int(TS_1.timestamp() * 1000),
userCounts=[
DatasetUserUsageCountsClass(
user=builder.make_user_urn("abc"),
count=1,
userEmail="user1@exaple.com",
),
DatasetUserUsageCountsClass(
user=builder.make_user_urn("xyz"),
count=2,
userEmail="user2@exaple.com",
),
],
)
assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
assert len(output[0].record.aspect.userCounts) == 2
    assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts


def test_domain_mapping_based_on_tags_with_valid_tags(mock_datahub_graph_instance):
acryl_domain = builder.make_domain_urn("acryl.io")
server_domain = builder.make_domain_urn("test.io")
    tag_one = builder.make_tag_urn("test:tag_1")

    # Return fake aspect to simulate server behaviour
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(tags=[TagAssociationClass(tag=tag_one)])

    pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
pipeline_context.graph.get_tags = fake_get_tags # type: ignore
output = run_dataset_transformer_pipeline(
transformer_type=DatasetTagDomainMapper,
aspect=models.DomainsClass(domains=[server_domain]),
config={"domain_mapping": {"test:tag_1": acryl_domain}},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 1
assert acryl_domain in transformed_aspect.domains
    assert server_domain not in transformed_aspect.domains


def test_domain_mapping_based_on_tags_with_no_matching_tags(
mock_datahub_graph_instance,
):
acryl_domain = builder.make_domain_urn("acryl.io")
server_domain = builder.make_domain_urn("test.io")
non_matching_tag = builder.make_tag_urn("nonMatching")
pipeline_context = PipelineContext(run_id="no_match_pipeline")
    pipeline_context.graph = mock_datahub_graph_instance

    # Return fake aspect to simulate server behaviour
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(tags=[TagAssociationClass(tag=non_matching_tag)])

    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore
output = run_dataset_transformer_pipeline(
transformer_type=DatasetTagDomainMapper,
aspect=models.DomainsClass(domains=[server_domain]),
config={
"domain_mapping": {"test:tag_1": acryl_domain},
},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert isinstance(output[0].record.aspect, models.DomainsClass)
assert len(output[0].record.aspect.domains) == 1
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 1
assert acryl_domain not in transformed_aspect.domains
    assert server_domain in transformed_aspect.domains


def test_domain_mapping_based_on_tags_with_empty_config(mock_datahub_graph_instance):
some_tag = builder.make_tag_urn("someTag")
pipeline_context = PipelineContext(run_id="empty_config_pipeline")
    pipeline_context.graph = mock_datahub_graph_instance

    # Return fake aspect to simulate server behaviour
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(tags=[TagAssociationClass(tag=some_tag)])

    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore
output = run_dataset_transformer_pipeline(
transformer_type=DatasetTagDomainMapper,
aspect=models.DomainsClass(domains=[]),
config={"domain_mapping": {}},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert isinstance(output[0].record.aspect, models.DomainsClass)
    assert len(output[0].record.aspect.domains) == 0


def test_domain_mapping_based_on_tags_with_multiple_tags(
    mock_datahub_graph_instance,
):
# Two tags that match different rules in the domain mapping configuration
tag_one = builder.make_tag_urn("test:tag_1")
tag_two = builder.make_tag_urn("test:tag_2")
existing_domain = builder.make_domain_urn("existing.io")
finance = builder.make_domain_urn("finance")
hr = builder.make_domain_urn("hr")
pipeline_context = PipelineContext(run_id="multiple_matches_pipeline")
    pipeline_context.graph = mock_datahub_graph_instance

    # Return fake aspect to simulate server behaviour
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(
            tags=[TagAssociationClass(tag=tag_one), TagAssociationClass(tag=tag_two)]
        )

    # Return fake aspect to simulate server behaviour
    def fake_get_domain(entity_urn: str) -> models.DomainsClass:
        return models.DomainsClass(domains=[existing_domain])

    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore
    pipeline_context.graph.get_domain = fake_get_domain  # type: ignore
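    # With PATCH semantics the mapped domains are merged with the existing ones instead of replacing them.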
output = run_dataset_transformer_pipeline(
transformer_type=DatasetTagDomainMapper,
aspect=models.DomainsClass(domains=[existing_domain]),
config={
"domain_mapping": {"test:tag_1": finance, "test:tag_2": hr},
"semantics": "PATCH",
},
pipeline_context=pipeline_context,
)
# Assertions to verify the expected outcome
assert len(output) == 2
assert output[0].record is not None
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
# Expecting domains from both matched tags
assert set(output[0].record.aspect.domains) == {existing_domain, finance, hr}
    assert len(transformed_aspect.domains) == 3


def test_domain_mapping_based_on_tags_with_empty_tags(mock_datahub_graph_instance):
acryl_domain = builder.make_domain_urn("acryl.io")
server_domain = builder.make_domain_urn("test.io")
pipeline_context = PipelineContext(run_id="empty_config_pipeline")
    pipeline_context.graph = mock_datahub_graph_instance

    # Return fake aspect to simulate server behaviour
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(tags=[])

    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore
output = run_dataset_transformer_pipeline(
transformer_type=DatasetTagDomainMapper,
aspect=models.DomainsClass(domains=[acryl_domain]),
config={"domain_mapping": {"test:tag_1": server_domain}},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert isinstance(output[0].record.aspect, models.DomainsClass)
assert len(output[0].record.aspect.domains) == 1
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 1
assert acryl_domain in transformed_aspect.domains
    assert server_domain not in transformed_aspect.domains


def test_domain_mapping_based_on_tags_with_no_tags(mock_datahub_graph_instance):
acryl_domain = builder.make_domain_urn("acryl.io")
server_domain = builder.make_domain_urn("test.io")
pipeline_context = PipelineContext(run_id="empty_config_pipeline")
    pipeline_context.graph = mock_datahub_graph_instance

    # Return fake aspect to simulate server behaviour
    def fake_get_tags(entity_urn: str) -> Optional[models.GlobalTagsClass]:
        return None

    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore
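    # With no tags returned by the server, the mapper should leave the existing domain untouched.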
output = run_dataset_transformer_pipeline(
transformer_type=DatasetTagDomainMapper,
aspect=models.DomainsClass(domains=[acryl_domain]),
config={"domain_mapping": {"test:tag_1": server_domain}},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert isinstance(output[0].record.aspect, models.DomainsClass)
assert len(output[0].record.aspect.domains) == 1
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 1
assert acryl_domain in transformed_aspect.domains
    assert server_domain not in transformed_aspect.domains


def test_tags_to_terms_transformation(mock_datahub_graph_instance):
    # Create glossary term URNs for the test
term_urn_example1 = builder.make_term_urn("example1")
    term_urn_example2 = builder.make_term_urn("example2")

    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
return models.GlobalTagsClass(
tags=[
TagAssociationClass(tag=builder.make_tag_urn("example1")),
TagAssociationClass(tag=builder.make_tag_urn("example2")),
]
        )

    # fake the server response
def fake_schema_metadata(entity_urn: str) -> models.SchemaMetadataClass:
return models.SchemaMetadataClass(
schemaName="customer", # not used
platform=builder.make_data_platform_urn(
"hive"
), # important <- platform must be an urn
version=0,
# when the source system has a notion of versioning of schemas, insert this in, otherwise leave as 0
hash="",
# when the source system has a notion of unique schemas identified via hash, include a hash, else leave it as empty string
platformSchema=models.OtherSchemaClass(
rawSchema="__insert raw schema here__"
),
fields=[
models.SchemaFieldClass(
fieldPath="first_name",
globalTags=models.GlobalTagsClass(
tags=[
models.TagAssociationClass(
tag=builder.make_tag_urn("example2")
)
],
),
glossaryTerms=models.GlossaryTermsClass(
terms=[
models.GlossaryTermAssociationClass(
urn=builder.make_term_urn("pii")
)
],
auditStamp=models.AuditStampClass._construct_with_defaults(),
),
type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
models.SchemaFieldClass(
fieldPath="mobile_number",
glossaryTerms=models.GlossaryTermsClass(
terms=[
models.GlossaryTermAssociationClass(
urn=builder.make_term_urn("pii")
)
],
auditStamp=models.AuditStampClass._construct_with_defaults(),
),
type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
],
        )

    pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
pipeline_context.graph.get_tags = fake_get_tags # type: ignore
pipeline_context.graph.get_schema_metadata = fake_schema_metadata # type: ignore
# Configuring the transformer
config = {"tags": ["example1", "example2"]}
# Running the transformer within a test pipeline
output = run_dataset_transformer_pipeline(
transformer_type=TagsToTermMapper,
aspect=models.GlossaryTermsClass(
terms=[
models.GlossaryTermAssociationClass(urn=builder.make_term_urn("pii"))
],
auditStamp=models.AuditStampClass._construct_with_defaults(),
),
config=config,
pipeline_context=pipeline_context,
)
# Expected results
expected_terms = [term_urn_example2, term_urn_example1]
# Verify the output
assert len(output) == 2 # One for result and one for end of stream
terms_aspect = output[0].record.aspect
assert isinstance(terms_aspect, models.GlossaryTermsClass)
assert len(terms_aspect.terms) == len(expected_terms)
assert set(term.urn for term in terms_aspect.terms) == {
"urn:li:glossaryTerm:example1",
"urn:li:glossaryTerm:example2",
    }


def test_tags_to_terms_with_no_matching_terms(mock_datahub_graph_instance):
# Setup for test where no tags match the provided term mappings
def fake_get_tags_no_match(entity_urn: str) -> models.GlobalTagsClass:
return models.GlobalTagsClass(
tags=[
TagAssociationClass(tag=builder.make_tag_urn("nonMatchingTag1")),
TagAssociationClass(tag=builder.make_tag_urn("nonMatchingTag2")),
]
        )

    pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
pipeline_context.graph.get_tags = fake_get_tags_no_match # type: ignore
# No matching terms in config
config = {"tags": ["example1", "example2"]}
# Running the transformer within a test pipeline
output = run_dataset_transformer_pipeline(
transformer_type=TagsToTermMapper,
aspect=models.GlossaryTermsClass(
terms=[
models.GlossaryTermAssociationClass(urn=builder.make_term_urn("pii"))
],
auditStamp=models.AuditStampClass._construct_with_defaults(),
),
config=config,
pipeline_context=pipeline_context,
)
# Verify the output
assert len(output) == 2 # One for result and one for end of stream
terms_aspect = output[0].record.aspect
assert isinstance(terms_aspect, models.GlossaryTermsClass)
    assert len(terms_aspect.terms) == 1


def test_tags_to_terms_with_missing_tags(mock_datahub_graph_instance):
# Setup for test where no tags are present
def fake_get_no_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(tags=[])

    pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
pipeline_context.graph.get_tags = fake_get_no_tags # type: ignore
config = {"tags": ["example1", "example2"]}
# Running the transformer with no tags
output = run_dataset_transformer_pipeline(
transformer_type=TagsToTermMapper,
aspect=models.GlossaryTermsClass(
terms=[
models.GlossaryTermAssociationClass(urn=builder.make_term_urn("pii"))
],
auditStamp=models.AuditStampClass._construct_with_defaults(),
),
config=config,
pipeline_context=pipeline_context,
)
# Verify that no terms are added when there are no tags
assert len(output) == 2
terms_aspect = output[0].record.aspect
assert isinstance(terms_aspect, models.GlossaryTermsClass)
    assert len(terms_aspect.terms) == 1


def test_tags_to_terms_with_partial_match(mock_datahub_graph_instance):
# Setup for partial match scenario
def fake_get_partial_match_tags(entity_urn: str) -> models.GlobalTagsClass:
return models.GlobalTagsClass(
tags=[
TagAssociationClass(
tag=builder.make_tag_urn("example1")
), # Should match
TagAssociationClass(
tag=builder.make_tag_urn("nonMatchingTag")
), # No match
]
        )

    pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph_instance
pipeline_context.graph.get_tags = fake_get_partial_match_tags # type: ignore
config = {"tags": ["example1"]} # Only 'example1' has a term mapped
# Running the transformer with partial matching tags
output = run_dataset_transformer_pipeline(
transformer_type=TagsToTermMapper,
aspect=models.GlossaryTermsClass(
terms=[
models.GlossaryTermAssociationClass(urn=builder.make_term_urn("pii"))
],
auditStamp=models.AuditStampClass._construct_with_defaults(),
),
config=config,
pipeline_context=pipeline_context,
)
# Verify that only matched term is added
assert len(output) == 2
terms_aspect = output[0].record.aspect
assert isinstance(terms_aspect, models.GlossaryTermsClass)
assert len(terms_aspect.terms) == 1
    assert terms_aspect.terms[0].urn == "urn:li:glossaryTerm:example1"


def test_replace_external_url_container_word_replace(
mock_datahub_graph_instance,
):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_replace_external_url_container"
)
pipeline_context.graph = mock_datahub_graph_instance
output = run_container_transformer_pipeline(
transformer_type=ReplaceExternalUrlContainer,
aspect=models.ContainerPropertiesClass(
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
customProperties=EXISTING_PROPERTIES.copy(),
name="sample_test",
),
config={"input_pattern": "datahub", "replacement": "starhub"},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
assert (
output[0].record.aspect.externalUrl
== "https://github.com/starhub/looker-demo/blob/master/foo.view.lkml"
    )


def test_replace_external_regex_container_replace_1(
mock_datahub_graph_instance,
):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_replace_external_url_container"
)
pipeline_context.graph = mock_datahub_graph_instance
output = run_container_transformer_pipeline(
transformer_type=ReplaceExternalUrlContainer,
aspect=models.ContainerPropertiesClass(
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
customProperties=EXISTING_PROPERTIES.copy(),
name="sample_test",
),
config={"input_pattern": r"datahub/.*/", "replacement": "starhub/test/"},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
assert (
output[0].record.aspect.externalUrl
== "https://github.com/starhub/test/foo.view.lkml"
    )


def test_replace_external_regex_container_replace_2(
mock_datahub_graph_instance,
):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_replace_external_url_container"
)
pipeline_context.graph = mock_datahub_graph_instance
output = run_container_transformer_pipeline(
transformer_type=ReplaceExternalUrlContainer,
aspect=models.ContainerPropertiesClass(
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
customProperties=EXISTING_PROPERTIES.copy(),
name="sample_test",
),
config={"input_pattern": r"\b\w*hub\b", "replacement": "test"},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
assert (
output[0].record.aspect.externalUrl
== "https://test.com/test/looker-demo/blob/master/foo.view.lkml"
)