diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index 217d65ed05..64d1438cfc 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -13,7 +13,7 @@ The below table shows transformer which can transform aspects of entity [Dataset | `glossaryTerms` | - [Simple Add Dataset glossaryTerms ](#simple-add-dataset-glossaryterms)
- [Pattern Add Dataset glossaryTerms](#pattern-add-dataset-glossaryterms) | | `schemaMetadata` | - [Pattern Add Dataset Schema Field glossaryTerms](#pattern-add-dataset-schema-field-glossaryterms)
- [Pattern Add Dataset Schema Field globalTags](#pattern-add-dataset-schema-field-globaltags) | | `datasetProperties` | - [Simple Add Dataset datasetProperties](#simple-add-dataset-datasetproperties)
- [Add Dataset datasetProperties](#add-dataset-datasetproperties) | -| `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)
- [Pattern Add Dataset domains](#pattern-add-dataset-domains) | +| `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)
- [Pattern Add Dataset domains](#pattern-add-dataset-domains)
- [Domain Mapping Based on Tags](#domain-mapping-based-on-tags) | | `dataProduct` | - [Simple Add Dataset dataProduct ](#simple-add-dataset-dataproduct)
- [Pattern Add Dataset dataProduct](#pattern-add-dataset-dataproduct)
- [Add Dataset dataProduct](#add-dataset-dataproduct) ## Extract Ownership from Tags @@ -1064,6 +1064,61 @@ in both of the cases domain should be provisioned on DataHub GMS 'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.n.*': ["hr"] 'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.t.*': ["urn:li:domain:finance"] ``` + + + +## Domain Mapping Based on Tags +### Config Details + +| Field | Required | Type | Default | Description | +|-----------------|----------|-------------------------|-------------|---------------------------------------------------------------------------------------------------------| +| `domain_mapping`| ✅ | Dict[str, str] | | Dataset Entity tag as key and domain urn or name as value to map with dataset as asset. | +| `semantics` | | enum | "OVERWRITE" | Whether to OVERWRITE or PATCH the entity present on DataHub GMS.| + +
+ +let’s suppose we’d like to add domain to dataset based on tag, in this case you can use `domain_mapping_based_on_tags` transformer. + +The config, which we’d append to our ingestion recipe YAML, would look like this: + +Here we can set domains to either urn (i.e. urn:li:domain:engineering) or simple domain name (i.e. engineering) in both of the cases domain should be provisioned on DataHub GMS + +When specifying tags within the domain mapping, use the tag's simple name rather than the full tag URN. + +For example, instead of using the tag URN urn:li:tag:NeedsDocumentation, you should specify just the simple tag name NeedsDocumentation in the domain mapping configuration + +```yaml +transformers: + - type: "domain_mapping_based_on_tags" + config: + domain_mapping: + 'NeedsDocumentation': "urn:li:domain:documentation" +``` + + +`domain_mapping_based_on_tags` can be configured in below different way + +- Add domains based on tags, however overwrite the domains available for the dataset on DataHub GMS +```yaml + transformers: + - type: "domain_mapping_based_on_tags" + config: + semantics: OVERWRITE # OVERWRITE is default behaviour + domain_mapping: + 'example1': "urn:li:domain:engineering" + 'example2': "urn:li:domain:hr" + ``` +- Add domains based on tags, however keep the domains available for the dataset on DataHub GMS +```yaml + transformers: + - type: "domain_mapping_based_on_tags" + config: + semantics: PATCH + domain_mapping: + 'example1': "urn:li:domain:engineering" + 'example2': "urn:li:domain:hr" + ``` + ## Simple Add Dataset dataProduct ### Config Details | Field | Required | Type | Default | Description | diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 668845e776..9d35b9b8ca 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -708,6 +708,7 @@ entry_points = { "pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct", "replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl", "pattern_cleanup_dataset_usage_user = datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user:PatternCleanupDatasetUsageUser", + "domain_mapping_based_on_tags = datahub.ingestion.transformer.dataset_domain_based_on_tags:DatasetTagDomainMapper", ], "datahub.ingestion.sink.plugins": [ "file = datahub.ingestion.sink.file:FileSink", diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain_based_on_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain_based_on_tags.py new file mode 100644 index 0000000000..7be8069e1b --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain_based_on_tags.py @@ -0,0 +1,70 @@ +from typing import Dict, List, Optional, Set, cast + +from datahub.configuration.common import ( + TransformerSemantics, + TransformerSemanticsConfigModel, +) +from datahub.emitter.mce_builder import Aspect +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.transformer.dataset_domain import AddDatasetDomain +from datahub.ingestion.transformer.dataset_transformer import DatasetDomainTransformer +from datahub.metadata.schema_classes import DomainsClass, GlobalTagsClass + + +class DatasetTagDomainMapperConfig(TransformerSemanticsConfigModel): + domain_mapping: Dict[str, str] + + +class DatasetTagDomainMapper(DatasetDomainTransformer): + """A transformer that appends a predefined set of domains to each dataset that includes specific tags defined in the transformer.""" + + def __init__(self, config: DatasetTagDomainMapperConfig, ctx: PipelineContext): + super().__init__() + self.ctx: PipelineContext = ctx + self.config: DatasetTagDomainMapperConfig = config + + @classmethod + def create( + cls, config_dict: dict, ctx: PipelineContext + ) -> "DatasetTagDomainMapper": + config = DatasetTagDomainMapperConfig.parse_obj(config_dict) + return cls(config, ctx) + + def transform_aspect( + self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] + ) -> Optional[Aspect]: + # Initialize the existing domain aspect + existing_domain_aspect: DomainsClass = cast(DomainsClass, aspect) + assert self.ctx.graph + global_tags: Optional[GlobalTagsClass] = self.ctx.graph.get_tags(entity_urn) + # Check if we have tags received in existing aspect + if global_tags: + domain_mapping = self.config.domain_mapping + transformer_tags = domain_mapping.keys() + tags_seen: Set[str] = set() + for tag_item in global_tags.tags: + tag = tag_item.tag.split("urn:li:tag:")[-1] + if tag in transformer_tags: + tags_seen.add(tag) + + if tags_seen: + domain_aspect = DomainsClass(domains=[]) + domains_to_add: List[str] = [] + for tag in tags_seen: + if domain_mapping.get(tag): + domains_to_add.append(domain_mapping[tag]) + + mapped_domains = AddDatasetDomain.get_domain_class( + self.ctx.graph, domains_to_add + ) + domain_aspect.domains.extend(mapped_domains.domains) + if self.config.semantics == TransformerSemantics.PATCH: + # Try merging with server-side domains + patch_domain_aspect: Optional[ + DomainsClass + ] = AddDatasetDomain._merge_with_server_domains( + self.ctx.graph, entity_urn, domain_aspect + ) + return cast(Optional[Aspect], patch_domain_aspect) + return cast(Optional[Aspect], domain_aspect) + return cast(Optional[Aspect], existing_domain_aspect) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 7e01dd8909..a0deae972b 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -67,6 +67,9 @@ from datahub.ingestion.transformer.dataset_domain import ( PatternAddDatasetDomain, SimpleAddDatasetDomain, ) +from datahub.ingestion.transformer.dataset_domain_based_on_tags import ( + DatasetTagDomainMapper, +) from datahub.ingestion.transformer.dataset_transformer import DatasetTransformer from datahub.ingestion.transformer.extract_dataset_tags import ExtractDatasetTags from datahub.ingestion.transformer.extract_ownership_from_tags import ( @@ -3458,3 +3461,193 @@ def test_pattern_cleanup_usage_statistics_user_3( assert output[0].record.aspect assert len(output[0].record.aspect.userCounts) == 2 assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts + + +def test_domain_mapping_based_on_tags_with_valid_tags(mock_datahub_graph): + acryl_domain = builder.make_domain_urn("acryl.io") + server_domain = builder.make_domain_urn("test.io") + + tag_one = builder.make_tag_urn("test:tag_1") + + # Return fake aspect to simulate server behaviour + def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: + return models.GlobalTagsClass(tags=[TagAssociationClass(tag=tag_one)]) + + pipeline_context = PipelineContext(run_id="transformer_pipe_line") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + pipeline_context.graph.get_tags = fake_get_tags # type: ignore + + output = run_dataset_transformer_pipeline( + transformer_type=DatasetTagDomainMapper, + aspect=models.DomainsClass(domains=[server_domain]), + config={"domain_mapping": {"test:tag_1": acryl_domain}}, + pipeline_context=pipeline_context, + ) + + assert len(output) == 2 + assert output[0] is not None + assert output[0].record is not None + assert isinstance(output[0].record, MetadataChangeProposalWrapper) + assert output[0].record.aspect is not None + assert isinstance(output[0].record.aspect, models.DomainsClass) + transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) + assert len(transformed_aspect.domains) == 1 + assert acryl_domain in transformed_aspect.domains + assert server_domain not in transformed_aspect.domains + + +def test_domain_mapping_based_on_tags_with_no_matching_tags(mock_datahub_graph): + acryl_domain = builder.make_domain_urn("acryl.io") + server_domain = builder.make_domain_urn("test.io") + non_matching_tag = builder.make_tag_urn("nonMatching") + + pipeline_context = PipelineContext(run_id="no_match_pipeline") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + # Return fake aspect to simulate server behaviour + def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: + return models.GlobalTagsClass(tags=[TagAssociationClass(tag=non_matching_tag)]) + + pipeline_context.graph.get_tags = fake_get_tags # type: ignore + + output = run_dataset_transformer_pipeline( + transformer_type=DatasetTagDomainMapper, + aspect=models.DomainsClass(domains=[server_domain]), + config={ + "domain_mapping": {"test:tag_1": acryl_domain}, + }, + pipeline_context=pipeline_context, + ) + assert len(output) == 2 + assert isinstance(output[0].record.aspect, models.DomainsClass) + assert len(output[0].record.aspect.domains) == 1 + transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) + assert len(transformed_aspect.domains) == 1 + assert acryl_domain not in transformed_aspect.domains + assert server_domain in transformed_aspect.domains + + +def test_domain_mapping_based_on_tags_with_empty_config(mock_datahub_graph): + some_tag = builder.make_tag_urn("someTag") + + pipeline_context = PipelineContext(run_id="empty_config_pipeline") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + # Return fake aspect to simulate server behaviour + def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: + return models.GlobalTagsClass(tags=[TagAssociationClass(tag=some_tag)]) + + pipeline_context.graph.get_tags = fake_get_tags # type: ignore + + output = run_dataset_transformer_pipeline( + transformer_type=DatasetTagDomainMapper, + aspect=models.DomainsClass(domains=[]), + config={"domain_mapping": {}}, + pipeline_context=pipeline_context, + ) + assert len(output) == 2 + assert isinstance(output[0].record.aspect, models.DomainsClass) + assert len(output[0].record.aspect.domains) == 0 + + +def test_domain_mapping_based__r_on_tags_with_multiple_tags(mock_datahub_graph): + # Two tags that match different rules in the domain mapping configuration + tag_one = builder.make_tag_urn("test:tag_1") + tag_two = builder.make_tag_urn("test:tag_2") + existing_domain = builder.make_domain_urn("existing.io") + finance = builder.make_domain_urn("finance") + hr = builder.make_domain_urn("hr") + + pipeline_context = PipelineContext(run_id="multiple_matches_pipeline") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + # Return fake aspect to simulate server behaviour + def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: + return models.GlobalTagsClass( + tags=[TagAssociationClass(tag=tag_one), TagAssociationClass(tag=tag_two)] + ) + + # Return fake aspect to simulate server behaviour + def fake_get_domain(entity_urn: str) -> models.DomainsClass: + return models.DomainsClass(domains=[existing_domain]) + + pipeline_context.graph.get_tags = fake_get_tags # type: ignore + pipeline_context.graph.get_domain = fake_get_domain # type: ignore + + output = run_dataset_transformer_pipeline( + transformer_type=DatasetTagDomainMapper, + aspect=models.DomainsClass(domains=[existing_domain]), + config={ + "domain_mapping": {"test:tag_1": finance, "test:tag_2": hr}, + "semantics": "PATCH", + }, + pipeline_context=pipeline_context, + ) + + # Assertions to verify the expected outcome + assert len(output) == 2 + assert output[0].record is not None + assert output[0].record.aspect is not None + assert isinstance(output[0].record.aspect, models.DomainsClass) + transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) + + # Expecting domains from both matched tags + assert set(output[0].record.aspect.domains) == {existing_domain, finance, hr} + assert len(transformed_aspect.domains) == 3 + + +def test_domain_mapping_based_on_tags_with_empty_tags(mock_datahub_graph): + acryl_domain = builder.make_domain_urn("acryl.io") + server_domain = builder.make_domain_urn("test.io") + pipeline_context = PipelineContext(run_id="empty_config_pipeline") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + # Return fake aspect to simulate server behaviour + def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: + return models.GlobalTagsClass(tags=[]) + + pipeline_context.graph.get_tags = fake_get_tags # type: ignore + + output = run_dataset_transformer_pipeline( + transformer_type=DatasetTagDomainMapper, + aspect=models.DomainsClass(domains=[acryl_domain]), + config={"domain_mapping": {"test:tag_1": server_domain}}, + pipeline_context=pipeline_context, + ) + + assert len(output) == 2 + assert isinstance(output[0].record.aspect, models.DomainsClass) + assert len(output[0].record.aspect.domains) == 1 + transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) + assert len(transformed_aspect.domains) == 1 + assert acryl_domain in transformed_aspect.domains + assert server_domain not in transformed_aspect.domains + + +def test_domain_mapping_based_on_tags_with_no_tags(mock_datahub_graph): + acryl_domain = builder.make_domain_urn("acryl.io") + server_domain = builder.make_domain_urn("test.io") + pipeline_context = PipelineContext(run_id="empty_config_pipeline") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + # Return fake aspect to simulate server behaviour + def fake_get_tags(entity_urn: str) -> Optional[models.GlobalTagsClass]: + return None + + pipeline_context.graph.get_tags = fake_get_tags # type: ignore + + output = run_dataset_transformer_pipeline( + transformer_type=DatasetTagDomainMapper, + aspect=models.DomainsClass(domains=[acryl_domain]), + config={"domain_mapping": {"test:tag_1": server_domain}}, + pipeline_context=pipeline_context, + ) + + assert len(output) == 2 + assert isinstance(output[0].record.aspect, models.DomainsClass) + assert len(output[0].record.aspect.domains) == 1 + transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) + assert len(transformed_aspect.domains) == 1 + assert acryl_domain in transformed_aspect.domains + assert server_domain not in transformed_aspect.domains