import json
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, MutableSequence, Optional, Type, Union, cast
from unittest import mock
from uuid import uuid4

import pytest

import datahub.emitter.mce_builder as builder
import datahub.metadata.schema_classes as models
import tests.test_helpers.mce_helpers
from datahub.configuration.common import TransformerSemantics
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api import workunit
from datahub.ingestion.api.common import EndOfStream, PipelineContext, RecordEnvelope
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.transformer.add_dataset_browse_path import (
    AddDatasetBrowsePathTransformer,
)
from datahub.ingestion.transformer.add_dataset_dataproduct import (
    AddDatasetDataProduct,
    PatternAddDatasetDataProduct,
    SimpleAddDatasetDataProduct,
)
from datahub.ingestion.transformer.add_dataset_ownership import (
    AddDatasetOwnership,
    PatternAddDatasetOwnership,
    SimpleAddDatasetOwnership,
)
from datahub.ingestion.transformer.add_dataset_properties import (
    AddDatasetProperties,
    AddDatasetPropertiesResolverBase,
    SimpleAddDatasetProperties,
)
from datahub.ingestion.transformer.add_dataset_schema_tags import (
    PatternAddDatasetSchemaTags,
)
from datahub.ingestion.transformer.add_dataset_schema_terms import (
    PatternAddDatasetSchemaTerms,
)
from datahub.ingestion.transformer.add_dataset_tags import (
    AddDatasetTags,
    PatternAddDatasetTags,
    SimpleAddDatasetTags,
)
from datahub.ingestion.transformer.add_dataset_terms import (
    PatternAddDatasetTerms,
    SimpleAddDatasetTerms,
)
from datahub.ingestion.transformer.base_transformer import (
    BaseTransformer,
    SingleAspectTransformer,
)
from datahub.ingestion.transformer.dataset_domain import (
    PatternAddDatasetDomain,
    SimpleAddDatasetDomain,
    TransformerOnConflict,
)
from datahub.ingestion.transformer.dataset_domain_based_on_tags import (
    DatasetTagDomainMapper,
)
from datahub.ingestion.transformer.dataset_transformer import (
    ContainerTransformer,
    DatasetTransformer,
    TagTransformer,
)
from datahub.ingestion.transformer.extract_dataset_tags import ExtractDatasetTags
from datahub.ingestion.transformer.extract_ownership_from_tags import (
    ExtractOwnersFromTagsTransformer,
)
from datahub.ingestion.transformer.mark_dataset_status import MarkDatasetStatus
from datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user import (
    PatternCleanupDatasetUsageUser,
)
from datahub.ingestion.transformer.pattern_cleanup_ownership import (
    PatternCleanUpOwnership,
)
from datahub.ingestion.transformer.remove_dataset_ownership import (
    SimpleRemoveDatasetOwnership,
)
from datahub.ingestion.transformer.replace_external_url import (
    ReplaceExternalUrlContainer,
    ReplaceExternalUrlDataset,
)
from datahub.ingestion.transformer.tags_to_terms import TagsToTermMapper
from datahub.metadata.schema_classes import (
    BrowsePathsClass,
    DatasetPropertiesClass,
    DatasetUserUsageCountsClass,
    GlobalTagsClass,
    MetadataChangeEventClass,
    OwnershipClass,
    OwnershipTypeClass,
    StatusClass,
    TagAssociationClass,
)
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.urn import Urn


def make_generic_dataset(
    entity_urn: str = "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
    aspects: Optional[List[Any]] = None,
) -> models.MetadataChangeEventClass:
    if aspects is None:
        # Default to a status aspect if none is provided.
        aspects = [models.StatusClass(removed=False)]
    return models.MetadataChangeEventClass(
        proposedSnapshot=models.DatasetSnapshotClass(
            urn=entity_urn,
            aspects=aspects,
        ),
    )
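

# MCP-flavored counterparts to make_generic_dataset: a
# MetadataChangeProposalWrapper carries exactly one aspect for one entity urn,
# whereas an MCE snapshot can bundle several aspects.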


def make_generic_dataset_mcp(
    entity_urn: str = "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
    aspect: Any = models.StatusClass(removed=False),
) -> MetadataChangeProposalWrapper:
    return MetadataChangeProposalWrapper(
        entityUrn=entity_urn,
        aspect=aspect,
    )


def make_generic_container_mcp(
    entity_urn: str = "urn:li:container:6338f55439c7ae58243a62c4d6fbffeee",
    aspect_name: str = "status",
    aspect: Any = None,
) -> MetadataChangeProposalWrapper:
    if aspect is None:
        aspect = models.StatusClass(removed=False)
    return MetadataChangeProposalWrapper(
        entityUrn=entity_urn,
        entityType=Urn.from_string(entity_urn).entity_type,
        aspectName=aspect_name,
        changeType="UPSERT",
        aspect=aspect,
    )


def create_and_run_test_pipeline(
    events: List[Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]],
    transformers: List[Dict[str, Any]],
    path: str,
) -> str:
    with mock.patch(
        "tests.unit.test_source.FakeSource.get_workunits"
    ) as mock_getworkunits:
        mock_getworkunits.return_value = [
            (
                workunit.MetadataWorkUnit(
                    id=f"test-workunit-mce-{e.proposedSnapshot.urn}", mce=e
                )
                if isinstance(e, MetadataChangeEventClass)
                else workunit.MetadataWorkUnit(
                    id=f"test-workunit-mcp-{e.entityUrn}-{e.aspectName}", mcp=e
                )
            )
            for e in events
        ]
        events_file = f"{path}/{str(uuid4())}.json"
        pipeline = Pipeline.create(
            config_dict={
                "source": {
                    "type": "tests.unit.test_source.FakeSource",
                    "config": {},
                },
                "transformers": transformers,
                "sink": {"type": "file", "config": {"filename": events_file}},
            }
        )

        pipeline.run()
        pipeline.raise_from_status()
    return events_file


def make_dataset_with_owner() -> models.MetadataChangeEventClass:
    return models.MetadataChangeEventClass(
        proposedSnapshot=models.DatasetSnapshotClass(
            urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)",
            aspects=[
                models.OwnershipClass(
                    owners=[
                        models.OwnerClass(
                            owner=builder.make_user_urn("fake_owner"),
                            type=models.OwnershipTypeClass.DATAOWNER,
                        ),
                    ],
                    lastModified=models.AuditStampClass(
                        time=1625266033123, actor="urn:li:corpuser:datahub"
                    ),
                )
            ],
        ),
    )


EXISTING_PROPERTIES = {"my_existing_property": "existing property value"}


def make_dataset_with_properties() -> models.MetadataChangeEventClass:
    return models.MetadataChangeEventClass(
        proposedSnapshot=models.DatasetSnapshotClass(
            urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
            aspects=[
                models.StatusClass(removed=False),
                models.DatasetPropertiesClass(
                    customProperties=EXISTING_PROPERTIES.copy()
                ),
            ],
        ),
    )
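

# The transformer tests below feed a RecordEnvelope stream (dataset MCEs, the
# occasional non-dataset MCE, and a terminating EndOfStream) through a
# transformer; aspects added for entities that lacked them are emitted as
# extra MCPs just before EndOfStream.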


def test_dataset_ownership_transformation(mock_time):
    no_owner_aspect = make_generic_dataset()

    with_owner_aspect = make_dataset_with_owner()

    not_a_dataset = models.MetadataChangeEventClass(
        proposedSnapshot=models.DataJobSnapshotClass(
            urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)",
            aspects=[
                models.DataJobInfoClass(
                    name="User Deletions",
                    description="Constructs the fct_users_deleted from logging_events",
                    type=models.AzkabanJobTypeClass.SQL,
                )
            ],
        )
    )

    inputs = [no_owner_aspect, with_owner_aspect, not_a_dataset, EndOfStream()]

    transformer = SimpleAddDatasetOwnership.create(
        {
            "owner_urns": [
                builder.make_user_urn("person1"),
                builder.make_user_urn("person2"),
            ]
        },
        PipelineContext(run_id="test"),
    )

    outputs = list(
        transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
    )

    assert len(outputs) == len(inputs) + 2

    # Check the first entry.
    first_ownership_aspect = builder.get_aspect_if_available(
        outputs[0].record, models.OwnershipClass
    )
    assert first_ownership_aspect is None

    last_event = outputs[3].record
    assert isinstance(last_event, MetadataChangeProposalWrapper)
    assert isinstance(last_event.aspect, OwnershipClass)
    assert len(last_event.aspect.owners) == 2
    assert last_event.entityUrn == outputs[0].record.proposedSnapshot.urn
    assert all(
        [
            owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None
            for owner in last_event.aspect.owners
        ]
    )

    # Check the second entry.
    second_ownership_aspect = builder.get_aspect_if_available(
        outputs[1].record, models.OwnershipClass
    )
    assert second_ownership_aspect
    assert len(second_ownership_aspect.owners) == 3
    assert all(
        [
            owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None
            for owner in second_ownership_aspect.owners
        ]
    )

    third_ownership_aspect = outputs[4].record.aspect
    assert third_ownership_aspect
    assert len(third_ownership_aspect.owners) == 2
    assert all(
        [
            owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None
            for owner in third_ownership_aspect.owners
        ]
    )

    # Verify that the third entry is unchanged.
    assert inputs[2] == outputs[2].record

    # Verify that the last entry is EndOfStream.
    assert inputs[-1] == outputs[-1].record


def test_simple_dataset_ownership_with_type_transformation(mock_time):
    input = make_generic_dataset()

    transformer = SimpleAddDatasetOwnership.create(
        {
            "owner_urns": [
                builder.make_user_urn("person1"),
            ],
            "ownership_type": "PRODUCER",
        },
        PipelineContext(run_id="test"),
    )

    output = list(
        transformer.transform(
            [
                RecordEnvelope(input, metadata={}),
                RecordEnvelope(EndOfStream(), metadata={}),
            ]
        )
    )

    assert len(output) == 3

    # original MCE is unchanged
    assert input == output[0].record

    ownership_aspect = output[1].record.aspect
    assert isinstance(ownership_aspect, OwnershipClass)
    assert len(ownership_aspect.owners) == 1
    assert ownership_aspect.owners[0].type == models.OwnershipTypeClass.PRODUCER


def test_simple_dataset_ownership_with_type_urn_transformation(mock_time):
    input = make_generic_dataset()

    transformer = SimpleAddDatasetOwnership.create(
        {
            "owner_urns": [
                builder.make_user_urn("person1"),
            ],
            "ownership_type": "urn:li:ownershipType:__system__technical_owner",
        },
        PipelineContext(run_id="test"),
    )

    output = list(
        transformer.transform(
            [
                RecordEnvelope(input, metadata={}),
                RecordEnvelope(EndOfStream(), metadata={}),
            ]
        )
    )

    assert len(output) == 3

    # original MCE is unchanged
    assert input == output[0].record

    ownership_aspect = output[1].record.aspect
    assert isinstance(ownership_aspect, OwnershipClass)
    assert len(ownership_aspect.owners) == 1
    assert ownership_aspect.owners[0].type == OwnershipTypeClass.CUSTOM
    assert (
        ownership_aspect.owners[0].typeUrn
        == "urn:li:ownershipType:__system__technical_owner"
    )


def _test_extract_tags(in_urn: str, regex_str: str, out_tag: str) -> None:
    input = make_generic_dataset(entity_urn=in_urn)
    transformer = ExtractDatasetTags.create(
        {
            "extract_tags_from": "urn",
            "extract_tags_regex": regex_str,
            "semantics": "overwrite",
        },
        PipelineContext(run_id="test"),
    )
    output = list(
        transformer.transform(
            [
                RecordEnvelope(input, metadata={}),
                RecordEnvelope(EndOfStream(), metadata={}),
            ]
        )
    )

    assert len(output) == 3
    assert output[0].record == input
    tags_aspect = output[1].record.aspect
    assert isinstance(tags_aspect, GlobalTagsClass)
    assert len(tags_aspect.tags) == 1
    assert tags_aspect.tags[0].tag == out_tag
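

# ExtractDatasetTags builds a tag from the first capture group of
# extract_tags_regex; as the cases below show, with extract_tags_from="urn"
# the pattern is matched against the dataset name, e.g. ".([^._]*)_" captures
# the segment between the first "." and the following "_".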


def test_extract_dataset_tags(mock_time):
    _test_extract_tags(
        in_urn="urn:li:dataset:(urn:li:dataPlatform:kafka,clusterid.part1-part2-part3_part4,PROD)",
        regex_str="(.*)",
        out_tag="urn:li:tag:clusterid.part1-part2-part3_part4",
    )
    _test_extract_tags(
        in_urn="urn:li:dataset:(urn:li:dataPlatform:kafka,clusterid.USA-ops-team_table1,PROD)",
        regex_str=".([^._]*)_",
        out_tag="urn:li:tag:USA-ops-team",
    )
    _test_extract_tags(
        in_urn="urn:li:dataset:(urn:li:dataPlatform:kafka,clusterid.Canada-marketing_table1,PROD)",
        regex_str=".([^._]*)_",
        out_tag="urn:li:tag:Canada-marketing",
    )
    _test_extract_tags(
        in_urn="urn:li:dataset:(urn:li:dataPlatform:elasticsearch,abcdef-prefix_datahub_usage_event-000027,PROD)",
        regex_str="([^._]*)_",
        out_tag="urn:li:tag:abcdef-prefix",
    )


def test_simple_dataset_ownership_with_invalid_type_transformation(mock_time):
    with pytest.raises(ValueError):
        SimpleAddDatasetOwnership.create(
            {
                "owner_urns": [
                    builder.make_user_urn("person1"),
                ],
                "ownership_type": "INVALID_TYPE",
            },
            PipelineContext(run_id="test"),
        )


def test_simple_remove_dataset_ownership():
    with_owner_aspect = make_dataset_with_owner()

    transformer = SimpleRemoveDatasetOwnership.create(
        {},
        PipelineContext(run_id="test"),
    )
    outputs = list(
        transformer.transform([RecordEnvelope(with_owner_aspect, metadata={})])
    )

    ownership_aspect = builder.get_aspect_if_available(
        outputs[0].record, models.OwnershipClass
    )
    assert ownership_aspect
    assert len(ownership_aspect.owners) == 0
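

# test_mark_status_dataset exercises the mark_dataset_status transformer both
# directly and end-to-end via create_and_run_test_pipeline, covering the full
# MCE/MCP matrix: matching MCE, matching MCP, and the non-matching variants.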


def test_mark_status_dataset(tmp_path):
    dataset = make_generic_dataset()

    transformer = MarkDatasetStatus.create(
        {"removed": True},
        PipelineContext(run_id="test"),
    )
    removed = list(
        transformer.transform(
            [
                RecordEnvelope(dataset, metadata={}),
            ]
        )
    )
    assert len(removed) == 1
    status_aspect = builder.get_aspect_if_available(
        removed[0].record, models.StatusClass
    )
    assert status_aspect
    assert status_aspect.removed is True

    transformer = MarkDatasetStatus.create(
        {"removed": False},
        PipelineContext(run_id="test"),
    )
    not_removed = list(
        transformer.transform(
            [
                RecordEnvelope(dataset, metadata={}),
            ]
        )
    )
    assert len(not_removed) == 1
    status_aspect = builder.get_aspect_if_available(
        not_removed[0].record, models.StatusClass
    )
    assert status_aspect
    assert status_aspect.removed is False

    mcp = make_generic_dataset_mcp(
        aspect=DatasetPropertiesClass(description="Test dataset"),
    )
    events_file = create_and_run_test_pipeline(
        events=[mcp],
        transformers=[{"type": "mark_dataset_status", "config": {"removed": True}}],
        path=tmp_path,
    )

    # assert dataset properties aspect was preserved
    assert (
        tests.test_helpers.mce_helpers.assert_for_each_entity(
            entity_type="dataset",
            aspect_name="datasetProperties",
            aspect_field_matcher={"description": "Test dataset"},
            file=events_file,
        )
        == 1
    )

    # assert Status aspect was generated
    assert (
        tests.test_helpers.mce_helpers.assert_for_each_entity(
            entity_type="dataset",
            aspect_name="status",
            aspect_field_matcher={"removed": True},
            file=events_file,
        )
        == 1
    )

    # MCE only
    test_aspect = DatasetPropertiesClass(description="Test dataset")
    events_file = create_and_run_test_pipeline(
        events=[make_generic_dataset(aspects=[test_aspect])],
        transformers=[{"type": "mark_dataset_status", "config": {"removed": True}}],
        path=tmp_path,
    )

    # assert dataset properties aspect was preserved
    assert (
        tests.test_helpers.mce_helpers.assert_entity_mce_aspect(
            entity_urn=mcp.entityUrn or "",
            aspect=test_aspect,
            aspect_type=DatasetPropertiesClass,
            file=events_file,
        )
        == 1
    )

    # assert Status aspect was generated
    assert (
        tests.test_helpers.mce_helpers.assert_for_each_entity(
            entity_type="dataset",
            aspect_name="status",
            aspect_field_matcher={"removed": True},
            file=events_file,
        )
        == 1
    )

    # MCE (non-matching) + MCP (matching)
    test_aspect = DatasetPropertiesClass(description="Test dataset")
    events_file = create_and_run_test_pipeline(
        events=[
            make_generic_dataset(aspects=[test_aspect]),
            make_generic_dataset_mcp(),
        ],
        transformers=[{"type": "mark_dataset_status", "config": {"removed": True}}],
        path=tmp_path,
    )

    # assert dataset properties aspect was preserved
    assert (
        tests.test_helpers.mce_helpers.assert_entity_mce_aspect(
            entity_urn=mcp.entityUrn or "",
            aspect=test_aspect,
            aspect_type=DatasetPropertiesClass,
            file=events_file,
        )
        == 1
    )

    # assert Status aspect was generated
    assert (
        tests.test_helpers.mce_helpers.assert_for_each_entity(
            entity_type="dataset",
            aspect_name="status",
            aspect_field_matcher={"removed": True},
            file=events_file,
        )
        == 1
    )

    # MCE (matching) + MCP (non-matching)
    test_status_aspect = StatusClass(removed=False)
    events_file = create_and_run_test_pipeline(
        events=[
            make_generic_dataset(aspects=[test_status_aspect]),
            make_generic_dataset_mcp(
                aspect=DatasetPropertiesClass(description="test dataset"),
            ),
        ],
        transformers=[{"type": "mark_dataset_status", "config": {"removed": True}}],
        path=tmp_path,
    )

    # assert MCE was transformed
    assert (
        tests.test_helpers.mce_helpers.assert_entity_mce_aspect(
            entity_urn=mcp.entityUrn or "",
            aspect=StatusClass(removed=True),
            aspect_type=StatusClass,
            file=events_file,
        )
        == 1
    )

    # assert MCP aspect was preserved
    assert (
        tests.test_helpers.mce_helpers.assert_for_each_entity(
            entity_type="dataset",
            aspect_name="datasetProperties",
            aspect_field_matcher={"description": "test dataset"},
            file=events_file,
        )
        == 1
    )

    # MCE (non-matching) + MCP (non-matching)
    test_mcp_aspect = GlobalTagsClass(tags=[TagAssociationClass(tag="urn:li:tag:test")])
    test_dataset_props_aspect = DatasetPropertiesClass(description="Test dataset")
    events_file = create_and_run_test_pipeline(
        events=[
            make_generic_dataset(aspects=[test_dataset_props_aspect]),
            make_generic_dataset_mcp(aspect=test_mcp_aspect),
        ],
        transformers=[{"type": "mark_dataset_status", "config": {"removed": True}}],
        path=tmp_path,
    )

    # assert MCE was preserved
    assert (
        tests.test_helpers.mce_helpers.assert_entity_mce_aspect(
            entity_urn=mcp.entityUrn or "",
            aspect=test_dataset_props_aspect,
            aspect_type=DatasetPropertiesClass,
            file=events_file,
        )
        == 1
    )

    # assert MCP aspect was preserved
    assert (
        tests.test_helpers.mce_helpers.assert_for_each_entity(
            entity_type="dataset",
            aspect_name="globalTags",
            aspect_field_matcher={"tags": [{"tag": "urn:li:tag:test"}]},
            file=events_file,
        )
        == 1
    )

    # assert MCP Status aspect was generated
    assert (
        tests.test_helpers.mce_helpers.assert_for_each_entity(
            entity_type="dataset",
            aspect_name="status",
            aspect_field_matcher={"removed": True},
            file=events_file,
        )
        == 1
    )
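

# ExtractOwnersFromTagsTransformer derives owners from tag names: a tag
# prefix/pattern selects the owner tags, tag_character_mapping rewrites
# urn-unsafe characters, and extract_owner_type_from_tag_pattern pulls a
# custom ownership type out of the matched portion of the tag.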


def test_extract_owners_from_tags():
    def _test_owner(
        tag: str,
        config: Dict,
        expected_owner: str,
        expected_owner_type: Optional[str] = None,
        expected_owner_type_urn: Optional[str] = None,
    ) -> None:
        dataset = make_generic_dataset(
            aspects=[
                models.GlobalTagsClass(
                    tags=[TagAssociationClass(tag=builder.make_tag_urn(tag))]
                )
            ]
        )
        transformer = ExtractOwnersFromTagsTransformer.create(
            config,
            PipelineContext(run_id="test"),
        )
        record_envelops: List[RecordEnvelope] = list(
            transformer.transform(
                [
                    RecordEnvelope(dataset, metadata={}),
                    RecordEnvelope(record=EndOfStream(), metadata={}),
                ]
            )
        )
        assert len(record_envelops) == 3
        mcp: MetadataChangeProposalWrapper = record_envelops[1].record
        owners_aspect = cast(OwnershipClass, mcp.aspect)
        owners = owners_aspect.owners
        owner = owners[0]
        assert expected_owner_type is not None
        assert owner.type == expected_owner_type
        assert owner.owner == expected_owner
        assert owner.typeUrn == expected_owner_type_urn

    _test_owner(
        tag="owner:foo",
        config={
            "tag_prefix": "owner:",
        },
        expected_owner="urn:li:corpuser:foo",
        expected_owner_type=OwnershipTypeClass.TECHNICAL_OWNER,
    )
    _test_owner(
        tag="abcdef-owner:foo",
        config={
            "tag_prefix": ".*owner:",
        },
        expected_owner="urn:li:corpuser:foo",
        expected_owner_type=OwnershipTypeClass.TECHNICAL_OWNER,
    )
    _test_owner(
        tag="owner:foo",
        config={
            "tag_prefix": "owner:",
            "is_user": False,
        },
        expected_owner="urn:li:corpGroup:foo",
        expected_owner_type=OwnershipTypeClass.TECHNICAL_OWNER,
    )
    _test_owner(
        tag="owner:foo",
        config={
            "tag_prefix": "owner:",
            "email_domain": "example.com",
        },
        expected_owner="urn:li:corpuser:foo@example.com",
        expected_owner_type=OwnershipTypeClass.TECHNICAL_OWNER,
    )
    _test_owner(
        tag="owner:foo",
        config={
            "tag_prefix": "owner:",
            "email_domain": "example.com",
            "owner_type": "TECHNICAL_OWNER",
        },
        expected_owner="urn:li:corpuser:foo@example.com",
        expected_owner_type=OwnershipTypeClass.TECHNICAL_OWNER,
    )
    _test_owner(
        tag="owner:foo",
        config={
            "tag_prefix": "owner:",
            "email_domain": "example.com",
            "owner_type": "AUTHOR",
            "owner_type_urn": "urn:li:ownershipType:ad8557d6-dcb9-4d2a-83fc-b7d0d54f3e0f",
        },
        expected_owner="urn:li:corpuser:foo@example.com",
        expected_owner_type=OwnershipTypeClass.CUSTOM,
        expected_owner_type_urn="urn:li:ownershipType:ad8557d6-dcb9-4d2a-83fc-b7d0d54f3e0f",
    )
    _test_owner(
        tag="data__producer__owner__email:abc--xyz-email_com",
        config={
            "tag_pattern": "(.*)_owner_email:",
            "tag_character_mapping": {
                "_": ".",
                "-": "@",
                "__": "_",
                "--": "-",
            },
            "extract_owner_type_from_tag_pattern": True,
        },
        expected_owner="urn:li:corpuser:abc-xyz@email.com",
        expected_owner_type=OwnershipTypeClass.CUSTOM,
        expected_owner_type_urn="urn:li:ownershipType:data_producer",
    )


def test_add_dataset_browse_paths():
    dataset = make_generic_dataset()

    transformer = AddDatasetBrowsePathTransformer.create(
        {"path_templates": ["/abc"]},
        PipelineContext(run_id="test"),
    )
    transformed = list(
        transformer.transform(
            [
                RecordEnvelope(dataset, metadata={}),
                RecordEnvelope(EndOfStream(), metadata={}),
            ]
        )
    )
    browse_path_aspect = transformed[1].record.aspect
    assert browse_path_aspect
    assert browse_path_aspect.paths == ["/abc"]

    # use an mce with a pre-existing browse path
    dataset_mce = make_generic_dataset(
        aspects=[StatusClass(removed=False), browse_path_aspect]
    )

    transformer = AddDatasetBrowsePathTransformer.create(
        {
            "path_templates": [
                "/PLATFORM/foo/DATASET_PARTS/ENV",
                "/ENV/PLATFORM/bar/DATASET_PARTS/",
            ]
        },
        PipelineContext(run_id="test"),
    )
    transformed = list(
        transformer.transform(
            [
                RecordEnvelope(dataset_mce, metadata={}),
                RecordEnvelope(EndOfStream(), metadata={}),
            ]
        )
    )
    assert len(transformed) == 2
    browse_path_aspect = builder.get_aspect_if_available(
        transformed[0].record, BrowsePathsClass
    )
    assert browse_path_aspect
    assert browse_path_aspect.paths == [
        "/abc",
        "/bigquery/foo/example1/prod",
        "/prod/bigquery/bar/example1/",
    ]

    transformer = AddDatasetBrowsePathTransformer.create(
        {
            "path_templates": [
                "/xyz",
            ],
            "replace_existing": True,
        },
        PipelineContext(run_id="test"),
    )
    transformed = list(
        transformer.transform(
            [
                RecordEnvelope(dataset_mce, metadata={}),
                RecordEnvelope(EndOfStream(), metadata={}),
            ]
        )
    )
    assert len(transformed) == 2
    browse_path_aspect = builder.get_aspect_if_available(
        transformed[0].record, BrowsePathsClass
    )
    assert browse_path_aspect
    assert browse_path_aspect.paths == [
        "/xyz",
    ]
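

# The tag transformers emit a tagKey MCP for every newly referenced tag, so
# the output stream contains the tagged entity plus one extra record per tag
# urn (see the aspectName == "tagKey" assertions below).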


def test_simple_dataset_tags_transformation(mock_time):
    dataset_mce = make_generic_dataset()

    transformer = SimpleAddDatasetTags.create(
        {
            "tag_urns": [
                builder.make_tag_urn("NeedsDocumentation"),
                builder.make_tag_urn("Legacy"),
            ]
        },
        PipelineContext(run_id="test-tags"),
    )

    outputs = list(
        transformer.transform(
            [
                RecordEnvelope(input, metadata={})
                for input in [dataset_mce, EndOfStream()]
            ]
        )
    )
    assert len(outputs) == 5

    # Check that tags were added.
    tags_aspect = outputs[1].record.aspect
    assert tags_aspect
    assert len(tags_aspect.tags) == 2
    assert tags_aspect.tags[0].tag == builder.make_tag_urn("NeedsDocumentation")

    # Check that the new tag entities are emitted.
    assert outputs[2].record.aspectName == "tagKey"
    assert outputs[2].record.aspect.name == "NeedsDocumentation"
    assert outputs[2].record.entityUrn == builder.make_tag_urn("NeedsDocumentation")

    assert outputs[3].record.aspectName == "tagKey"
    assert outputs[3].record.aspect.name == "Legacy"
    assert outputs[3].record.entityUrn == builder.make_tag_urn("Legacy")

    assert isinstance(outputs[4].record, EndOfStream)


def dummy_tag_resolver_method(dataset_snapshot):
    return []


def test_pattern_dataset_tags_transformation(mock_time):
    dataset_mce = make_generic_dataset()

    transformer = PatternAddDatasetTags.create(
        {
            "tag_pattern": {
                "rules": {
                    ".*example1.*": [
                        builder.make_tag_urn("Private"),
                        builder.make_tag_urn("Legacy"),
                    ],
                    ".*example2.*": [builder.make_term_urn("Needs Documentation")],
                }
            },
        },
        PipelineContext(run_id="test-tags"),
    )

    outputs = list(
        transformer.transform(
            [
                RecordEnvelope(input, metadata={})
                for input in [dataset_mce, EndOfStream()]
            ]
        )
    )

    assert len(outputs) == 5
    tags_aspect = outputs[1].record.aspect
    assert tags_aspect
    assert len(tags_aspect.tags) == 2
    assert tags_aspect.tags[0].tag == builder.make_tag_urn("Private")
    # The example2 rule does not match this dataset, so its value is not applied.
    assert builder.make_tag_urn("Needs Documentation") not in [
        tag.tag for tag in tags_aspect.tags
    ]


def test_add_dataset_tags_transformation():
    transformer = AddDatasetTags.create(
        {
            "get_tags_to_add": "tests.unit.test_transform_dataset.dummy_tag_resolver_method"
        },
        PipelineContext(run_id="test-tags"),
    )
    output = list(
        transformer.transform(
            [RecordEnvelope(input, metadata={}) for input in [make_generic_dataset()]]
        )
    )

    assert output
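

# PatternAddDatasetOwnership matches its regex rules against the entity urn,
# so one rule set can assign different owners to datasets and data jobs in
# the same stream.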


def test_pattern_dataset_ownership_transformation(mock_time):
    no_owner_aspect = make_generic_dataset()

    with_owner_aspect = models.MetadataChangeEventClass(
        proposedSnapshot=models.DatasetSnapshotClass(
            urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)",
            aspects=[
                models.OwnershipClass(
                    owners=[
                        models.OwnerClass(
                            owner=builder.make_user_urn("fake_owner"),
                            type=models.OwnershipTypeClass.DATAOWNER,
                        ),
                    ],
                    lastModified=models.AuditStampClass(
                        time=1625266033123, actor="urn:li:corpuser:datahub"
                    ),
                )
            ],
        ),
    )

    not_a_dataset = models.MetadataChangeEventClass(
        proposedSnapshot=models.DataJobSnapshotClass(
            urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)",
            aspects=[
                models.DataJobInfoClass(
                    name="User Deletions",
                    description="Constructs the fct_users_deleted from logging_events",
                    type=models.AzkabanJobTypeClass.SQL,
                )
            ],
        )
    )

    inputs = [no_owner_aspect, with_owner_aspect, not_a_dataset, EndOfStream()]

    transformer = PatternAddDatasetOwnership.create(
        {
            "owner_pattern": {
                "rules": {
                    ".*example1.*": [builder.make_user_urn("person1")],
                    ".*example2.*": [builder.make_user_urn("person2")],
                    ".*dag_abc.*": [builder.make_user_urn("person2")],
                }
            },
            "ownership_type": "DATAOWNER",
        },
        PipelineContext(run_id="test"),
    )

    outputs = list(
        transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
    )

    assert (
        len(outputs) == len(inputs) + 2
    )  # additional MCP due to the no-owner MCE + datajob

    # Check the first entry.
    assert inputs[0] == outputs[0].record

    first_ownership_aspect = outputs[3].record.aspect
    assert first_ownership_aspect
    assert len(first_ownership_aspect.owners) == 1
    assert all(
        [
            owner.type == models.OwnershipTypeClass.DATAOWNER
            for owner in first_ownership_aspect.owners
        ]
    )

    # Check the second entry.
    second_ownership_aspect = builder.get_aspect_if_available(
        outputs[1].record, models.OwnershipClass
    )
    assert second_ownership_aspect
    assert len(second_ownership_aspect.owners) == 2
    assert all(
        [
            owner.type == models.OwnershipTypeClass.DATAOWNER
            for owner in second_ownership_aspect.owners
        ]
    )

    third_ownership_aspect = outputs[4].record.aspect
    assert third_ownership_aspect
    assert len(third_ownership_aspect.owners) == 1
    assert all(
        [
            owner.type == models.OwnershipTypeClass.DATAOWNER
            for owner in third_ownership_aspect.owners
        ]
    )

    # Verify that the third entry is unchanged.
    assert inputs[2] == outputs[2].record

    # Verify that the last entry is unchanged (EOS)
    assert inputs[-1] == outputs[-1].record


def test_pattern_dataset_ownership_with_type_transformation(mock_time):
    input = make_generic_dataset()

    transformer = PatternAddDatasetOwnership.create(
        {
            "owner_pattern": {
                "rules": {
                    ".*example1.*": [builder.make_user_urn("person1")],
                }
            },
            "ownership_type": "PRODUCER",
        },
        PipelineContext(run_id="test"),
    )

    output = list(
        transformer.transform(
            [
                RecordEnvelope(input, metadata={}),
                RecordEnvelope(EndOfStream(), metadata={}),
            ]
        )
    )

    assert len(output) == 3

    ownership_aspect = output[1].record.aspect
    assert ownership_aspect
    assert len(ownership_aspect.owners) == 1
    assert ownership_aspect.owners[0].type == models.OwnershipTypeClass.PRODUCER


def test_pattern_dataset_ownership_with_invalid_type_transformation(mock_time):
    with pytest.raises(ValueError):
        PatternAddDatasetOwnership.create(
            {
                "owner_pattern": {
                    "rules": {
                        ".*example1.*": [builder.make_user_urn("person1")],
                    }
                },
                "ownership_type": "INVALID_TYPE",
            },
            PipelineContext(run_id="test"),
        )
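

# The container-ownership variants resolve a dataset's parent containers from
# its browsePathsV2 aspect (stubbed here via fake_get_aspect on the mocked
# graph) and, when is_container is set, emit ownership patches for those
# containers as well.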


def test_pattern_container_and_dataset_ownership_transformation(
    mock_time, mock_datahub_graph_instance
):
    def fake_get_aspect(
        entity_urn: str,
        aspect_type: Type[models.BrowsePathsV2Class],
        version: int = 0,
    ) -> Optional[models.BrowsePathsV2Class]:
        return models.BrowsePathsV2Class(
            path=[
                models.BrowsePathEntryClass(
                    id="container_1", urn="urn:li:container:container_1"
                ),
                models.BrowsePathEntryClass(
                    id="container_2", urn="urn:li:container:container_2"
                ),
            ]
        )

    pipeline_context = PipelineContext(
        run_id="test_pattern_container_and_dataset_ownership_transformation"
    )
    pipeline_context.graph = mock_datahub_graph_instance
    pipeline_context.graph.get_aspect = fake_get_aspect  # type: ignore

    # No owner aspect for the first dataset
    no_owner_aspect_dataset = models.MetadataChangeEventClass(
        proposedSnapshot=models.DatasetSnapshotClass(
            urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
            aspects=[models.StatusClass(removed=False)],
        ),
    )
    # Dataset with an existing owner
    with_owner_aspect_dataset = models.MetadataChangeEventClass(
        proposedSnapshot=models.DatasetSnapshotClass(
            urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)",
            aspects=[
                models.OwnershipClass(
                    owners=[
                        models.OwnerClass(
                            owner=builder.make_user_urn("fake_owner"),
                            type=models.OwnershipTypeClass.DATAOWNER,
                        ),
                    ],
                    lastModified=models.AuditStampClass(
                        time=1625266033123, actor="urn:li:corpuser:datahub"
                    ),
                )
            ],
        ),
    )
    datajob = models.MetadataChangeEventClass(
        proposedSnapshot=models.DataJobSnapshotClass(
            urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)",
            aspects=[
                models.DataJobInfoClass(
                    name="User Deletions",
                    description="Constructs the fct_users_deleted from logging_events",
                    type=models.AzkabanJobTypeClass.SQL,
                )
            ],
        )
    )

    inputs = [
        no_owner_aspect_dataset,
        with_owner_aspect_dataset,
        datajob,
        EndOfStream(),
    ]

    # Initialize the transformer with container support
    transformer = PatternAddDatasetOwnership.create(
        {
            "owner_pattern": {
                "rules": {
                    ".*example1.*": [builder.make_user_urn("person1")],
                    ".*example2.*": [builder.make_user_urn("person2")],
                    ".*dag_abc.*": [builder.make_user_urn("person3")],
                }
            },
            "ownership_type": "DATAOWNER",
            "is_container": True,  # Enable container ownership handling
        },
        pipeline_context,
    )

    outputs = list(
        transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
    )

    assert len(outputs) == len(inputs) + 4

    # Check that DatasetSnapshotClass has not changed
    assert inputs[0] == outputs[0].record

    # Check the ownership for the first dataset (example1)
    first_ownership_aspect = outputs[3].record.aspect
    assert first_ownership_aspect
    assert len(first_ownership_aspect.owners) == 1
    assert all(
        [
            owner.type == models.OwnershipTypeClass.DATAOWNER
            for owner in first_ownership_aspect.owners
        ]
    )

    # Check the ownership for the second dataset (example2)
    second_ownership_aspect = builder.get_aspect_if_available(
        outputs[1].record, models.OwnershipClass
    )
    assert second_ownership_aspect
    assert len(second_ownership_aspect.owners) == 2  # One existing + one new
    assert all(
        [
            owner.type == models.OwnershipTypeClass.DATAOWNER
            for owner in second_ownership_aspect.owners
        ]
    )

    third_ownership_aspect = outputs[4].record.aspect
    assert third_ownership_aspect
    assert len(third_ownership_aspect.owners) == 1  # new for datajob

    # Check container ownerships
    for i in range(2):
        container_ownership_aspect = outputs[i + 5].record.aspect
        assert container_ownership_aspect
        ownership = json.loads(container_ownership_aspect.value.decode("utf-8"))
        assert len(ownership) == 3
        assert ownership[0]["value"]["owner"] == builder.make_user_urn("person1")
        assert ownership[1]["value"]["owner"] == builder.make_user_urn("person2")

    # Verify that the third input (not a dataset) is unchanged
    assert inputs[2] == outputs[2].record


def test_pattern_container_and_dataset_ownership_with_no_container(
    mock_time, mock_datahub_graph_instance
):
    def fake_get_aspect(
        entity_urn: str,
        aspect_type: Type[models.BrowsePathsV2Class],
        version: int = 0,
    ) -> Optional[models.BrowsePathsV2Class]:
        return None

    pipeline_context = PipelineContext(
        run_id="test_pattern_container_and_dataset_ownership_with_no_container"
    )
    pipeline_context.graph = mock_datahub_graph_instance
    pipeline_context.graph.get_aspect = fake_get_aspect  # type: ignore

    # No owner aspect for the first dataset
    no_owner_aspect = models.MetadataChangeEventClass(
        proposedSnapshot=models.DatasetSnapshotClass(
            urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
            aspects=[
                models.StatusClass(removed=False),
                models.BrowsePathsV2Class(
                    path=[
                        models.BrowsePathEntryClass(
                            id="container_1", urn="urn:li:container:container_1"
                        ),
                        models.BrowsePathEntryClass(
                            id="container_2", urn="urn:li:container:container_2"
                        ),
                    ]
                ),
            ],
        ),
    )
    # Dataset with an existing owner
    with_owner_aspect = models.MetadataChangeEventClass(
        proposedSnapshot=models.DatasetSnapshotClass(
            urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)",
            aspects=[
                models.OwnershipClass(
                    owners=[
                        models.OwnerClass(
                            owner=builder.make_user_urn("fake_owner"),
                            type=models.OwnershipTypeClass.DATAOWNER,
                        ),
                    ],
                    lastModified=models.AuditStampClass(
                        time=1625266033123, actor="urn:li:corpuser:datahub"
                    ),
                ),
                models.BrowsePathsV2Class(
                    path=[
                        models.BrowsePathEntryClass(
                            id="container_1", urn="urn:li:container:container_1"
                        ),
                        models.BrowsePathEntryClass(
                            id="container_2", urn="urn:li:container:container_2"
                        ),
                    ]
                ),
            ],
        ),
    )

    inputs = [
        no_owner_aspect,
        with_owner_aspect,
        EndOfStream(),
    ]

    # Initialize the transformer with container support
    transformer = PatternAddDatasetOwnership.create(
        {
            "owner_pattern": {
                "rules": {
                    ".*example1.*": [builder.make_user_urn("person1")],
                    ".*example2.*": [builder.make_user_urn("person2")],
                }
            },
            "ownership_type": "DATAOWNER",
            "is_container": True,  # Enable container ownership handling
        },
        pipeline_context,
    )

    outputs = list(
        transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
    )

    assert len(outputs) == len(inputs) + 1

    # Check the ownership for the first dataset (example1)
    first_ownership_aspect = outputs[2].record.aspect
    assert first_ownership_aspect
    assert len(first_ownership_aspect.owners) == 1
    assert all(
        [
            owner.type == models.OwnershipTypeClass.DATAOWNER
            for owner in first_ownership_aspect.owners
        ]
    )

    # Check the ownership for the second dataset (example2)
    second_ownership_aspect = builder.get_aspect_if_available(
        outputs[1].record, models.OwnershipClass
    )
    assert second_ownership_aspect
    assert len(second_ownership_aspect.owners) == 2  # One existing + one new
    assert all(
        [
            owner.type == models.OwnershipTypeClass.DATAOWNER
            for owner in second_ownership_aspect.owners
        ]
    )


def test_pattern_container_and_dataset_ownership_with_no_match(
    mock_time, mock_datahub_graph_instance
):
    def fake_get_aspect(
        entity_urn: str,
        aspect_type: Type[models.BrowsePathsV2Class],
        version: int = 0,
    ) -> models.BrowsePathsV2Class:
        return models.BrowsePathsV2Class(
            path=[
                models.BrowsePathEntryClass(
                    id="container_1", urn="urn:li:container:container_1"
                )
            ]
        )

    pipeline_context = PipelineContext(
        run_id="test_pattern_container_and_dataset_ownership_with_no_match"
    )
    pipeline_context.graph = mock_datahub_graph_instance
    pipeline_context.graph.get_aspect = fake_get_aspect  # type: ignore

    # No owner aspect for the first dataset
    no_owner_aspect = models.MetadataChangeEventClass(
        proposedSnapshot=models.DatasetSnapshotClass(
            urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
            aspects=[
                models.StatusClass(removed=False),
            ],
        ),
    )
    # Dataset with an existing owner
    with_owner_aspect = models.MetadataChangeEventClass(
        proposedSnapshot=models.DatasetSnapshotClass(
            urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)",
            aspects=[
                models.OwnershipClass(
                    owners=[
                        models.OwnerClass(
                            owner=builder.make_user_urn("fake_owner"),
                            type=models.OwnershipTypeClass.DATAOWNER,
                        ),
                    ],
                    lastModified=models.AuditStampClass(
                        time=1625266033123, actor="urn:li:corpuser:datahub"
                    ),
                )
            ],
        ),
    )

    inputs = [
        no_owner_aspect,
        with_owner_aspect,
        EndOfStream(),
    ]

    # Initialize the transformer with container support
    transformer = PatternAddDatasetOwnership.create(
        {
            "owner_pattern": {
                "rules": {
                    ".*example3.*": [builder.make_user_urn("person1")],
                    ".*example4.*": [builder.make_user_urn("person2")],
                }
            },
            "ownership_type": "DATAOWNER",
            "is_container": True,  # Enable container ownership handling
        },
        pipeline_context,
    )

    outputs = list(
        transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
    )

    assert len(outputs) == len(inputs) + 1

    # Check the ownership for the first dataset (example1): no rule matched,
    # so no owners may have been added.
    first_ownership_aspect = outputs[2].record.aspect
    assert first_ownership_aspect
    assert builder.make_user_urn("person1") not in [
        owner.owner for owner in first_ownership_aspect.owners
    ]
    assert builder.make_user_urn("person2") not in [
        owner.owner for owner in first_ownership_aspect.owners
    ]

    # Check the ownership for the second dataset (example2)
    second_ownership_aspect = builder.get_aspect_if_available(
        outputs[1].record, models.OwnershipClass
    )
    assert second_ownership_aspect
    assert len(second_ownership_aspect.owners) == 1
    assert builder.make_user_urn("person1") not in [
        owner.owner for owner in second_ownership_aspect.owners
    ]
    assert builder.make_user_urn("person2") not in [
        owner.owner for owner in second_ownership_aspect.owners
    ]
    assert (
        builder.make_user_urn("fake_owner") == second_ownership_aspect.owners[0].owner
    )


def gen_owners(
    owners: List[str],
    ownership_type: Union[
        str, models.OwnershipTypeClass
    ] = models.OwnershipTypeClass.DATAOWNER,
) -> models.OwnershipClass:
    return models.OwnershipClass(
        owners=[models.OwnerClass(owner=owner, type=ownership_type) for owner in owners]
    )


def test_ownership_patching_intersect(mock_time):
    mock_graph = mock.MagicMock()
    server_ownership = gen_owners(["foo", "bar"])
    mce_ownership = gen_owners(["baz", "foo"])
    mock_graph.get_ownership.return_value = server_ownership

    test_ownership = AddDatasetOwnership._merge_with_server_ownership(
        mock_graph, "test_urn", mce_ownership
    )
    assert test_ownership and test_ownership.owners
    assert "foo" in [o.owner for o in test_ownership.owners]
    assert "bar" in [o.owner for o in test_ownership.owners]
    assert "baz" in [o.owner for o in test_ownership.owners]


def test_ownership_patching_with_nones(mock_time):
    mock_graph = mock.MagicMock()
    mce_ownership = gen_owners(["baz", "foo"])
    mock_graph.get_ownership.return_value = None

    test_ownership = AddDatasetOwnership._merge_with_server_ownership(
        mock_graph, "test_urn", mce_ownership
    )
    assert test_ownership and test_ownership.owners
    assert "foo" in [o.owner for o in test_ownership.owners]
    assert "baz" in [o.owner for o in test_ownership.owners]

    server_ownership = gen_owners(["baz", "foo"])
    mock_graph.get_ownership.return_value = server_ownership

    test_ownership = AddDatasetOwnership._merge_with_server_ownership(
        mock_graph, "test_urn", None
    )
    assert not test_ownership


def test_ownership_patching_with_empty_mce_none_server(mock_time):
    mock_graph = mock.MagicMock()
    mce_ownership = gen_owners([])
    mock_graph.get_ownership.return_value = None

    test_ownership = AddDatasetOwnership._merge_with_server_ownership(
        mock_graph, "test_urn", mce_ownership
    )
    # nothing to add, so we omit writing
    assert test_ownership is None


def test_ownership_patching_with_empty_mce_nonempty_server(mock_time):
    mock_graph = mock.MagicMock()
    server_ownership = gen_owners(["baz", "foo"])
    mce_ownership = gen_owners([])
    mock_graph.get_ownership.return_value = server_ownership

    test_ownership = AddDatasetOwnership._merge_with_server_ownership(
        mock_graph, "test_urn", mce_ownership
    )
    # nothing to add, so we omit writing
    assert test_ownership is None
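

# Judging by the assertions below, type conflicts resolve in favor of the MCE
# side: an owner present on both sides takes the MCE ownership type, while
# owners only known to the server keep their original type.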


def test_ownership_patching_with_different_types_1(mock_time):
    mock_graph = mock.MagicMock()
    server_ownership = gen_owners(["baz", "foo"], models.OwnershipTypeClass.PRODUCER)
    mce_ownership = gen_owners(["foo"], models.OwnershipTypeClass.DATAOWNER)
    mock_graph.get_ownership.return_value = server_ownership

    test_ownership = AddDatasetOwnership._merge_with_server_ownership(
        mock_graph, "test_urn", mce_ownership
    )
    assert test_ownership and test_ownership.owners
    assert ("foo", models.OwnershipTypeClass.DATAOWNER) in [
        (o.owner, o.type) for o in test_ownership.owners
    ]
    assert ("baz", models.OwnershipTypeClass.PRODUCER) in [
        (o.owner, o.type) for o in test_ownership.owners
    ]


def test_ownership_patching_with_different_types_2(mock_time):
    mock_graph = mock.MagicMock()
    server_ownership = gen_owners(["baz", "foo"], models.OwnershipTypeClass.PRODUCER)
    mce_ownership = gen_owners(["foo", "baz"], models.OwnershipTypeClass.DATAOWNER)
    mock_graph.get_ownership.return_value = server_ownership

    test_ownership = AddDatasetOwnership._merge_with_server_ownership(
        mock_graph, "test_urn", mce_ownership
    )
    assert test_ownership and test_ownership.owners
    assert len(test_ownership.owners) == 2
    assert ("foo", models.OwnershipTypeClass.DATAOWNER) in [
        (o.owner, o.type) for o in test_ownership.owners
    ]
    assert ("baz", models.OwnershipTypeClass.DATAOWNER) in [
        (o.owner, o.type) for o in test_ownership.owners
    ]


PROPERTIES_TO_ADD = {"my_new_property": "property value"}


class DummyPropertiesResolverClass(AddDatasetPropertiesResolverBase):
    def get_properties_to_add(self, entity_urn: str) -> Dict[str, str]:
        return PROPERTIES_TO_ADD


def test_add_dataset_properties(mock_time):
    dataset_mce = make_dataset_with_properties()

    transformer = AddDatasetProperties.create(
        {
            "add_properties_resolver_class": "tests.unit.test_transform_dataset.DummyPropertiesResolverClass"
        },
        PipelineContext(run_id="test-properties"),
    )

    outputs = list(
        transformer.transform(
            [RecordEnvelope(input, metadata={}) for input in [dataset_mce]]
        )
    )
    assert len(outputs) == 1

    custom_properties = builder.get_aspect_if_available(
        outputs[0].record, models.DatasetPropertiesClass
    )

    assert custom_properties is not None
    assert custom_properties.customProperties == {
        **EXISTING_PROPERTIES,
        **PROPERTIES_TO_ADD,
    }


def run_simple_add_dataset_properties_transformer_semantics(
    semantics: TransformerSemantics,
    new_properties: dict,
    server_properties: dict,
    mock_datahub_graph_instance: DataHubGraph,
) -> List[RecordEnvelope]:
    pipeline_context = PipelineContext(run_id="test_pattern_dataset_schema_terms")
    pipeline_context.graph = mock_datahub_graph_instance

    # fake the server response
    def fake_dataset_properties(entity_urn: str) -> models.DatasetPropertiesClass:
        return DatasetPropertiesClass(customProperties=server_properties)

    pipeline_context.graph.get_dataset_properties = fake_dataset_properties  # type: ignore

    output = run_dataset_transformer_pipeline(
        transformer_type=SimpleAddDatasetProperties,
        pipeline_context=pipeline_context,
        aspect=models.DatasetPropertiesClass(
            customProperties=EXISTING_PROPERTIES.copy()
        ),
        config={
            "semantics": semantics,
            "properties": new_properties,
        },
    )
    return output


def test_simple_add_dataset_properties_overwrite(mock_datahub_graph_instance):
    new_properties = {"new-simple-property": "new-value"}
    server_properties = {"p1": "value1"}

    output = run_simple_add_dataset_properties_transformer_semantics(
        semantics=TransformerSemantics.OVERWRITE,
        new_properties=new_properties,
        server_properties=server_properties,
        mock_datahub_graph_instance=mock_datahub_graph_instance,
    )

    assert len(output) == 2
    assert output[0].record
    assert output[0].record.aspect
    custom_properties_aspect: models.DatasetPropertiesClass = cast(
        models.DatasetPropertiesClass, output[0].record.aspect
    )
    assert custom_properties_aspect.customProperties == {
        **EXISTING_PROPERTIES,
        **new_properties,
    }
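

# Semantics for SimpleAddDatasetProperties: OVERWRITE ignores whatever the
# server already has, while PATCH additionally merges the server-side
# customProperties into the emitted aspect (compare the expected dicts in the
# two tests).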
{"new-simple-property": "new-value"} server_properties = {"p1": "value1"} output = run_simple_add_dataset_properties_transformer_semantics( semantics=TransformerSemantics.PATCH, new_properties=new_properties, server_properties=server_properties, mock_datahub_graph_instance=mock_datahub_graph_instance, ) assert len(output) == 2 assert output[0].record assert output[0].record.aspect custom_properties_aspect: models.DatasetPropertiesClass = cast( models.DatasetPropertiesClass, output[0].record.aspect ) assert custom_properties_aspect.customProperties == { **EXISTING_PROPERTIES, **new_properties, **server_properties, } def test_simple_add_dataset_properties(mock_time): new_properties = {"new-simple-property": "new-value"} outputs = run_dataset_transformer_pipeline( transformer_type=SimpleAddDatasetProperties, aspect=models.DatasetPropertiesClass( customProperties=EXISTING_PROPERTIES.copy() ), config={ "properties": new_properties, }, ) assert len(outputs) == 2 assert outputs[0].record assert outputs[0].record.aspect custom_properties_aspect: models.DatasetPropertiesClass = cast( models.DatasetPropertiesClass, outputs[0].record.aspect ) assert custom_properties_aspect.customProperties == { **EXISTING_PROPERTIES, **new_properties, } def test_simple_add_dataset_properties_replace_existing(mock_time): new_properties = {"new-simple-property": "new-value"} outputs = run_dataset_transformer_pipeline( transformer_type=SimpleAddDatasetProperties, aspect=models.DatasetPropertiesClass( customProperties=EXISTING_PROPERTIES.copy() ), config={ "replace_existing": True, "properties": new_properties, }, ) assert len(outputs) == 2 assert outputs[0].record assert outputs[0].record.aspect custom_properties_aspect: models.DatasetPropertiesClass = cast( models.DatasetPropertiesClass, outputs[0].record.aspect ) assert custom_properties_aspect.customProperties == { **new_properties, } def test_simple_dataset_terms_transformation(mock_time): dataset_mce = make_generic_dataset() transformer = SimpleAddDatasetTerms.create( { "term_urns": [ builder.make_term_urn("Test"), builder.make_term_urn("Needs Review"), ] }, PipelineContext(run_id="test-terms"), ) outputs = list( transformer.transform( [ RecordEnvelope(input, metadata={}) for input in [dataset_mce, EndOfStream()] ] ) ) assert len(outputs) == 3 # Check that glossary terms were added. terms_aspect = outputs[1].record.aspect assert terms_aspect assert len(terms_aspect.terms) == 2 assert terms_aspect.terms[0].urn == builder.make_term_urn("Test") def test_pattern_dataset_terms_transformation(mock_time): dataset_mce = make_generic_dataset() transformer = PatternAddDatasetTerms.create( { "term_pattern": { "rules": { ".*example1.*": [ builder.make_term_urn("AccountBalance"), builder.make_term_urn("Email"), ], ".*example2.*": [builder.make_term_urn("Address")], } }, }, PipelineContext(run_id="test-terms"), ) outputs = list( transformer.transform( [ RecordEnvelope(input, metadata={}) for input in [dataset_mce, EndOfStream()] ] ) ) assert len(outputs) == 3 # Check that glossary terms were added. 


def test_pattern_dataset_terms_transformation(mock_time):
    dataset_mce = make_generic_dataset()

    transformer = PatternAddDatasetTerms.create(
        {
            "term_pattern": {
                "rules": {
                    ".*example1.*": [
                        builder.make_term_urn("AccountBalance"),
                        builder.make_term_urn("Email"),
                    ],
                    ".*example2.*": [builder.make_term_urn("Address")],
                }
            },
        },
        PipelineContext(run_id="test-terms"),
    )

    outputs = list(
        transformer.transform(
            [
                RecordEnvelope(input, metadata={})
                for input in [dataset_mce, EndOfStream()]
            ]
        )
    )

    assert len(outputs) == 3

    # Check that glossary terms were added.
    terms_aspect = outputs[1].record.aspect
    assert terms_aspect
    assert len(terms_aspect.terms) == 2
    assert terms_aspect.terms[0].urn == builder.make_term_urn("AccountBalance")
    # The example2 rule does not match, so its term must not be applied.
    assert builder.make_term_urn("Address") not in [
        term.urn for term in terms_aspect.terms
    ]


def test_mcp_add_tags_missing(mock_time):
    dataset_mcp = make_generic_dataset_mcp()

    transformer = SimpleAddDatasetTags.create(
        {
            "tag_urns": [
                builder.make_tag_urn("NeedsDocumentation"),
                builder.make_tag_urn("Legacy"),
            ]
        },
        PipelineContext(run_id="test-tags"),
    )
    input_stream: List[RecordEnvelope] = [
        RecordEnvelope(input, metadata={}) for input in [dataset_mcp]
    ]
    input_stream.append(RecordEnvelope(record=EndOfStream(), metadata={}))
    outputs = list(transformer.transform(input_stream))
    assert len(outputs) == 5
    assert outputs[0].record == dataset_mcp
    # Check that tags were added, this will be the second result
    tags_aspect = outputs[1].record.aspect
    assert tags_aspect
    assert len(tags_aspect.tags) == 2
    assert tags_aspect.tags[0].tag == builder.make_tag_urn("NeedsDocumentation")
    assert isinstance(outputs[-1].record, EndOfStream)


def test_mcp_add_tags_existing(mock_time):
    dataset_mcp = make_generic_dataset_mcp(
        aspect=GlobalTagsClass(
            tags=[TagAssociationClass(tag=builder.make_tag_urn("Test"))]
        ),
    )

    transformer = SimpleAddDatasetTags.create(
        {
            "tag_urns": [
                builder.make_tag_urn("NeedsDocumentation"),
                builder.make_tag_urn("Legacy"),
            ]
        },
        PipelineContext(run_id="test-tags"),
    )
    input_stream: List[RecordEnvelope] = [
        RecordEnvelope(input, metadata={}) for input in [dataset_mcp]
    ]
    input_stream.append(RecordEnvelope(record=EndOfStream(), metadata={}))
    outputs = list(transformer.transform(input_stream))

    assert len(outputs) == 4

    # Check that tags were added; the existing aspect is transformed in place,
    # so this is the first result.
    tags_aspect = outputs[0].record.aspect
    assert tags_aspect
    assert len(tags_aspect.tags) == 3
    assert tags_aspect.tags[0].tag == builder.make_tag_urn("Test")
    assert tags_aspect.tags[1].tag == builder.make_tag_urn("NeedsDocumentation")
    assert tags_aspect.tags[2].tag == builder.make_tag_urn("Legacy")

    # Check tag entities got added
    assert outputs[1].record.entityType == "tag"
    assert outputs[1].record.entityUrn == builder.make_tag_urn("NeedsDocumentation")
    assert outputs[2].record.entityType == "tag"
    assert outputs[2].record.entityUrn == builder.make_tag_urn("Legacy")

    assert isinstance(outputs[-1].record, EndOfStream)
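

# test_mcp_multiple_transformers chains set_dataset_browse_path and
# simple_add_dataset_tags in a single pipeline and validates the sink file
# with the mce_helpers assertions: the one dataset urn should end up with
# status, globalTags, and browsePaths aspects.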


def test_mcp_multiple_transformers(mock_time, tmp_path):
    events_file = f"{tmp_path}/multi_transformer_test.json"

    pipeline = Pipeline.create(
        config_dict={
            "source": {
                "type": "tests.unit.test_source.FakeSource",
                "config": {},
            },
            "transformers": [
                {
                    "type": "set_dataset_browse_path",
                    "config": {
                        "path_templates": ["/ENV/PLATFORM/EsComments/DATASET_PARTS"]
                    },
                },
                {
                    "type": "simple_add_dataset_tags",
                    "config": {"tag_urns": ["urn:li:tag:EsComments"]},
                },
            ],
            "sink": {"type": "file", "config": {"filename": events_file}},
        }
    )

    pipeline.run()
    pipeline.raise_from_status()

    urn_pattern = "^" + re.escape(
        "urn:li:dataset:(urn:li:dataPlatform:elasticsearch,fooIndex,PROD)"
    )
    assert (
        tests.test_helpers.mce_helpers.assert_mcp_entity_urn(
            filter="ALL",
            entity_type="dataset",
            regex_pattern=urn_pattern,
            file=events_file,
        )
        == 3
    )

    # check on status aspect
    assert (
        tests.test_helpers.mce_helpers.assert_for_each_entity(
            entity_type="dataset",
            aspect_name="status",
            aspect_field_matcher={"removed": False},
            file=events_file,
        )
        == 1
    )

    # check on globalTags aspect
    assert (
        tests.test_helpers.mce_helpers.assert_for_each_entity(
            entity_type="dataset",
            aspect_name="globalTags",
            aspect_field_matcher={"tags": [{"tag": "urn:li:tag:EsComments"}]},
            file=events_file,
        )
        == 1
    )

    # check on browsePaths aspect
    assert (
        tests.test_helpers.mce_helpers.assert_for_each_entity(
            entity_type="dataset",
            aspect_name="browsePaths",
            aspect_field_matcher={"paths": ["/prod/elasticsearch/EsComments/fooIndex"]},
            file=events_file,
        )
        == 1
    )


def test_mcp_multiple_transformers_replace(mock_time, tmp_path):
    mcps: MutableSequence[
        Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]
    ] = [
        MetadataChangeProposalWrapper(
            entityUrn=str(
                DatasetUrn.create_from_ids(
                    platform_id="elasticsearch",
                    table_name=f"fooBarIndex{i}",
                    env="PROD",
                )
            ),
            aspect=GlobalTagsClass(tags=[TagAssociationClass(tag="urn:li:tag:Test")]),
        )
        for i in range(0, 10)
    ]
    mcps.extend(
        [
            MetadataChangeProposalWrapper(
                entityUrn=str(
                    DatasetUrn.create_from_ids(
                        platform_id="elasticsearch",
                        table_name=f"fooBarIndex{i}",
                        env="PROD",
                    )
                ),
                aspect=DatasetPropertiesClass(description="test dataset"),
            )
            for i in range(0, 10)
        ]
    )

    # shuffle the mcps
    import random

    random.shuffle(mcps)

    events_file = create_and_run_test_pipeline(
        events=list(mcps),
        transformers=[
            {
                "type": "set_dataset_browse_path",
                "config": {
                    "path_templates": ["/ENV/PLATFORM/EsComments/DATASET_PARTS"]
                },
            },
            {
                "type": "simple_add_dataset_tags",
                "config": {"tag_urns": ["urn:li:tag:EsComments"]},
            },
        ],
        path=tmp_path,
    )

    urn_pattern = "^" + re.escape(
        "urn:li:dataset:(urn:li:dataPlatform:elasticsearch,fooBarIndex"
    )

    # there should be 30 MCPs
    assert (
        tests.test_helpers.mce_helpers.assert_mcp_entity_urn(
            filter="ALL",
            entity_type="dataset",
            regex_pattern=urn_pattern,
            file=events_file,
        )
        == 30
    )

    # 10 globalTags aspects with the new tag attached
    assert (
        tests.test_helpers.mce_helpers.assert_for_each_entity(
            entity_type="dataset",
            aspect_name="globalTags",
            aspect_field_matcher={
                "tags": [{"tag": "urn:li:tag:Test"}, {"tag": "urn:li:tag:EsComments"}]
            },
            file=events_file,
        )
        == 10
    )

    # check on browsePaths aspect
    for i in range(0, 10):
        assert (
            tests.test_helpers.mce_helpers.assert_entity_mcp_aspect(
                entity_urn=str(
                    DatasetUrn.create_from_ids(
                        platform_id="elasticsearch",
                        table_name=f"fooBarIndex{i}",
                        env="PROD",
                    )
                ),
                aspect_name="browsePaths",
                aspect_field_matcher={
                    "paths": [f"/prod/elasticsearch/EsComments/fooBarIndex{i}"]
                },
                file=events_file,
            )
            == 1
        )


class SuppressingTransformer(BaseTransformer, SingleAspectTransformer):
    @classmethod
    def create(
        cls, config_dict: dict, ctx: PipelineContext
    ) -> "SuppressingTransformer":
        return SuppressingTransformer()

    def entity_types(self) -> List[str]:
        return super().entity_types()

    def aspect_name(self) -> str:
        return "datasetProperties"

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[builder.Aspect]
    ) -> Optional[builder.Aspect]:
        return None


def test_suppression_works():
    dataset_mce = make_generic_dataset()
    dataset_mcp = make_generic_dataset_mcp(
        aspect=DatasetPropertiesClass(description="suppressable description"),
    )

    transformer = SuppressingTransformer.create(
        {},
        PipelineContext(run_id="test-suppress-transformer"),
    )

    outputs = list(
        transformer.transform(
            [
                RecordEnvelope(input, metadata={})
                for input in [dataset_mce, dataset_mcp, EndOfStream()]
            ]
        )
    )

    assert len(outputs) == 2  # MCP will be dropped
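

# The schema-level pattern transformers below match field paths rather than
# the dataset urn, attaching glossary terms / tags to individual
# SchemaFieldClass entries inside the schemaMetadata aspect.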


def test_pattern_dataset_schema_terms_transformation(mock_time):
    dataset_mce = make_generic_dataset(
        aspects=[
            models.SchemaMetadataClass(
                schemaName="customer",  # not used
                platform=builder.make_data_platform_urn(
                    "hive"
                ),  # important <- platform must be an urn
                version=0,  # when the source system has a notion of versioning of schemas, insert this in, otherwise leave as 0
                hash="",  # when the source system has a notion of unique schemas identified via hash, include a hash, else leave it as empty string
                platformSchema=models.OtherSchemaClass(
                    rawSchema="__insert raw schema here__"
                ),
                fields=[
                    models.SchemaFieldClass(
                        fieldPath="address",
                        type=models.SchemaFieldDataTypeClass(
                            type=models.StringTypeClass()
                        ),
                        nativeDataType="VARCHAR(100)",  # use this to provide the type of the field in the source system's vernacular
                    ),
                    models.SchemaFieldClass(
                        fieldPath="first_name",
                        type=models.SchemaFieldDataTypeClass(
                            type=models.StringTypeClass()
                        ),
                        nativeDataType="VARCHAR(100)",  # use this to provide the type of the field in the source system's vernacular
                    ),
                    models.SchemaFieldClass(
                        fieldPath="last_name",
                        type=models.SchemaFieldDataTypeClass(
                            type=models.StringTypeClass()
                        ),
                        nativeDataType="VARCHAR(100)",  # use this to provide the type of the field in the source system's vernacular
                    ),
                ],
            )
        ]
    )

    transformer = PatternAddDatasetSchemaTerms.create(
        {
            "term_pattern": {
                "rules": {
                    ".*first_name.*": [
                        builder.make_term_urn("Name"),
                        builder.make_term_urn("FirstName"),
                    ],
                    ".*last_name.*": [
                        builder.make_term_urn("Name"),
                        builder.make_term_urn("LastName"),
                    ],
                }
            },
        },
        PipelineContext(run_id="test-schema-terms"),
    )

    outputs = list(
        transformer.transform(
            [
                RecordEnvelope(input, metadata={})
                for input in [dataset_mce, EndOfStream()]
            ]
        )
    )
    assert len(outputs) == 2

    # Check that glossary terms were added.
    schema_aspect = outputs[0].record.proposedSnapshot.aspects[0]
    assert schema_aspect
    assert schema_aspect.fields[0].fieldPath == "address"
    assert schema_aspect.fields[0].glossaryTerms is None
    assert schema_aspect.fields[1].fieldPath == "first_name"
    assert schema_aspect.fields[1].glossaryTerms.terms[0].urn == builder.make_term_urn(
        "Name"
    )
    assert schema_aspect.fields[1].glossaryTerms.terms[1].urn == builder.make_term_urn(
        "FirstName"
    )
    assert schema_aspect.fields[2].fieldPath == "last_name"
    assert schema_aspect.fields[2].glossaryTerms.terms[0].urn == builder.make_term_urn(
        "Name"
    )
    assert schema_aspect.fields[2].glossaryTerms.terms[1].urn == builder.make_term_urn(
        "LastName"
    )
".*first_name.*": [ builder.make_tag_urn("Name"), builder.make_tag_urn("FirstName"), ], ".*last_name.*": [ builder.make_tag_urn("Name"), builder.make_tag_urn("LastName"), ], } }, }, PipelineContext(run_id="test-schema-tags"), ) outputs = list( transformer.transform( [ RecordEnvelope(input, metadata={}) for input in [dataset_mce, EndOfStream()] ] ) ) assert len(outputs) == 2 # Check that glossary terms were added. schema_aspect = outputs[0].record.proposedSnapshot.aspects[0] assert schema_aspect assert schema_aspect.fields[0].fieldPath == "address" assert schema_aspect.fields[0].globalTags is None assert schema_aspect.fields[1].fieldPath == "first_name" assert schema_aspect.fields[1].globalTags.tags[0].tag == builder.make_tag_urn( "Name" ) assert schema_aspect.fields[1].globalTags.tags[1].tag == builder.make_tag_urn( "FirstName" ) assert schema_aspect.fields[2].fieldPath == "last_name" assert schema_aspect.fields[2].globalTags.tags[0].tag == builder.make_tag_urn( "Name" ) assert schema_aspect.fields[2].globalTags.tags[1].tag == builder.make_tag_urn( "LastName" ) def run_dataset_transformer_pipeline( transformer_type: Type[Union[DatasetTransformer, TagTransformer]], aspect: Optional[builder.Aspect], config: dict, pipeline_context: Optional[PipelineContext] = None, use_mce: bool = False, ) -> List[RecordEnvelope]: if pipeline_context is None: pipeline_context = PipelineContext(run_id="transformer_pipe_line") transformer: DatasetTransformer = cast( DatasetTransformer, transformer_type.create(config, pipeline_context) ) dataset: Union[MetadataChangeEventClass, MetadataChangeProposalWrapper] if use_mce: dataset = MetadataChangeEventClass( proposedSnapshot=models.DatasetSnapshotClass( urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)", aspects=[], ) ) else: assert aspect assert transformer.aspect_name() == aspect.ASPECT_NAME dataset = make_generic_dataset_mcp(aspect=aspect) outputs = list( transformer.transform( [RecordEnvelope(input, metadata={}) for input in [dataset, EndOfStream()]] ) ) return outputs def run_container_transformer_pipeline( transformer_type: Type[ContainerTransformer], aspect: Optional[builder.Aspect], config: dict, pipeline_context: Optional[PipelineContext] = None, use_mce: bool = False, ) -> List[RecordEnvelope]: if pipeline_context is None: pipeline_context = PipelineContext(run_id="transformer_pipe_line") transformer: ContainerTransformer = cast( ContainerTransformer, transformer_type.create(config, pipeline_context) ) container: Union[MetadataChangeEventClass, MetadataChangeProposalWrapper] if use_mce: container = MetadataChangeEventClass( proposedSnapshot=models.DatasetSnapshotClass( urn="urn:li:container:6338f55439c7ae58243a62c4d6fbffde", aspects=[], ) ) else: assert aspect container = make_generic_container_mcp( aspect=aspect, aspect_name=transformer.aspect_name() ) outputs = list( transformer.transform( [RecordEnvelope(input, metadata={}) for input in [container, EndOfStream()]] ) ) return outputs def test_simple_add_dataset_domain_aspect_name(mock_datahub_graph_instance): pipeline_context: PipelineContext = PipelineContext( run_id="test_simple_add_dataset_domain" ) pipeline_context.graph = mock_datahub_graph_instance transformer = SimpleAddDatasetDomain.create({"domains": []}, pipeline_context) assert transformer.aspect_name() == models.DomainsClass.ASPECT_NAME def test_simple_add_dataset_domain(mock_datahub_graph_instance): acryl_domain = builder.make_domain_urn("acryl.io") datahub_domain = builder.make_domain_urn("datahubproject.io") 
pipeline_context: PipelineContext = PipelineContext( run_id="test_simple_add_dataset_domain" ) pipeline_context.graph = mock_datahub_graph_instance output = run_dataset_transformer_pipeline( transformer_type=SimpleAddDatasetDomain, aspect=models.DomainsClass(domains=[datahub_domain]), config={"domains": [acryl_domain]}, pipeline_context=pipeline_context, ) assert len(output) == 2 assert output[0] is not None assert output[0].record is not None assert isinstance(output[0].record, MetadataChangeProposalWrapper) assert output[0].record.aspect is not None assert isinstance(output[0].record.aspect, models.DomainsClass) transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) assert len(transformed_aspect.domains) == 2 assert datahub_domain in transformed_aspect.domains assert acryl_domain in transformed_aspect.domains def test_simple_add_dataset_domain_mce_support(mock_datahub_graph_instance): acryl_domain = builder.make_domain_urn("acryl.io") datahub_domain = builder.make_domain_urn("datahubproject.io") pipeline_context: PipelineContext = PipelineContext( run_id="test_simple_add_dataset_domain" ) pipeline_context.graph = mock_datahub_graph_instance output = run_dataset_transformer_pipeline( transformer_type=SimpleAddDatasetDomain, aspect=None, config={"domains": [datahub_domain, acryl_domain]}, pipeline_context=pipeline_context, use_mce=True, ) assert len(output) == 3 assert isinstance(output[0].record, MetadataChangeEventClass) assert isinstance(output[0].record.proposedSnapshot, models.DatasetSnapshotClass) assert len(output[0].record.proposedSnapshot.aspects) == 0 assert isinstance(output[1].record, MetadataChangeProposalWrapper) assert output[1].record.aspect is not None assert isinstance(output[1].record.aspect, models.DomainsClass) transformed_aspect = cast(models.DomainsClass, output[1].record.aspect) assert len(transformed_aspect.domains) == 2 assert datahub_domain in transformed_aspect.domains assert acryl_domain in transformed_aspect.domains def test_simple_add_dataset_domain_replace_existing(mock_datahub_graph_instance): acryl_domain = builder.make_domain_urn("acryl.io") datahub_domain = builder.make_domain_urn("datahubproject.io") pipeline_context: PipelineContext = PipelineContext( run_id="test_simple_add_dataset_domain" ) pipeline_context.graph = mock_datahub_graph_instance output = run_dataset_transformer_pipeline( transformer_type=SimpleAddDatasetDomain, aspect=models.DomainsClass(domains=[datahub_domain]), config={"replace_existing": True, "domains": [acryl_domain]}, pipeline_context=pipeline_context, ) assert len(output) == 2 assert output[0] is not None assert output[0].record is not None assert isinstance(output[0].record, MetadataChangeProposalWrapper) assert output[0].record.aspect is not None assert isinstance(output[0].record.aspect, models.DomainsClass) transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) assert len(transformed_aspect.domains) == 1 assert datahub_domain not in transformed_aspect.domains assert acryl_domain in transformed_aspect.domains def test_simple_add_dataset_domain_semantics_overwrite(mock_datahub_graph_instance): acryl_domain = builder.make_domain_urn("acryl.io") datahub_domain = builder.make_domain_urn("datahubproject.io") server_domain = builder.make_domain_urn("test.io") pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance # Return fake aspect to simulate server behaviour def fake_get_domain(entity_urn: str) -> models.DomainsClass: return 
models.DomainsClass(domains=[server_domain]) pipeline_context.graph.get_domain = fake_get_domain # type: ignore output = run_dataset_transformer_pipeline( transformer_type=SimpleAddDatasetDomain, aspect=models.DomainsClass(domains=[datahub_domain]), config={ "semantics": TransformerSemantics.OVERWRITE, "domains": [acryl_domain], }, pipeline_context=pipeline_context, ) assert len(output) == 2 assert output[0] is not None assert output[0].record is not None assert isinstance(output[0].record, MetadataChangeProposalWrapper) assert output[0].record.aspect is not None assert isinstance(output[0].record.aspect, models.DomainsClass) transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) assert len(transformed_aspect.domains) == 2 assert datahub_domain in transformed_aspect.domains assert acryl_domain in transformed_aspect.domains assert server_domain not in transformed_aspect.domains def test_simple_add_dataset_domain_semantics_patch( pytestconfig, tmp_path, mock_time, mock_datahub_graph_instance ): acryl_domain = builder.make_domain_urn("acryl.io") datahub_domain = builder.make_domain_urn("datahubproject.io") server_domain = builder.make_domain_urn("test.io") pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance # Return fake aspect to simulate server behaviour def fake_get_domain(entity_urn: str) -> models.DomainsClass: return models.DomainsClass(domains=[server_domain]) pipeline_context.graph.get_domain = fake_get_domain # type: ignore output = run_dataset_transformer_pipeline( transformer_type=SimpleAddDatasetDomain, aspect=models.DomainsClass(domains=[datahub_domain]), config={ "replace_existing": False, "semantics": TransformerSemantics.PATCH, "domains": [acryl_domain], }, pipeline_context=pipeline_context, ) assert len(output) == 2 assert output[0] is not None assert output[0].record is not None assert isinstance(output[0].record, MetadataChangeProposalWrapper) assert output[0].record.aspect is not None assert isinstance(output[0].record.aspect, models.DomainsClass) transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) assert len(transformed_aspect.domains) == 3 assert datahub_domain in transformed_aspect.domains assert acryl_domain in transformed_aspect.domains assert server_domain in transformed_aspect.domains def test_simple_add_dataset_domain_on_conflict_do_nothing( pytestconfig, tmp_path, mock_time, mock_datahub_graph_instance ): acryl_domain = builder.make_domain_urn("acryl.io") datahub_domain = builder.make_domain_urn("datahubproject.io") server_domain = builder.make_domain_urn("test.io") pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance # Return fake aspect to simulate server behaviour def fake_get_domain(entity_urn: str) -> models.DomainsClass: return models.DomainsClass(domains=[server_domain]) pipeline_context.graph.get_domain = fake_get_domain # type: ignore output = run_dataset_transformer_pipeline( transformer_type=SimpleAddDatasetDomain, aspect=models.DomainsClass(domains=[datahub_domain]), config={ "replace_existing": False, "semantics": TransformerSemantics.PATCH, "domains": [acryl_domain], "on_conflict": TransformerOnConflict.DO_NOTHING, }, pipeline_context=pipeline_context, ) assert len(output) == 1 assert output[0] is not None assert output[0].record is not None assert isinstance(output[0].record, EndOfStream) def test_simple_add_dataset_domain_on_conflict_do_nothing_no_conflict( pytestconfig, 
tmp_path, mock_time, mock_datahub_graph_instance ): acryl_domain = builder.make_domain_urn("acryl.io") datahub_domain = builder.make_domain_urn("datahubproject.io") irrelevant_domain = builder.make_domain_urn("test.io") pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance # Return fake aspect to simulate server behaviour def fake_get_domain(entity_urn: str) -> models.DomainsClass: return models.DomainsClass(domains=[]) pipeline_context.graph.get_domain = fake_get_domain # type: ignore output = run_dataset_transformer_pipeline( transformer_type=SimpleAddDatasetDomain, aspect=models.DomainsClass(domains=[datahub_domain]), config={ "replace_existing": False, "semantics": TransformerSemantics.PATCH, "domains": [acryl_domain], "on_conflict": TransformerOnConflict.DO_NOTHING, }, pipeline_context=pipeline_context, ) assert len(output) == 2 assert output[0] is not None assert output[0].record is not None assert isinstance(output[0].record, MetadataChangeProposalWrapper) assert output[0].record.aspect is not None assert isinstance(output[0].record.aspect, models.DomainsClass) transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) assert len(transformed_aspect.domains) == 2 assert datahub_domain in transformed_aspect.domains assert acryl_domain in transformed_aspect.domains assert irrelevant_domain not in transformed_aspect.domains def test_pattern_add_dataset_domain_aspect_name(mock_datahub_graph_instance): pipeline_context: PipelineContext = PipelineContext( run_id="test_simple_add_dataset_domain" ) pipeline_context.graph = mock_datahub_graph_instance transformer = PatternAddDatasetDomain.create( {"domain_pattern": {"rules": {}}}, pipeline_context ) assert transformer.aspect_name() == models.DomainsClass.ASPECT_NAME def test_pattern_add_dataset_domain_match(mock_datahub_graph_instance): acryl_domain = builder.make_domain_urn("acryl.io") datahub_domain = builder.make_domain_urn("datahubproject.io") pattern = "urn:li:dataset:\\(urn:li:dataPlatform:bigquery,.*" pipeline_context: PipelineContext = PipelineContext( run_id="test_simple_add_dataset_domain" ) pipeline_context.graph = mock_datahub_graph_instance output = run_dataset_transformer_pipeline( transformer_type=PatternAddDatasetDomain, aspect=models.DomainsClass(domains=[datahub_domain]), config={ "domain_pattern": {"rules": {pattern: [acryl_domain]}}, }, pipeline_context=pipeline_context, ) assert len(output) == 2 assert output[0] is not None assert output[0].record is not None assert isinstance(output[0].record, MetadataChangeProposalWrapper) assert output[0].record.aspect is not None assert isinstance(output[0].record.aspect, models.DomainsClass) transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) assert len(transformed_aspect.domains) == 2 assert datahub_domain in transformed_aspect.domains assert acryl_domain in transformed_aspect.domains def test_pattern_add_dataset_domain_no_match(mock_datahub_graph_instance): acryl_domain = builder.make_domain_urn("acryl.io") datahub_domain = builder.make_domain_urn("datahubproject.io") pattern = "urn:li:dataset:\\(urn:li:dataPlatform:invalid,.*" pipeline_context: PipelineContext = PipelineContext( run_id="test_simple_add_dataset_domain" ) pipeline_context.graph = mock_datahub_graph_instance output = run_dataset_transformer_pipeline( transformer_type=PatternAddDatasetDomain, aspect=models.DomainsClass(domains=[datahub_domain]), config={ "domain_pattern": {"rules": {pattern: [acryl_domain]}}, }, 
pipeline_context=pipeline_context, ) assert len(output) == 2 assert output[0] is not None assert output[0].record is not None assert isinstance(output[0].record, MetadataChangeProposalWrapper) assert output[0].record.aspect is not None assert isinstance(output[0].record.aspect, models.DomainsClass) transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) assert len(transformed_aspect.domains) == 1 assert datahub_domain in transformed_aspect.domains assert acryl_domain not in transformed_aspect.domains def test_pattern_add_dataset_domain_replace_existing_match(mock_datahub_graph_instance): acryl_domain = builder.make_domain_urn("acryl.io") datahub_domain = builder.make_domain_urn("datahubproject.io") pattern = "urn:li:dataset:\\(urn:li:dataPlatform:bigquery,.*" pipeline_context: PipelineContext = PipelineContext( run_id="test_simple_add_dataset_domain" ) pipeline_context.graph = mock_datahub_graph_instance output = run_dataset_transformer_pipeline( transformer_type=PatternAddDatasetDomain, aspect=models.DomainsClass(domains=[datahub_domain]), config={ "replace_existing": True, "domain_pattern": {"rules": {pattern: [acryl_domain]}}, }, pipeline_context=pipeline_context, ) assert len(output) == 2 assert output[0] is not None assert output[0].record is not None assert isinstance(output[0].record, MetadataChangeProposalWrapper) assert output[0].record.aspect is not None assert isinstance(output[0].record.aspect, models.DomainsClass) transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) assert len(transformed_aspect.domains) == 1 assert datahub_domain not in transformed_aspect.domains assert acryl_domain in transformed_aspect.domains def test_pattern_add_dataset_domain_replace_existing_no_match( mock_datahub_graph_instance, ): acryl_domain = builder.make_domain_urn("acryl.io") datahub_domain = builder.make_domain_urn("datahubproject.io") pattern = "urn:li:dataset:\\(urn:li:dataPlatform:invalid,.*" pipeline_context: PipelineContext = PipelineContext( run_id="test_simple_add_dataset_domain" ) pipeline_context.graph = mock_datahub_graph_instance output = run_dataset_transformer_pipeline( transformer_type=PatternAddDatasetDomain, aspect=models.DomainsClass(domains=[datahub_domain]), config={ "replace_existing": True, "domain_pattern": {"rules": {pattern: [acryl_domain]}}, }, pipeline_context=pipeline_context, ) assert len(output) == 2 assert output[0] is not None assert output[0].record is not None assert isinstance(output[0].record, MetadataChangeProposalWrapper) assert output[0].record.aspect is not None assert isinstance(output[0].record.aspect, models.DomainsClass) transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) assert len(transformed_aspect.domains) == 0 def test_pattern_add_dataset_domain_semantics_overwrite(mock_datahub_graph_instance): acryl_domain = builder.make_domain_urn("acryl.io") datahub_domain = builder.make_domain_urn("datahubproject.io") server_domain = builder.make_domain_urn("test.io") pattern = "urn:li:dataset:\\(urn:li:dataPlatform:bigquery,.*" pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance # Return fake aspect to simulate server behaviour def fake_get_domain(entity_urn: str) -> models.DomainsClass: return models.DomainsClass(domains=[server_domain]) pipeline_context.graph.get_domain = fake_get_domain # type: ignore output = run_dataset_transformer_pipeline( transformer_type=PatternAddDatasetDomain, 
aspect=models.DomainsClass(domains=[datahub_domain]), config={ "semantics": TransformerSemantics.OVERWRITE, "domain_pattern": {"rules": {pattern: [acryl_domain]}}, }, pipeline_context=pipeline_context, ) assert len(output) == 2 assert output[0] is not None assert output[0].record is not None assert isinstance(output[0].record, MetadataChangeProposalWrapper) assert output[0].record.aspect is not None assert isinstance(output[0].record.aspect, models.DomainsClass) transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) assert len(transformed_aspect.domains) == 2 assert datahub_domain in transformed_aspect.domains assert acryl_domain in transformed_aspect.domains assert server_domain not in transformed_aspect.domains def test_pattern_add_dataset_domain_semantics_patch( pytestconfig, tmp_path, mock_time, mock_datahub_graph_instance ): acryl_domain = builder.make_domain_urn("acryl.io") datahub_domain = builder.make_domain_urn("datahubproject.io") server_domain = builder.make_domain_urn("test.io") pattern = "urn:li:dataset:\\(urn:li:dataPlatform:bigquery,.*" pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance # Return fake aspect to simulate server behaviour def fake_get_domain(entity_urn: str) -> models.DomainsClass: return models.DomainsClass(domains=[server_domain]) pipeline_context.graph.get_domain = fake_get_domain # type: ignore output = run_dataset_transformer_pipeline( transformer_type=PatternAddDatasetDomain, aspect=models.DomainsClass(domains=[datahub_domain]), config={ "replace_existing": False, "semantics": TransformerSemantics.PATCH, "domain_pattern": {"rules": {pattern: [acryl_domain]}}, }, pipeline_context=pipeline_context, ) assert len(output) == 2 assert output[0] is not None assert output[0].record is not None assert isinstance(output[0].record, MetadataChangeProposalWrapper) assert output[0].record.aspect is not None assert isinstance(output[0].record.aspect, models.DomainsClass) transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) assert len(transformed_aspect.domains) == 3 assert datahub_domain in transformed_aspect.domains assert acryl_domain in transformed_aspect.domains assert server_domain in transformed_aspect.domains def test_simple_dataset_ownership_transformer_semantics_patch( mock_datahub_graph_instance, ): pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance server_owner: str = builder.make_owner_urn( "mohd@acryl.io", owner_type=builder.OwnerType.USER ) owner1: str = builder.make_owner_urn( "john@acryl.io", owner_type=builder.OwnerType.USER ) owner2: str = builder.make_owner_urn( "pedro@acryl.io", owner_type=builder.OwnerType.USER ) # Return fake aspect to simulate server behaviour def fake_ownership_class(entity_urn: str) -> models.OwnershipClass: return models.OwnershipClass( owners=[ models.OwnerClass( owner=server_owner, type=models.OwnershipTypeClass.DATAOWNER ) ] ) pipeline_context.graph.get_ownership = fake_ownership_class # type: ignore output = run_dataset_transformer_pipeline( transformer_type=SimpleAddDatasetOwnership, aspect=models.OwnershipClass( owners=[ models.OwnerClass(owner=owner1, type=models.OwnershipTypeClass.PRODUCER) ] ), config={ "replace_existing": False, "semantics": TransformerSemantics.PATCH, "owner_urns": [owner2], "ownership_type": "DATAOWNER", }, pipeline_context=pipeline_context, ) assert len(output) == 2 assert output[0] is not None assert output[0].record is not 
None assert isinstance(output[0].record, MetadataChangeProposalWrapper) assert output[0].record.aspect is not None assert isinstance(output[0].record.aspect, models.OwnershipClass) transformed_aspect: models.OwnershipClass = cast( models.OwnershipClass, output[0].record.aspect ) assert len(transformed_aspect.owners) == 3 owner_urns: List[str] = [ owner_class.owner for owner_class in transformed_aspect.owners ] assert owner1 in owner_urns assert owner2 in owner_urns assert server_owner in owner_urns def test_pattern_container_and_dataset_domain_transformation( mock_datahub_graph_instance, ): datahub_domain = builder.make_domain_urn("datahubproject.io") acryl_domain = builder.make_domain_urn("acryl_domain") server_domain = builder.make_domain_urn("server_domain") def fake_get_aspect( entity_urn: str, aspect_type: Type[models.BrowsePathsV2Class], version: int = 0, ) -> models.BrowsePathsV2Class: return models.BrowsePathsV2Class( path=[ models.BrowsePathEntryClass( id="container_1", urn="urn:li:container:container_1" ), models.BrowsePathEntryClass( id="container_2", urn="urn:li:container:container_2" ), ] ) pipeline_context = PipelineContext( run_id="test_pattern_container_and_dataset_domain_transformation" ) pipeline_context.graph = mock_datahub_graph_instance pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore with_domain_aspect = make_generic_dataset_mcp( aspect=models.DomainsClass(domains=[datahub_domain]) ) no_domain_aspect = make_generic_dataset_mcp( entity_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)" ) # Not a dataset, should be ignored not_a_dataset = models.MetadataChangeEventClass( proposedSnapshot=models.DataJobSnapshotClass( urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)", aspects=[ models.DataJobInfoClass( name="User Deletions", description="Constructs the fct_users_deleted from logging_events", type=models.AzkabanJobTypeClass.SQL, ) ], ) ) inputs = [ with_domain_aspect, no_domain_aspect, not_a_dataset, EndOfStream(), ] # Initialize the transformer with container support for domains transformer = PatternAddDatasetDomain.create( { "domain_pattern": { "rules": { ".*example1.*": [acryl_domain, server_domain], ".*example2.*": [server_domain], } }, "is_container": True, # Enable container domain handling }, pipeline_context, ) outputs = list( transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs]) ) assert ( len(outputs) == len(inputs) + 3 ) # MCPs for the dataset without domains and the containers first_domain_aspect = outputs[0].record.aspect assert first_domain_aspect assert len(first_domain_aspect.domains) == 3 assert all( domain in first_domain_aspect.domains for domain in [datahub_domain, acryl_domain, server_domain] ) second_domain_aspect = outputs[3].record.aspect assert second_domain_aspect assert len(second_domain_aspect.domains) == 1 assert server_domain in second_domain_aspect.domains # Verify that the third input (not a dataset) is unchanged assert inputs[2] == outputs[2].record # Verify that container 1 and container 2 contain all domains container_1 = outputs[4].record.aspect assert len(container_1.domains) == 2 assert acryl_domain in container_1.domains assert server_domain in container_1.domains container_2 = outputs[5].record.aspect assert len(container_2.domains) == 2 assert acryl_domain in container_2.domains assert server_domain in container_2.domains def test_pattern_container_and_dataset_domain_transformation_with_no_container( mock_datahub_graph_instance, ): datahub_domain =
builder.make_domain_urn("datahubproject.io") acryl_domain = builder.make_domain_urn("acryl_domain") server_domain = builder.make_domain_urn("server_domain") def fake_get_aspect( entity_urn: str, aspect_type: Type[models.BrowsePathsV2Class], version: int = 0, ) -> Optional[models.BrowsePathsV2Class]: return None pipeline_context = PipelineContext( run_id="test_pattern_container_and_dataset_domain_transformation_with_no_container" ) pipeline_context.graph = mock_datahub_graph_instance pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore with_domain_aspect = make_generic_dataset_mcp( aspect=models.DomainsClass(domains=[datahub_domain]) ) no_domain_aspect = make_generic_dataset_mcp( entity_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)" ) inputs = [ with_domain_aspect, no_domain_aspect, EndOfStream(), ] # Initialize the transformer with container support for domains transformer = PatternAddDatasetDomain.create( { "domain_pattern": { "rules": { ".*example1.*": [acryl_domain, server_domain], ".*example2.*": [server_domain], } }, "is_container": True, # Enable container domain handling }, pipeline_context, ) outputs = list( transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs]) ) assert len(outputs) == len(inputs) + 1 first_domain_aspect = outputs[0].record.aspect assert first_domain_aspect assert len(first_domain_aspect.domains) == 3 assert all( domain in first_domain_aspect.domains for domain in [datahub_domain, acryl_domain, server_domain] ) second_domain_aspect = outputs[2].record.aspect assert second_domain_aspect assert len(second_domain_aspect.domains) == 1 assert server_domain in second_domain_aspect.domains def test_pattern_add_container_dataset_domain_no_match(mock_datahub_graph_instance): acryl_domain = builder.make_domain_urn("acryl.io") datahub_domain = builder.make_domain_urn("datahubproject.io") pattern = "urn:li:dataset:\\(urn:li:dataPlatform:invalid,.*" pipeline_context: PipelineContext = PipelineContext( run_id="test_simple_add_dataset_domain" ) pipeline_context.graph = mock_datahub_graph_instance def fake_get_aspect( entity_urn: str, aspect_type: Type[models.BrowsePathsV2Class], version: int = 0, ) -> models.BrowsePathsV2Class: return models.BrowsePathsV2Class( path=[ models.BrowsePathEntryClass( id="container_1", urn="urn:li:container:container_1" ) ] ) pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore output = run_dataset_transformer_pipeline( transformer_type=PatternAddDatasetDomain, aspect=models.DomainsClass(domains=[datahub_domain]), config={ "replace_existing": True, "domain_pattern": {"rules": {pattern: [acryl_domain]}}, "is_container": True, }, pipeline_context=pipeline_context, ) assert len(output) == 2 assert output[0] is not None assert output[0].record is not None assert isinstance(output[0].record, MetadataChangeProposalWrapper) assert output[0].record.aspect is not None assert isinstance(output[0].record.aspect, models.DomainsClass) transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) assert len(transformed_aspect.domains) == 0 def run_pattern_dataset_schema_terms_transformation_semantics( semantics: TransformerSemantics, mock_datahub_graph_instance: DataHubGraph, ) -> List[RecordEnvelope]: pipeline_context = PipelineContext(run_id="test_pattern_dataset_schema_terms") pipeline_context.graph = mock_datahub_graph_instance # fake the server response def fake_schema_metadata(entity_urn: str) -> models.SchemaMetadataClass: return models.SchemaMetadataClass( 
schemaName="customer", # not used platform=builder.make_data_platform_urn( "hive" ), # important <- platform must be an urn version=0, # when the source system has a notion of versioning of schemas, insert this in, otherwise leave as 0 hash="", # when the source system has a notion of unique schemas identified via hash, include a hash, else leave it as empty string platformSchema=models.OtherSchemaClass( rawSchema="__insert raw schema here__" ), fields=[ models.SchemaFieldClass( fieldPath="first_name", glossaryTerms=models.GlossaryTermsClass( terms=[ models.GlossaryTermAssociationClass( urn=builder.make_term_urn("pii") ) ], auditStamp=models.AuditStampClass._construct_with_defaults(), ), type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()), nativeDataType="VARCHAR(100)", # use this to provide the type of the field in the source system's vernacular ), models.SchemaFieldClass( fieldPath="mobile_number", glossaryTerms=models.GlossaryTermsClass( terms=[ models.GlossaryTermAssociationClass( urn=builder.make_term_urn("pii") ) ], auditStamp=models.AuditStampClass._construct_with_defaults(), ), type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()), nativeDataType="VARCHAR(100)", # use this to provide the type of the field in the source system's vernacular ), ], ) pipeline_context.graph.get_schema_metadata = fake_schema_metadata # type: ignore output = run_dataset_transformer_pipeline( transformer_type=PatternAddDatasetSchemaTerms, pipeline_context=pipeline_context, config={ "semantics": semantics, "term_pattern": { "rules": { ".*first_name.*": [ builder.make_term_urn("Name"), builder.make_term_urn("FirstName"), ], ".*last_name.*": [ builder.make_term_urn("Name"), builder.make_term_urn("LastName"), ], } }, }, aspect=models.SchemaMetadataClass( schemaName="customer", # not used platform=builder.make_data_platform_urn( "hive" ), # important <- platform must be an urn version=0, # when the source system has a notion of versioning of schemas, insert this in, otherwise leave as 0 hash="", # when the source system has a notion of unique schemas identified via hash, include a hash, else leave it as empty string platformSchema=models.OtherSchemaClass( rawSchema="__insert raw schema here__" ), fields=[ models.SchemaFieldClass( fieldPath="address", type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()), nativeDataType="VARCHAR(100)", # use this to provide the type of the field in the source system's vernacular ), models.SchemaFieldClass( fieldPath="first_name", type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()), nativeDataType="VARCHAR(100)", # use this to provide the type of the field in the source system's vernacular ), models.SchemaFieldClass( fieldPath="last_name", type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()), nativeDataType="VARCHAR(100)", # use this to provide the type of the field in the source system's vernacular ), ], ), ) return output def test_pattern_dataset_schema_terms_transformation_patch( mock_time, mock_datahub_graph_instance ): output = run_pattern_dataset_schema_terms_transformation_semantics( TransformerSemantics.PATCH, mock_datahub_graph_instance ) assert len(output) == 2 # Check that glossary terms were added. 
assert len(output) == 2 assert output[0] is not None assert output[0].record is not None assert isinstance(output[0].record, MetadataChangeProposalWrapper) assert output[0].record.aspect is not None assert isinstance(output[0].record.aspect, models.SchemaMetadataClass) transform_aspect = cast(models.SchemaMetadataClass, output[0].record.aspect) field_path_vs_field: Dict[str, models.SchemaFieldClass] = { field.fieldPath: field for field in transform_aspect.fields } assert ( field_path_vs_field.get("mobile_number") is not None ) # server field should be preserved during patch assert field_path_vs_field["first_name"].glossaryTerms is not None assert len(field_path_vs_field["first_name"].glossaryTerms.terms) == 3 glossary_terms_urn = [ term.urn for term in field_path_vs_field["first_name"].glossaryTerms.terms ] assert builder.make_term_urn("pii") in glossary_terms_urn assert builder.make_term_urn("FirstName") in glossary_terms_urn assert builder.make_term_urn("Name") in glossary_terms_urn def test_pattern_dataset_schema_terms_transformation_overwrite( mock_time, mock_datahub_graph_instance ): output = run_pattern_dataset_schema_terms_transformation_semantics( TransformerSemantics.OVERWRITE, mock_datahub_graph_instance ) assert len(output) == 2 # Check that glossary terms were added. assert len(output) == 2 assert output[0] is not None assert output[0].record is not None assert isinstance(output[0].record, MetadataChangeProposalWrapper) assert output[0].record.aspect is not None assert isinstance(output[0].record.aspect, models.SchemaMetadataClass) transform_aspect = cast(models.SchemaMetadataClass, output[0].record.aspect) field_path_vs_field: Dict[str, models.SchemaFieldClass] = { field.fieldPath: field for field in transform_aspect.fields } assert ( field_path_vs_field.get("mobile_number") is None ) # server field should not be preserved during overwrite assert field_path_vs_field["first_name"].glossaryTerms is not None assert len(field_path_vs_field["first_name"].glossaryTerms.terms) == 2 glossary_terms_urn = [ term.urn for term in field_path_vs_field["first_name"].glossaryTerms.terms ] assert builder.make_term_urn("pii") not in glossary_terms_urn assert builder.make_term_urn("FirstName") in glossary_terms_urn assert builder.make_term_urn("Name") in glossary_terms_urn def run_pattern_dataset_schema_tags_transformation_semantics( semantics: TransformerSemantics, mock_datahub_graph_instance: DataHubGraph, ) -> List[RecordEnvelope]: pipeline_context = PipelineContext(run_id="test_pattern_dataset_schema_terms") pipeline_context.graph = mock_datahub_graph_instance # fake the server response def fake_schema_metadata(entity_urn: str) -> models.SchemaMetadataClass: return models.SchemaMetadataClass( schemaName="customer", # not used platform=builder.make_data_platform_urn( "hive" ), # important <- platform must be an urn version=0, # when the source system has a notion of versioning of schemas, insert this in, otherwise leave as 0 hash="", # when the source system has a notion of unique schemas identified via hash, include a hash, else leave it as empty string platformSchema=models.OtherSchemaClass( rawSchema="__insert raw schema here__" ), fields=[ models.SchemaFieldClass( fieldPath="first_name", globalTags=models.GlobalTagsClass( tags=[ models.TagAssociationClass(tag=builder.make_tag_urn("pii")) ], ), type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()), nativeDataType="VARCHAR(100)", # use this to provide the type of the field in the source system's vernacular ), 
models.SchemaFieldClass( fieldPath="mobile_number", globalTags=models.GlobalTagsClass( tags=[ models.TagAssociationClass(tag=builder.make_tag_urn("pii")) ], ), type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()), nativeDataType="VARCHAR(100)", # use this to provide the type of the field in the source system's vernacular ), ], ) pipeline_context.graph.get_schema_metadata = fake_schema_metadata # type: ignore output = run_dataset_transformer_pipeline( transformer_type=PatternAddDatasetSchemaTags, pipeline_context=pipeline_context, config={ "semantics": semantics, "tag_pattern": { "rules": { ".*first_name.*": [ builder.make_tag_urn("Name"), builder.make_tag_urn("FirstName"), ], ".*last_name.*": [ builder.make_tag_urn("Name"), builder.make_tag_urn("LastName"), ], } }, }, aspect=models.SchemaMetadataClass( schemaName="customer", # not used platform=builder.make_data_platform_urn( "hive" ), # important <- platform must be an urn version=0, # when the source system has a notion of versioning of schemas, insert this in, otherwise leave as 0 hash="", # when the source system has a notion of unique schemas identified via hash, include a hash, else leave it as empty string platformSchema=models.OtherSchemaClass( rawSchema="__insert raw schema here__" ), fields=[ models.SchemaFieldClass( fieldPath="address", type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()), nativeDataType="VARCHAR(100)", # use this to provide the type of the field in the source system's vernacular ), models.SchemaFieldClass( fieldPath="first_name", type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()), nativeDataType="VARCHAR(100)", # use this to provide the type of the field in the source system's vernacular ), models.SchemaFieldClass( fieldPath="last_name", type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()), nativeDataType="VARCHAR(100)", # use this to provide the type of the field in the source system's vernacular ), ], ), ) return output def test_pattern_dataset_schema_tags_transformation_overwrite( mock_time, mock_datahub_graph_instance ): output = run_pattern_dataset_schema_tags_transformation_semantics( TransformerSemantics.OVERWRITE, mock_datahub_graph_instance ) assert len(output) == 2 # Check that glossary terms were added. 
assert len(output) == 2 assert output[0] is not None assert output[0].record is not None assert isinstance(output[0].record, MetadataChangeProposalWrapper) assert output[0].record.aspect is not None assert isinstance(output[0].record.aspect, models.SchemaMetadataClass) transform_aspect = cast(models.SchemaMetadataClass, output[0].record.aspect) field_path_vs_field: Dict[str, models.SchemaFieldClass] = { field.fieldPath: field for field in transform_aspect.fields } assert ( field_path_vs_field.get("mobile_number") is None ) # server field should not be preserved during overwrite assert field_path_vs_field["first_name"].globalTags is not None assert len(field_path_vs_field["first_name"].globalTags.tags) == 2 global_tags_urn = [ tag.tag for tag in field_path_vs_field["first_name"].globalTags.tags ] assert builder.make_tag_urn("pii") not in global_tags_urn assert builder.make_tag_urn("FirstName") in global_tags_urn assert builder.make_tag_urn("Name") in global_tags_urn def test_pattern_dataset_schema_tags_transformation_patch( mock_time, mock_datahub_graph_instance ): output = run_pattern_dataset_schema_tags_transformation_semantics( TransformerSemantics.PATCH, mock_datahub_graph_instance ) assert len(output) == 2 # Check that global tags were added. assert len(output) == 2 assert output[0] is not None assert output[0].record is not None assert isinstance(output[0].record, MetadataChangeProposalWrapper) assert output[0].record.aspect is not None assert isinstance(output[0].record.aspect, models.SchemaMetadataClass) transform_aspect = cast(models.SchemaMetadataClass, output[0].record.aspect) field_path_vs_field: Dict[str, models.SchemaFieldClass] = { field.fieldPath: field for field in transform_aspect.fields } assert ( field_path_vs_field.get("mobile_number") is not None ) # server field should be preserved during patch assert field_path_vs_field["first_name"].globalTags is not None assert len(field_path_vs_field["first_name"].globalTags.tags) == 3 global_tags_urn = [ tag.tag for tag in field_path_vs_field["first_name"].globalTags.tags ] assert builder.make_tag_urn("pii") in global_tags_urn assert builder.make_tag_urn("FirstName") in global_tags_urn assert builder.make_tag_urn("Name") in global_tags_urn def test_simple_dataset_data_product_transformation(mock_time): transformer = SimpleAddDatasetDataProduct.create( { "dataset_to_data_product_urns": { builder.make_dataset_urn( "bigquery", "example1" ): "urn:li:dataProduct:first", builder.make_dataset_urn( "bigquery", "example2" ): "urn:li:dataProduct:second", builder.make_dataset_urn( "bigquery", "example3" ): "urn:li:dataProduct:first", } }, PipelineContext(run_id="test-dataproduct"), ) outputs = list( transformer.transform( [ RecordEnvelope(input, metadata={}) for input in [ make_generic_dataset( entity_urn=builder.make_dataset_urn("bigquery", "example1") ), make_generic_dataset( entity_urn=builder.make_dataset_urn("bigquery", "example2") ), make_generic_dataset( entity_urn=builder.make_dataset_urn("bigquery", "example3") ), EndOfStream(), ] ] ) ) assert len(outputs) == 6 # Check new dataproduct entity should be there assert outputs[3].record.entityUrn == "urn:li:dataProduct:first" assert outputs[3].record.aspectName == "dataProductProperties" first_data_product_aspect = json.loads( outputs[3].record.aspect.value.decode("utf-8") ) assert [item["value"]["destinationUrn"] for item in first_data_product_aspect] == [ builder.make_dataset_urn("bigquery", "example1"), builder.make_dataset_urn("bigquery", "example3"), ] second_data_product_aspect = 
json.loads( outputs[4].record.aspect.value.decode("utf-8") ) assert [item["value"]["destinationUrn"] for item in second_data_product_aspect] == [ builder.make_dataset_urn("bigquery", "example2") ] assert isinstance(outputs[5].record, EndOfStream) def test_pattern_dataset_data_product_transformation(mock_time): transformer = PatternAddDatasetDataProduct.create( { "dataset_to_data_product_urns_pattern": { "rules": { ".*example1.*": "urn:li:dataProduct:first", ".*": "urn:li:dataProduct:second", } }, }, PipelineContext(run_id="test-dataproducts"), ) outputs = list( transformer.transform( [ RecordEnvelope(input, metadata={}) for input in [ make_generic_dataset( entity_urn=builder.make_dataset_urn("bigquery", "example1") ), make_generic_dataset( entity_urn=builder.make_dataset_urn("bigquery", "example2") ), make_generic_dataset( entity_urn=builder.make_dataset_urn("bigquery", "example3") ), EndOfStream(), ] ] ) ) assert len(outputs) == 6 # Check new dataproduct entity should be there assert outputs[3].record.entityUrn == "urn:li:dataProduct:first" assert outputs[3].record.aspectName == "dataProductProperties" first_data_product_aspect = json.loads( outputs[3].record.aspect.value.decode("utf-8") ) assert [item["value"]["destinationUrn"] for item in first_data_product_aspect] == [ builder.make_dataset_urn("bigquery", "example1") ] second_data_product_aspect = json.loads( outputs[4].record.aspect.value.decode("utf-8") ) assert [item["value"]["destinationUrn"] for item in second_data_product_aspect] == [ builder.make_dataset_urn("bigquery", "example2"), builder.make_dataset_urn("bigquery", "example3"), ] assert isinstance(outputs[5].record, EndOfStream) def dummy_data_product_resolver_method(dataset_urn): dataset_to_data_product_map = { builder.make_dataset_urn("bigquery", "example1"): "urn:li:dataProduct:first" } return dataset_to_data_product_map.get(dataset_urn) def test_add_dataset_data_product_transformation(): transformer = AddDatasetDataProduct.create( { "get_data_product_to_add": "tests.unit.test_transform_dataset.dummy_data_product_resolver_method" }, PipelineContext(run_id="test-dataproduct"), ) outputs = list( transformer.transform( [ RecordEnvelope(input, metadata={}) for input in [make_generic_dataset(), EndOfStream()] ] ) ) # Check new dataproduct entity should be there assert outputs[1].record.entityUrn == "urn:li:dataProduct:first" assert outputs[1].record.aspectName == "dataProductProperties" first_data_product_aspect = json.loads( outputs[1].record.aspect.value.decode("utf-8") ) assert [item["value"]["destinationUrn"] for item in first_data_product_aspect] == [ builder.make_dataset_urn("bigquery", "example1") ] def _test_clean_owner_urns( in_pipeline_context: Any, in_owners: List[str], config: List[Union[re.Pattern, str]], cleaned_owner_urn: List[str], ) -> None: # Return fake aspect to simulate server behaviour def fake_ownership_class(entity_urn: str) -> models.OwnershipClass: return models.OwnershipClass( owners=[ models.OwnerClass(owner=owner, type=models.OwnershipTypeClass.DATAOWNER) for owner in in_owners ] ) in_pipeline_context.graph.get_ownership = fake_ownership_class # type: ignore output = run_dataset_transformer_pipeline( transformer_type=PatternCleanUpOwnership, aspect=models.OwnershipClass( owners=[ models.OwnerClass(owner=owner, type=models.OwnershipTypeClass.DATAOWNER) for owner in in_owners ] ), config={"pattern_for_cleanup": config}, pipeline_context=in_pipeline_context, ) assert len(output) == 2 ownership_aspect = output[0].record.aspect assert 
isinstance(ownership_aspect, OwnershipClass) assert len(ownership_aspect.owners) == len(in_owners) out_owners = [owner.owner for owner in ownership_aspect.owners] assert set(out_owners) == set(cleaned_owner_urn) def test_clean_owner_urn_transformation_remove_fixed_string( mock_datahub_graph_instance, ): pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance user_emails = [ "ABCDEF:email_id@example.com", "ABCDEF:123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:id1@example.com", "email_id:id2@example.com", ] in_owner_urns: List[str] = [] for user in user_emails: in_owner_urns.append( builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) ) # remove 'ABCDEF:' config: List[Union[re.Pattern, str]] = ["ABCDEF:"] expected_user_emails: List[str] = [ "email_id@example.com", "123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:id1@example.com", "email_id:id2@example.com", ] expected_owner_urns: List[str] = [] for user in expected_user_emails: expected_owner_urns.append( builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) ) _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) def test_clean_owner_urn_transformation_remove_multiple_values( mock_datahub_graph_instance, ): pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance user_emails = [ "ABCDEF:email_id@example.com", "ABCDEF:123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:id1@example.com", "email_id:id2@example.com", ] in_owner_urns: List[str] = [] for user in user_emails: in_owner_urns.append( builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) ) # remove multiple values config: List[Union[re.Pattern, str]] = ["ABCDEF:", "email"] expected_user_emails: List[str] = [ "_id@example.com", "123_id@example.com", "_id@example.co.in", "_id@example.co.uk", "_test:XYZ@example.com", "_id:id1@example.com", "_id:id2@example.com", ] expected_owner_urns: List[str] = [] for user in expected_user_emails: expected_owner_urns.append( builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) ) _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) def test_clean_owner_urn_transformation_remove_values_using_regex( mock_datahub_graph_instance, ): pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance user_emails = [ "ABCDEF:email_id@example.com", "ABCDEF:123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:id1@example.com", "email_id:id2@example.com", ] in_owner_urns: List[str] = [] for user in user_emails: in_owner_urns.append( builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) ) # remove words after `_` using RegEx i.e. 
`id`, `test` config: List[Union[re.Pattern, str]] = [r"(?<=_)(\w+)"] expected_user_emails: List[str] = [ "ABCDEF:email_@example.com", "ABCDEF:123email_@example.com", "email_@example.co.in", "email_@example.co.uk", "email_:XYZ@example.com", "email_:id1@example.com", "email_:id2@example.com", ] expected_owner_urns: List[str] = [] for user in expected_user_emails: expected_owner_urns.append( builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) ) _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) def test_clean_owner_urn_transformation_remove_digits(mock_datahub_graph_instance): pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance user_emails = [ "ABCDEF:email_id@example.com", "ABCDEF:123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:id1@example.com", "email_id:id2@example.com", ] in_owner_urns: List[str] = [] for user in user_emails: in_owner_urns.append( builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) ) # remove digits config: List[Union[re.Pattern, str]] = [r"\d+"] expected_user_emails: List[str] = [ "ABCDEF:email_id@example.com", "ABCDEF:email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:id@example.com", "email_id:id@example.com", ] expected_owner_urns: List[str] = [] for user in expected_user_emails: expected_owner_urns.append( builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) ) _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) def test_clean_owner_urn_transformation_remove_pattern(mock_datahub_graph_instance): pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance user_emails = [ "ABCDEF:email_id@example.com", "ABCDEF:123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:id1@example.com", "email_id:id2@example.com", ] in_owner_urns: List[str] = [] for user in user_emails: in_owner_urns.append( builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) ) # remove `example.*` config: List[Union[re.Pattern, str]] = [r"@example\.\S*"] expected_user_emails: List[str] = [ "ABCDEF:email_id", "ABCDEF:123email_id", "email_id", "email_id", "email_test:XYZ", "email_id:id1", "email_id:id2", ] expected_owner_urns: List[str] = [] for user in expected_user_emails: expected_owner_urns.append( builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) ) _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) def test_clean_owner_urn_transformation_remove_word_in_capital_letters( mock_datahub_graph_instance, ): pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance user_emails = [ "ABCDEF:email_id@example.com", "ABCDEF:123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:id1@example.com", "email_id:id2@example.com", "email_test:XYabZ@example.com", ] in_owner_urns: List[str] = [] for user in user_emails: in_owner_urns.append( builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) ) # if string between `:` and `@` is in CAPITAL then remove it config: List[Union[re.Pattern, str]] = ["(?<=:)[A-Z]+(?=@)"] expected_user_emails: List[str] = [ "ABCDEF:email_id@example.com", "ABCDEF:123email_id@example.com", 
"email_id@example.co.in", "email_id@example.co.uk", "email_test:@example.com", "email_id:id1@example.com", "email_id:id2@example.com", "email_test:XYabZ@example.com", ] expected_owner_urns: List[str] = [] for user in expected_user_emails: expected_owner_urns.append( builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) ) _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) def test_clean_owner_urn_transformation_remove_pattern_with_alphanumeric_value( mock_datahub_graph_instance, ): pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance user_emails = [ "ABCDEF:email_id@example.com", "ABCDEF:123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:id1@example.com", "email_id:id2@example.com", ] in_owner_urns: List[str] = [] for user in user_emails: in_owner_urns.append( builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) ) # remove any pattern having `id` followed by any digits config: List[Union[re.Pattern, str]] = [r"id\d+"] expected_user_emails: List[str] = [ "ABCDEF:email_id@example.com", "ABCDEF:123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:@example.com", "email_id:@example.com", ] expected_owner_urns: List[str] = [] for user in expected_user_emails: expected_owner_urns.append( builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) ) _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) def test_clean_owner_urn_transformation_should_not_remove_system_identifier( mock_datahub_graph_instance, ): pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance user_emails = [ "ABCDEF:email_id@example.com", "ABCDEF:123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:id1@example.com", "email_id:id2@example.com", ] in_owner_urns: List[str] = [] for user in user_emails: in_owner_urns.append( builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) ) # should not remove system identifier config: List[Union[re.Pattern, str]] = ["urn:li:corpuser:"] _test_clean_owner_urns(pipeline_context, in_owner_urns, config, in_owner_urns) def test_clean_owner_group_urn_transformation_remove_fixed_string( mock_datahub_graph_instance, ): pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance group_ids = [ "ABCDEF:email_id@example.com", "ABCDEF:123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:id1@example.com", "email_id:id2@example.com", ] in_owner_urns: List[str] = [] for group in group_ids: in_owner_urns.append( builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP) ) # remove 'ABCDEF:' config: List[Union[re.Pattern, str]] = ["ABCDEF:"] expected_group_ids: List[str] = [ "email_id@example.com", "123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:id1@example.com", "email_id:id2@example.com", ] expected_owner_urns: List[str] = [] for group in expected_group_ids: expected_owner_urns.append( builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP) ) _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) def 
test_clean_owner_group_urn_transformation_remove_multiple_values( mock_datahub_graph_instance, ): pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance group_ids = [ "ABCDEF:email_id@example.com", "ABCDEF:123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:id1@example.com", "email_id:id2@example.com", ] in_owner_urns: List[str] = [] for group in group_ids: in_owner_urns.append( builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP) ) # remove multiple values config: List[Union[re.Pattern, str]] = ["ABCDEF:", "email"] expected_group_ids: List[str] = [ "_id@example.com", "123_id@example.com", "_id@example.co.in", "_id@example.co.uk", "_test:XYZ@example.com", "_id:id1@example.com", "_id:id2@example.com", ] expected_owner_urns: List[str] = [] for group in expected_group_ids: expected_owner_urns.append( builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP) ) _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) def test_clean_owner_group_urn_transformation_remove_values_using_regex( mock_datahub_graph_instance, ): pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance group_ids = [ "ABCDEF:email_id@example.com", "ABCDEF:123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:id1@example.com", "email_id:id2@example.com", ] in_owner_urns: List[str] = [] for group in group_ids: in_owner_urns.append( builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP) ) # remove words after `_` using RegEx i.e. `id`, `test` config: List[Union[re.Pattern, str]] = [r"(?<=_)(\w+)"] expected_group_ids: List[str] = [ "ABCDEF:email_@example.com", "ABCDEF:123email_@example.com", "email_@example.co.in", "email_@example.co.uk", "email_:XYZ@example.com", "email_:id1@example.com", "email_:id2@example.com", ] expected_owner_urns: List[str] = [] for group in expected_group_ids: expected_owner_urns.append( builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP) ) _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) def test_clean_owner_group_urn_transformation_remove_digits( mock_datahub_graph_instance, ): pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance group_ids = [ "ABCDEF:email_id@example.com", "ABCDEF:123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:id1@example.com", "email_id:id2@example.com", ] in_owner_urns: List[str] = [] for group in group_ids: in_owner_urns.append( builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP) ) # remove digits config: List[Union[re.Pattern, str]] = [r"\d+"] expected_group_ids: List[str] = [ "ABCDEF:email_id@example.com", "ABCDEF:email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:id@example.com", "email_id:id@example.com", ] expected_owner_urns: List[str] = [] for group in expected_group_ids: expected_owner_urns.append( builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP) ) _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) def test_clean_owner_group_urn_transformation_remove_pattern( mock_datahub_graph_instance, ): pipeline_context = 
PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance group_ids = [ "ABCDEF:email_id@example.com", "ABCDEF:123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:id1@example.com", "email_id:id2@example.com", ] in_owner_urns: List[str] = [] for group in group_ids: in_owner_urns.append( builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP) ) # remove `example.*` config: List[Union[re.Pattern, str]] = [r"@example\.\S*"] expected_group_ids: List[str] = [ "ABCDEF:email_id", "ABCDEF:123email_id", "email_id", "email_id", "email_test:XYZ", "email_id:id1", "email_id:id2", ] expected_owner_urns: List[str] = [] for group in expected_group_ids: expected_owner_urns.append( builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP) ) _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) def test_clean_owner_group_urn_transformation_remove_word_in_capital_letters( mock_datahub_graph_instance, ): pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance group_ids = [ "ABCDEF:email_id@example.com", "ABCDEF:123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:id1@example.com", "email_id:id2@example.com", "email_test:XYabZ@example.com", ] in_owner_urns: List[str] = [] for group in group_ids: in_owner_urns.append( builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP) ) # if string between `:` and `@` is in CAPITAL then remove it config: List[Union[re.Pattern, str]] = ["(?<=:)[A-Z]+(?=@)"] expected_group_ids: List[str] = [ "ABCDEF:email_id@example.com", "ABCDEF:123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:@example.com", "email_id:id1@example.com", "email_id:id2@example.com", "email_test:XYabZ@example.com", ] expected_owner_urns: List[str] = [] for group in expected_group_ids: expected_owner_urns.append( builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP) ) _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) def test_clean_owner_group_urn_transformation_remove_pattern_with_alphanumeric_value( mock_datahub_graph_instance, ): pipeline_context = PipelineContext(run_id="transformer_pipe_line") pipeline_context.graph = mock_datahub_graph_instance group_ids = [ "ABCDEF:email_id@example.com", "ABCDEF:123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:id1@example.com", "email_id:id2@example.com", ] in_owner_urns: List[str] = [] for group in group_ids: in_owner_urns.append( builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP) ) # remove any pattern having `id` followed by any digits config: List[Union[re.Pattern, str]] = [r"id\d+"] expected_group_ids: List[str] = [ "ABCDEF:email_id@example.com", "ABCDEF:123email_id@example.com", "email_id@example.co.in", "email_id@example.co.uk", "email_test:XYZ@example.com", "email_id:@example.com", "email_id:@example.com", ] expected_owner_urns: List[str] = [] for group in expected_group_ids: expected_owner_urns.append( builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP) ) _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) def test_clean_owner_group_urn_transformation_should_not_remove_system_identifier( mock_datahub_graph_instance, ): pipeline_context = 
def test_clean_owner_group_urn_transformation_should_not_remove_system_identifier(
    mock_datahub_graph_instance,
):
    pipeline_context = PipelineContext(run_id="transformer_pipe_line")
    pipeline_context.graph = mock_datahub_graph_instance

    group_ids = [
        "ABCDEF:email_id@example.com",
        "ABCDEF:123email_id@example.com",
        "email_id@example.co.in",
        "email_id@example.co.uk",
        "email_test:XYZ@example.com",
        "email_id:id1@example.com",
        "email_id:id2@example.com",
    ]
    in_owner_urns: List[str] = [
        builder.make_owner_urn(group, owner_type=builder.OwnerType.GROUP)
        for group in group_ids
    ]

    # The `urn:li:corpGroup:` prefix is a system identifier and must not be removed.
    config: List[Union[re.Pattern, str]] = ["urn:li:corpGroup:"]

    _test_clean_owner_urns(pipeline_context, in_owner_urns, config, in_owner_urns)


def test_replace_external_url_word_replace(
    mock_datahub_graph_instance,
):
    pipeline_context: PipelineContext = PipelineContext(
        run_id="test_replace_external_url"
    )
    pipeline_context.graph = mock_datahub_graph_instance

    output = run_dataset_transformer_pipeline(
        transformer_type=ReplaceExternalUrlDataset,
        aspect=models.DatasetPropertiesClass(
            externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
            customProperties=EXISTING_PROPERTIES.copy(),
        ),
        config={"input_pattern": "datahub", "replacement": "starhub"},
        pipeline_context=pipeline_context,
    )

    assert len(output) == 2
    assert output[0].record
    assert output[0].record.aspect
    assert (
        output[0].record.aspect.externalUrl
        == "https://github.com/starhub/looker-demo/blob/master/foo.view.lkml"
    )


def test_replace_external_regex_replace_1(
    mock_datahub_graph_instance,
):
    pipeline_context: PipelineContext = PipelineContext(
        run_id="test_replace_external_url"
    )
    pipeline_context.graph = mock_datahub_graph_instance

    output = run_dataset_transformer_pipeline(
        transformer_type=ReplaceExternalUrlDataset,
        aspect=models.DatasetPropertiesClass(
            externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
            customProperties=EXISTING_PROPERTIES.copy(),
        ),
        config={"input_pattern": r"datahub/.*/", "replacement": "starhub/test/"},
        pipeline_context=pipeline_context,
    )

    assert len(output) == 2
    assert output[0].record
    assert output[0].record.aspect
    assert (
        output[0].record.aspect.externalUrl
        == "https://github.com/starhub/test/foo.view.lkml"
    )


def test_replace_external_regex_replace_2(
    mock_datahub_graph_instance,
):
    pipeline_context: PipelineContext = PipelineContext(
        run_id="test_replace_external_url"
    )
    pipeline_context.graph = mock_datahub_graph_instance

    output = run_dataset_transformer_pipeline(
        transformer_type=ReplaceExternalUrlDataset,
        aspect=models.DatasetPropertiesClass(
            externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
            customProperties=EXISTING_PROPERTIES.copy(),
        ),
        config={"input_pattern": r"\b\w*hub\b", "replacement": "test"},
        pipeline_context=pipeline_context,
    )

    assert len(output) == 2
    assert output[0].record
    assert output[0].record.aspect
    assert (
        output[0].record.aspect.externalUrl
        == "https://test.com/test/looker-demo/blob/master/foo.view.lkml"
    )
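# The three replacements above are regex substitutions over `externalUrl`. The
# word-boundary case is the least obvious, so here is the bare regex on its own
# (illustrative only; in the tests the transformer config drives this):
def test_external_url_regex_sketch() -> None:
    url = "https://github.com/datahub/looker-demo/blob/master/foo.view.lkml"
    # `\b\w*hub\b` matches both `github` and `datahub` as whole words.
    assert (
        re.sub(r"\b\w*hub\b", "test", url)
        == "https://test.com/test/looker-demo/blob/master/foo.view.lkml"
    )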
def test_pattern_cleanup_usage_statistics_user_1(
    mock_datahub_graph_instance,
):
    pipeline_context: PipelineContext = PipelineContext(
        run_id="test_pattern_cleanup_usage_statistics_user"
    )
    pipeline_context.graph = mock_datahub_graph_instance
    TS_1 = datetime(year=2023, month=1, day=1, tzinfo=timezone.utc)

    output = run_dataset_transformer_pipeline(
        transformer_type=PatternCleanupDatasetUsageUser,
        aspect=models.DatasetUsageStatisticsClass(
            timestampMillis=int(TS_1.timestamp() * 1000),
            userCounts=[
                DatasetUserUsageCountsClass(
                    user=builder.make_user_urn("IAM:user1"),
                    count=1,
                    userEmail="user1@example.com",
                ),
                DatasetUserUsageCountsClass(
                    user=builder.make_user_urn("user2"),
                    count=2,
                    userEmail="user2@example.com",
                ),
            ],
        ),
        config={"pattern_for_cleanup": ["IAM:"]},
        pipeline_context=pipeline_context,
    )

    expectedUsageStatistics = models.DatasetUsageStatisticsClass(
        timestampMillis=int(TS_1.timestamp() * 1000),
        userCounts=[
            DatasetUserUsageCountsClass(
                user=builder.make_user_urn("user1"),
                count=1,
                userEmail="user1@example.com",
            ),
            DatasetUserUsageCountsClass(
                user=builder.make_user_urn("user2"),
                count=2,
                userEmail="user2@example.com",
            ),
        ],
    )

    assert len(output) == 2
    assert output[0].record
    assert output[0].record.aspect
    assert len(output[0].record.aspect.userCounts) == 2
    assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts


def test_pattern_cleanup_usage_statistics_user_2(
    mock_datahub_graph_instance,
):
    pipeline_context: PipelineContext = PipelineContext(
        run_id="test_pattern_cleanup_usage_statistics_user"
    )
    pipeline_context.graph = mock_datahub_graph_instance
    TS_1 = datetime(year=2023, month=1, day=1, tzinfo=timezone.utc)

    output = run_dataset_transformer_pipeline(
        transformer_type=PatternCleanupDatasetUsageUser,
        aspect=models.DatasetUsageStatisticsClass(
            timestampMillis=int(TS_1.timestamp() * 1000),
            userCounts=[
                DatasetUserUsageCountsClass(
                    user=builder.make_user_urn("test_user_1"),
                    count=1,
                    userEmail="user1@example.com",
                ),
                DatasetUserUsageCountsClass(
                    user=builder.make_user_urn("test_user_2"),
                    count=2,
                    userEmail="user2@example.com",
                ),
            ],
        ),
        config={"pattern_for_cleanup": ["_user"]},
        pipeline_context=pipeline_context,
    )

    expectedUsageStatistics = models.DatasetUsageStatisticsClass(
        timestampMillis=int(TS_1.timestamp() * 1000),
        userCounts=[
            DatasetUserUsageCountsClass(
                user=builder.make_user_urn("test_1"),
                count=1,
                userEmail="user1@example.com",
            ),
            DatasetUserUsageCountsClass(
                user=builder.make_user_urn("test_2"),
                count=2,
                userEmail="user2@example.com",
            ),
        ],
    )

    assert len(output) == 2
    assert output[0].record
    assert output[0].record.aspect
    assert len(output[0].record.aspect.userCounts) == 2
    assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts


def test_pattern_cleanup_usage_statistics_user_3(
    mock_datahub_graph_instance,
):
    pipeline_context: PipelineContext = PipelineContext(
        run_id="test_pattern_cleanup_usage_statistics_user"
    )
    pipeline_context.graph = mock_datahub_graph_instance
    TS_1 = datetime(year=2023, month=1, day=1, tzinfo=timezone.utc)

    output = run_dataset_transformer_pipeline(
        transformer_type=PatternCleanupDatasetUsageUser,
        aspect=models.DatasetUsageStatisticsClass(
            timestampMillis=int(TS_1.timestamp() * 1000),
            userCounts=[
                DatasetUserUsageCountsClass(
                    user=builder.make_user_urn("abc_user_1"),
                    count=1,
                    userEmail="user1@example.com",
                ),
                DatasetUserUsageCountsClass(
                    user=builder.make_user_urn("xyz_user_2"),
                    count=2,
                    userEmail="user2@example.com",
                ),
            ],
        ),
        config={"pattern_for_cleanup": [r"_user_\d+"]},
        pipeline_context=pipeline_context,
    )

    expectedUsageStatistics = models.DatasetUsageStatisticsClass(
        timestampMillis=int(TS_1.timestamp() * 1000),
        userCounts=[
            DatasetUserUsageCountsClass(
                user=builder.make_user_urn("abc"),
                count=1,
                userEmail="user1@example.com",
            ),
            DatasetUserUsageCountsClass(
                user=builder.make_user_urn("xyz"),
                count=2,
                userEmail="user2@example.com",
            ),
        ],
    )

    assert len(output) == 2
    assert output[0].record
    assert output[0].record.aspect
    assert len(output[0].record.aspect.userCounts) == 2
    assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts
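# `PatternCleanupDatasetUsageUser` rewrites the user urn inside each
# `DatasetUserUsageCountsClass` entry; counts and emails are left untouched. A
# minimal sketch of the urn rewrite, assuming it reduces to `re.sub` over the
# user id before the urn is rebuilt (illustrative only):
def test_usage_user_cleanup_sketch() -> None:
    assert re.sub(r"_user_\d+", "", "abc_user_1") == "abc"
    assert builder.make_user_urn("abc") == "urn:li:corpuser:abc"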
def test_domain_mapping_based_on_tags_with_valid_tags(mock_datahub_graph_instance):
    acryl_domain = builder.make_domain_urn("acryl.io")
    server_domain = builder.make_domain_urn("test.io")
    tag_one = builder.make_tag_urn("test:tag_1")

    # Return a fake aspect to simulate server behaviour.
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(tags=[TagAssociationClass(tag=tag_one)])

    pipeline_context = PipelineContext(run_id="transformer_pipe_line")
    pipeline_context.graph = mock_datahub_graph_instance
    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore

    output = run_dataset_transformer_pipeline(
        transformer_type=DatasetTagDomainMapper,
        aspect=models.DomainsClass(domains=[server_domain]),
        config={"domain_mapping": {"test:tag_1": acryl_domain}},
        pipeline_context=pipeline_context,
    )

    assert len(output) == 2
    assert output[0] is not None
    assert output[0].record is not None
    assert isinstance(output[0].record, MetadataChangeProposalWrapper)
    assert output[0].record.aspect is not None
    assert isinstance(output[0].record.aspect, models.DomainsClass)
    transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
    assert len(transformed_aspect.domains) == 1
    assert acryl_domain in transformed_aspect.domains
    assert server_domain not in transformed_aspect.domains


def test_domain_mapping_based_on_tags_with_no_matching_tags(
    mock_datahub_graph_instance,
):
    acryl_domain = builder.make_domain_urn("acryl.io")
    server_domain = builder.make_domain_urn("test.io")
    non_matching_tag = builder.make_tag_urn("nonMatching")

    pipeline_context = PipelineContext(run_id="no_match_pipeline")
    pipeline_context.graph = mock_datahub_graph_instance

    # Return a fake aspect to simulate server behaviour.
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(tags=[TagAssociationClass(tag=non_matching_tag)])

    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore

    output = run_dataset_transformer_pipeline(
        transformer_type=DatasetTagDomainMapper,
        aspect=models.DomainsClass(domains=[server_domain]),
        config={
            "domain_mapping": {"test:tag_1": acryl_domain},
        },
        pipeline_context=pipeline_context,
    )

    assert len(output) == 2
    assert isinstance(output[0].record.aspect, models.DomainsClass)
    transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
    assert len(transformed_aspect.domains) == 1
    assert acryl_domain not in transformed_aspect.domains
    assert server_domain in transformed_aspect.domains


def test_domain_mapping_based_on_tags_with_empty_config(mock_datahub_graph_instance):
    some_tag = builder.make_tag_urn("someTag")

    pipeline_context = PipelineContext(run_id="empty_config_pipeline")
    pipeline_context.graph = mock_datahub_graph_instance

    # Return a fake aspect to simulate server behaviour.
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(tags=[TagAssociationClass(tag=some_tag)])

    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore

    output = run_dataset_transformer_pipeline(
        transformer_type=DatasetTagDomainMapper,
        aspect=models.DomainsClass(domains=[]),
        config={"domain_mapping": {}},
        pipeline_context=pipeline_context,
    )

    assert len(output) == 2
    assert isinstance(output[0].record.aspect, models.DomainsClass)
    assert len(output[0].record.aspect.domains) == 0
def test_domain_mapping_based_on_tags_with_multiple_tags(
    mock_datahub_graph_instance,
):
    # Two tags that match different rules in the domain mapping configuration.
    tag_one = builder.make_tag_urn("test:tag_1")
    tag_two = builder.make_tag_urn("test:tag_2")
    existing_domain = builder.make_domain_urn("existing.io")
    finance = builder.make_domain_urn("finance")
    hr = builder.make_domain_urn("hr")

    pipeline_context = PipelineContext(run_id="multiple_matches_pipeline")
    pipeline_context.graph = mock_datahub_graph_instance

    # Return fake aspects to simulate server behaviour.
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(
            tags=[TagAssociationClass(tag=tag_one), TagAssociationClass(tag=tag_two)]
        )

    def fake_get_domain(entity_urn: str) -> models.DomainsClass:
        return models.DomainsClass(domains=[existing_domain])

    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore
    pipeline_context.graph.get_domain = fake_get_domain  # type: ignore

    output = run_dataset_transformer_pipeline(
        transformer_type=DatasetTagDomainMapper,
        aspect=models.DomainsClass(domains=[existing_domain]),
        config={
            "domain_mapping": {"test:tag_1": finance, "test:tag_2": hr},
            "semantics": "PATCH",
        },
        pipeline_context=pipeline_context,
    )

    # With PATCH semantics, the existing domain is kept and the domains from
    # both matched tags are added.
    assert len(output) == 2
    assert output[0].record is not None
    assert output[0].record.aspect is not None
    assert isinstance(output[0].record.aspect, models.DomainsClass)
    transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
    assert set(transformed_aspect.domains) == {existing_domain, finance, hr}
    assert len(transformed_aspect.domains) == 3


def test_domain_mapping_based_on_tags_with_empty_tags(mock_datahub_graph_instance):
    acryl_domain = builder.make_domain_urn("acryl.io")
    server_domain = builder.make_domain_urn("test.io")

    pipeline_context = PipelineContext(run_id="empty_config_pipeline")
    pipeline_context.graph = mock_datahub_graph_instance

    # Return a fake aspect to simulate server behaviour.
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(tags=[])

    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore

    output = run_dataset_transformer_pipeline(
        transformer_type=DatasetTagDomainMapper,
        aspect=models.DomainsClass(domains=[acryl_domain]),
        config={"domain_mapping": {"test:tag_1": server_domain}},
        pipeline_context=pipeline_context,
    )

    assert len(output) == 2
    assert isinstance(output[0].record.aspect, models.DomainsClass)
    transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
    assert len(transformed_aspect.domains) == 1
    assert acryl_domain in transformed_aspect.domains
    assert server_domain not in transformed_aspect.domains


def test_domain_mapping_based_on_tags_with_no_tags(mock_datahub_graph_instance):
    acryl_domain = builder.make_domain_urn("acryl.io")
    server_domain = builder.make_domain_urn("test.io")

    pipeline_context = PipelineContext(run_id="empty_config_pipeline")
    pipeline_context.graph = mock_datahub_graph_instance

    # Return no aspect to simulate server behaviour.
    def fake_get_tags(entity_urn: str) -> Optional[models.GlobalTagsClass]:
        return None

    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore

    output = run_dataset_transformer_pipeline(
        transformer_type=DatasetTagDomainMapper,
        aspect=models.DomainsClass(domains=[acryl_domain]),
        config={"domain_mapping": {"test:tag_1": server_domain}},
        pipeline_context=pipeline_context,
    )

    assert len(output) == 2
    assert isinstance(output[0].record.aspect, models.DomainsClass)
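# `DatasetTagDomainMapper` resolves the entity's tags via the graph and adds
# the mapped domain for every tag that appears in `domain_mapping`. A minimal
# sketch of that lookup, with `domain_mapping` and `entity_tags` as stand-ins
# for the transformer's config and the graph response (illustrative only):
def test_tag_to_domain_mapping_sketch() -> None:
    domain_mapping = {"test:tag_1": builder.make_domain_urn("finance")}
    entity_tags = ["test:tag_1", "nonMatching"]
    mapped = [domain_mapping[t] for t in entity_tags if t in domain_mapping]
    assert mapped == [builder.make_domain_urn("finance")]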
    transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
    assert len(transformed_aspect.domains) == 1
    assert acryl_domain in transformed_aspect.domains
    assert server_domain not in transformed_aspect.domains


def test_tags_to_terms_transformation(mock_datahub_graph_instance):
    # Create term URNs for the test.
    term_urn_example1 = builder.make_term_urn("example1")
    term_urn_example2 = builder.make_term_urn("example2")

    # Fake the server responses.
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(
            tags=[
                TagAssociationClass(tag=builder.make_tag_urn("example1")),
                TagAssociationClass(tag=builder.make_tag_urn("example2")),
            ]
        )

    def fake_schema_metadata(entity_urn: str) -> models.SchemaMetadataClass:
        return models.SchemaMetadataClass(
            schemaName="customer",  # not used
            platform=builder.make_data_platform_urn("hive"),  # the platform must be an urn
            version=0,  # if the source system versions schemas, set it here; otherwise 0
            hash="",  # if the source system hashes schemas, set it here; otherwise ""
            platformSchema=models.OtherSchemaClass(
                rawSchema="__insert raw schema here__"
            ),
            fields=[
                models.SchemaFieldClass(
                    fieldPath="first_name",
                    globalTags=models.GlobalTagsClass(
                        tags=[
                            models.TagAssociationClass(
                                tag=builder.make_tag_urn("example2")
                            )
                        ],
                    ),
                    glossaryTerms=models.GlossaryTermsClass(
                        terms=[
                            models.GlossaryTermAssociationClass(
                                urn=builder.make_term_urn("pii")
                            )
                        ],
                        auditStamp=models.AuditStampClass._construct_with_defaults(),
                    ),
                    type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
                    nativeDataType="VARCHAR(100)",  # the field type in the source system's vernacular
                ),
                models.SchemaFieldClass(
                    fieldPath="mobile_number",
                    glossaryTerms=models.GlossaryTermsClass(
                        terms=[
                            models.GlossaryTermAssociationClass(
                                urn=builder.make_term_urn("pii")
                            )
                        ],
                        auditStamp=models.AuditStampClass._construct_with_defaults(),
                    ),
                    type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
                    nativeDataType="VARCHAR(100)",  # the field type in the source system's vernacular
                ),
            ],
        )

    pipeline_context = PipelineContext(run_id="transformer_pipe_line")
    pipeline_context.graph = mock_datahub_graph_instance
    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore
    pipeline_context.graph.get_schema_metadata = fake_schema_metadata  # type: ignore

    # Configure the transformer.
    config = {"tags": ["example1", "example2"]}

    # Run the transformer within a test pipeline.
    output = run_dataset_transformer_pipeline(
        transformer_type=TagsToTermMapper,
        aspect=models.GlossaryTermsClass(
            terms=[
                models.GlossaryTermAssociationClass(urn=builder.make_term_urn("pii"))
            ],
            auditStamp=models.AuditStampClass._construct_with_defaults(),
        ),
        config=config,
        pipeline_context=pipeline_context,
    )

    # Expected results.
    expected_terms = [term_urn_example2, term_urn_example1]

    # Verify the output.
    assert len(output) == 2  # one for the result and one for the end of stream
    terms_aspect = output[0].record.aspect
    assert isinstance(terms_aspect, models.GlossaryTermsClass)
    assert len(terms_aspect.terms) == len(expected_terms)
    assert set(term.urn for term in terms_aspect.terms) == {
        "urn:li:glossaryTerm:example1",
        "urn:li:glossaryTerm:example2",
    }
def test_tags_to_terms_with_no_matching_terms(mock_datahub_graph_instance):
    # Set up a case where no tags match the provided term mappings.
    def fake_get_tags_no_match(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(
            tags=[
                TagAssociationClass(tag=builder.make_tag_urn("nonMatchingTag1")),
                TagAssociationClass(tag=builder.make_tag_urn("nonMatchingTag2")),
            ]
        )

    pipeline_context = PipelineContext(run_id="transformer_pipe_line")
    pipeline_context.graph = mock_datahub_graph_instance
    pipeline_context.graph.get_tags = fake_get_tags_no_match  # type: ignore

    # No matching terms in the config.
    config = {"tags": ["example1", "example2"]}

    # Run the transformer within a test pipeline.
    output = run_dataset_transformer_pipeline(
        transformer_type=TagsToTermMapper,
        aspect=models.GlossaryTermsClass(
            terms=[
                models.GlossaryTermAssociationClass(urn=builder.make_term_urn("pii"))
            ],
            auditStamp=models.AuditStampClass._construct_with_defaults(),
        ),
        config=config,
        pipeline_context=pipeline_context,
    )

    # Verify the output.
    assert len(output) == 2  # one for the result and one for the end of stream
    terms_aspect = output[0].record.aspect
    assert isinstance(terms_aspect, models.GlossaryTermsClass)
    assert len(terms_aspect.terms) == 1


def test_tags_to_terms_with_missing_tags(mock_datahub_graph_instance):
    # Set up a case where the entity has no tags at all.
    def fake_get_no_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(tags=[])

    pipeline_context = PipelineContext(run_id="transformer_pipe_line")
    pipeline_context.graph = mock_datahub_graph_instance
    pipeline_context.graph.get_tags = fake_get_no_tags  # type: ignore

    config = {"tags": ["example1", "example2"]}

    # Run the transformer with no tags.
    output = run_dataset_transformer_pipeline(
        transformer_type=TagsToTermMapper,
        aspect=models.GlossaryTermsClass(
            terms=[
                models.GlossaryTermAssociationClass(urn=builder.make_term_urn("pii"))
            ],
            auditStamp=models.AuditStampClass._construct_with_defaults(),
        ),
        config=config,
        pipeline_context=pipeline_context,
    )

    # Verify that no terms are added when there are no tags.
    assert len(output) == 2
    terms_aspect = output[0].record.aspect
    assert isinstance(terms_aspect, models.GlossaryTermsClass)
    assert len(terms_aspect.terms) == 1


def test_tags_to_terms_with_partial_match(mock_datahub_graph_instance):
    # Set up a partial match scenario.
    def fake_get_partial_match_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(
            tags=[
                TagAssociationClass(tag=builder.make_tag_urn("example1")),  # should match
                TagAssociationClass(tag=builder.make_tag_urn("nonMatchingTag")),  # no match
            ]
        )

    pipeline_context = PipelineContext(run_id="transformer_pipe_line")
    pipeline_context.graph = mock_datahub_graph_instance
    pipeline_context.graph.get_tags = fake_get_partial_match_tags  # type: ignore

    config = {"tags": ["example1"]}  # only `example1` has a term mapped

    # Run the transformer with partially matching tags.
    output = run_dataset_transformer_pipeline(
        transformer_type=TagsToTermMapper,
        aspect=models.GlossaryTermsClass(
            terms=[
                models.GlossaryTermAssociationClass(urn=builder.make_term_urn("pii"))
            ],
            auditStamp=models.AuditStampClass._construct_with_defaults(),
        ),
        config=config,
        pipeline_context=pipeline_context,
    )

    # Verify that only the matched term is added.
    assert len(output) == 2
    terms_aspect = output[0].record.aspect
    assert isinstance(terms_aspect, models.GlossaryTermsClass)
    assert len(terms_aspect.terms) == 1
    assert terms_aspect.terms[0].urn == "urn:li:glossaryTerm:example1"
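# `TagsToTermMapper` turns the tags named in the `tags` config into glossary
# terms of the same name, as the assertions above show. A minimal sketch of
# that name mapping, assuming the tag id is carried over into the term urn;
# the helper logic here is illustrative only, not the mapper's implementation:
def test_tags_to_terms_name_mapping_sketch() -> None:
    configured = {"example1"}
    entity_tags = [
        builder.make_tag_urn("example1"),
        builder.make_tag_urn("nonMatchingTag"),
    ]
    mapped_terms = [
        builder.make_term_urn(tag.split(":")[-1])
        for tag in entity_tags
        if tag.split(":")[-1] in configured
    ]
    assert mapped_terms == ["urn:li:glossaryTerm:example1"]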
def test_replace_external_url_container_word_replace(
    mock_datahub_graph_instance,
):
    pipeline_context: PipelineContext = PipelineContext(
        run_id="test_replace_external_url_container"
    )
    pipeline_context.graph = mock_datahub_graph_instance

    output = run_container_transformer_pipeline(
        transformer_type=ReplaceExternalUrlContainer,
        aspect=models.ContainerPropertiesClass(
            externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
            customProperties=EXISTING_PROPERTIES.copy(),
            name="sample_test",
        ),
        config={"input_pattern": "datahub", "replacement": "starhub"},
        pipeline_context=pipeline_context,
    )

    assert len(output) == 2
    assert output[0].record
    assert output[0].record.aspect
    assert (
        output[0].record.aspect.externalUrl
        == "https://github.com/starhub/looker-demo/blob/master/foo.view.lkml"
    )


def test_replace_external_regex_container_replace_1(
    mock_datahub_graph_instance,
):
    pipeline_context: PipelineContext = PipelineContext(
        run_id="test_replace_external_url_container"
    )
    pipeline_context.graph = mock_datahub_graph_instance

    output = run_container_transformer_pipeline(
        transformer_type=ReplaceExternalUrlContainer,
        aspect=models.ContainerPropertiesClass(
            externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
            customProperties=EXISTING_PROPERTIES.copy(),
            name="sample_test",
        ),
        config={"input_pattern": r"datahub/.*/", "replacement": "starhub/test/"},
        pipeline_context=pipeline_context,
    )

    assert len(output) == 2
    assert output[0].record
    assert output[0].record.aspect
    assert (
        output[0].record.aspect.externalUrl
        == "https://github.com/starhub/test/foo.view.lkml"
    )


def test_replace_external_regex_container_replace_2(
    mock_datahub_graph_instance,
):
    pipeline_context: PipelineContext = PipelineContext(
        run_id="test_replace_external_url_container"
    )
    pipeline_context.graph = mock_datahub_graph_instance

    output = run_container_transformer_pipeline(
        transformer_type=ReplaceExternalUrlContainer,
        aspect=models.ContainerPropertiesClass(
            externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
            customProperties=EXISTING_PROPERTIES.copy(),
            name="sample_test",
        ),
        config={"input_pattern": r"\b\w*hub\b", "replacement": "test"},
        pipeline_context=pipeline_context,
    )

    assert len(output) == 2
    assert output[0].record
    assert output[0].record.aspect
    assert (
        output[0].record.aspect.externalUrl
        == "https://test.com/test/looker-demo/blob/master/foo.view.lkml"
    )