mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-25 17:08:29 +00:00
fix(ingest/transformer): Add dataset domains based on tags using transformer (#10458)
This commit is contained in:
parent
574fcc7692
commit
5fbf781558
@ -13,7 +13,7 @@ The below table shows transformer which can transform aspects of entity [Dataset
|
||||
| `glossaryTerms` | - [Simple Add Dataset glossaryTerms ](#simple-add-dataset-glossaryterms)<br/> - [Pattern Add Dataset glossaryTerms](#pattern-add-dataset-glossaryterms) |
|
||||
| `schemaMetadata` | - [Pattern Add Dataset Schema Field glossaryTerms](#pattern-add-dataset-schema-field-glossaryterms)<br/> - [Pattern Add Dataset Schema Field globalTags](#pattern-add-dataset-schema-field-globaltags) |
|
||||
| `datasetProperties` | - [Simple Add Dataset datasetProperties](#simple-add-dataset-datasetproperties)<br/> - [Add Dataset datasetProperties](#add-dataset-datasetproperties) |
|
||||
| `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)<br/> - [Pattern Add Dataset domains](#pattern-add-dataset-domains) |
|
||||
| `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)<br/> - [Pattern Add Dataset domains](#pattern-add-dataset-domains)<br/> - [Domain Mapping Based on Tags](#domain-mapping-based-on-tags) |
|
||||
| `dataProduct` | - [Simple Add Dataset dataProduct ](#simple-add-dataset-dataproduct)<br/> - [Pattern Add Dataset dataProduct](#pattern-add-dataset-dataproduct)<br/> - [Add Dataset dataProduct](#add-dataset-dataproduct)
|
||||
|
||||
## Extract Ownership from Tags
|
||||
@ -1064,6 +1064,61 @@ in both of the cases domain should be provisioned on DataHub GMS
|
||||
'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.n.*': ["hr"]
|
||||
'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.t.*': ["urn:li:domain:finance"]
|
||||
```
|
||||
|
||||
|
||||
|
||||
## Domain Mapping Based on Tags
|
||||
### Config Details
|
||||
|
||||
| Field | Required | Type | Default | Description |
|
||||
|-----------------|----------|-------------------------|-------------|---------------------------------------------------------------------------------------------------------|
|
||||
| `domain_mapping`| ✅ | Dict[str, str] | | Dataset Entity tag as key and domain urn or name as value to map with dataset as asset. |
|
||||
| `semantics` | | enum | "OVERWRITE" | Whether to OVERWRITE or PATCH the entity present on DataHub GMS.|
|
||||
|
||||
<br/>
|
||||
|
||||
Let’s suppose we’d like to add a domain to a dataset based on its tags. In this case, you can use the `domain_mapping_based_on_tags` transformer.
|
||||
|
||||
The config, which we’d append to our ingestion recipe YAML, would look like this:
|
||||
|
||||
Here we can set each domain to either a URN (e.g. urn:li:domain:engineering) or a simple domain name (e.g. engineering). In both cases, the domain should already be provisioned on DataHub GMS.
|
||||
|
||||
When specifying tags within the domain mapping, use the tag's simple name rather than the full tag URN.
|
||||
|
||||
For example, instead of using the tag URN urn:li:tag:NeedsDocumentation, you should specify just the simple tag name NeedsDocumentation in the domain mapping configuration.
|
||||
|
||||
```yaml
|
||||
transformers:
|
||||
- type: "domain_mapping_based_on_tags"
|
||||
config:
|
||||
domain_mapping:
|
||||
'NeedsDocumentation': "urn:li:domain:documentation"
|
||||
```
|
||||
|
||||
|
||||
`domain_mapping_based_on_tags` can be configured in the following ways:
|
||||
|
||||
- Add domains based on tags, overwriting the domains already set for the dataset on DataHub GMS
|
||||
```yaml
|
||||
transformers:
|
||||
- type: "domain_mapping_based_on_tags"
|
||||
config:
|
||||
semantics: OVERWRITE # OVERWRITE is default behaviour
|
||||
domain_mapping:
|
||||
'example1': "urn:li:domain:engineering"
|
||||
'example2': "urn:li:domain:hr"
|
||||
```
|
||||
- Add domains based on tags, keeping the domains already set for the dataset on DataHub GMS
|
||||
```yaml
|
||||
transformers:
|
||||
- type: "domain_mapping_based_on_tags"
|
||||
config:
|
||||
semantics: PATCH
|
||||
domain_mapping:
|
||||
'example1': "urn:li:domain:engineering"
|
||||
'example2': "urn:li:domain:hr"
|
||||
```
|
||||
|
||||
## Simple Add Dataset dataProduct
|
||||
### Config Details
|
||||
| Field | Required | Type | Default | Description |
|
||||
|
||||
@ -708,6 +708,7 @@ entry_points = {
|
||||
"pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct",
|
||||
"replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl",
|
||||
"pattern_cleanup_dataset_usage_user = datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user:PatternCleanupDatasetUsageUser",
|
||||
"domain_mapping_based_on_tags = datahub.ingestion.transformer.dataset_domain_based_on_tags:DatasetTagDomainMapper",
|
||||
],
|
||||
"datahub.ingestion.sink.plugins": [
|
||||
"file = datahub.ingestion.sink.file:FileSink",
|
||||
|
||||
@ -0,0 +1,70 @@
|
||||
from typing import Dict, List, Optional, Set, cast
|
||||
|
||||
from datahub.configuration.common import (
|
||||
TransformerSemantics,
|
||||
TransformerSemanticsConfigModel,
|
||||
)
|
||||
from datahub.emitter.mce_builder import Aspect
|
||||
from datahub.ingestion.api.common import PipelineContext
|
||||
from datahub.ingestion.transformer.dataset_domain import AddDatasetDomain
|
||||
from datahub.ingestion.transformer.dataset_transformer import DatasetDomainTransformer
|
||||
from datahub.metadata.schema_classes import DomainsClass, GlobalTagsClass
|
||||
|
||||
|
||||
class DatasetTagDomainMapperConfig(TransformerSemanticsConfigModel):
    # Settings for the tag-driven domain transformer (DatasetTagDomainMapper).
    #
    # domain_mapping: keyed by the simple tag name (without the
    # ``urn:li:tag:`` prefix); each value is the domain URN or domain name to
    # assign to datasets that carry that tag.
    domain_mapping: Dict[str, str]
|
||||
|
||||
|
||||
class DatasetTagDomainMapper(DatasetDomainTransformer):
    """Assign domains to a dataset based on the tags it carries on DataHub.

    Every tag returned by the graph for the dataset is reduced to its simple
    name and looked up in ``config.domain_mapping``. The domains of all
    matching tags are resolved through ``AddDatasetDomain.get_domain_class``.
    With ``PATCH`` semantics the result is merged with the server-side domains;
    otherwise it replaces the incoming domains aspect. Datasets without any
    mapped tag keep their incoming aspect untouched.
    """

    def __init__(self, config: DatasetTagDomainMapperConfig, ctx: PipelineContext):
        super().__init__()
        self.ctx: PipelineContext = ctx
        self.config: DatasetTagDomainMapperConfig = config

    @classmethod
    def create(
        cls, config_dict: dict, ctx: PipelineContext
    ) -> "DatasetTagDomainMapper":
        """Build the transformer from a raw recipe config dictionary."""
        config = DatasetTagDomainMapperConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
    ) -> Optional[Aspect]:
        """Return the domains aspect derived from the dataset's tags.

        Args:
            entity_urn: URN of the dataset being transformed.
            aspect_name: Name of the aspect being transformed (unused here).
            aspect: The incoming domains aspect, if any.

        Returns:
            The incoming aspect when the dataset has no tag present in
            ``domain_mapping``; otherwise a ``DomainsClass`` built from the
            mapped domains (merged with server-side domains under ``PATCH``).
        """
        existing_domain_aspect: DomainsClass = cast(DomainsClass, aspect)
        # Tags are read from the graph, so the pipeline must provide one.
        assert self.ctx.graph
        global_tags: Optional[GlobalTagsClass] = self.ctx.graph.get_tags(entity_urn)
        if not global_tags:
            return cast(Optional[Aspect], existing_domain_aspect)

        domain_mapping = self.config.domain_mapping
        matched_any_tag = False
        domains_to_add: List[str] = []
        seen_domains: Set[str] = set()
        # Walk the tags in the order the graph returned them so the resulting
        # domain list is deterministic (iterating a set of strings is not).
        for tag_item in global_tags.tags:
            tag = tag_item.tag.split("urn:li:tag:")[-1]
            if tag not in domain_mapping:
                continue
            matched_any_tag = True
            domain = domain_mapping[tag]
            # Ignore empty mappings and avoid emitting the same domain twice
            # when several tags point at it.
            if domain and domain not in seen_domains:
                seen_domains.add(domain)
                domains_to_add.append(domain)

        if not matched_any_tag:
            return cast(Optional[Aspect], existing_domain_aspect)

        domain_aspect = DomainsClass(domains=[])
        mapped_domains = AddDatasetDomain.get_domain_class(
            self.ctx.graph, domains_to_add
        )
        domain_aspect.domains.extend(mapped_domains.domains)

        if self.config.semantics == TransformerSemantics.PATCH:
            # Try merging with server-side domains
            patch_domain_aspect: Optional[
                DomainsClass
            ] = AddDatasetDomain._merge_with_server_domains(
                self.ctx.graph, entity_urn, domain_aspect
            )
            return cast(Optional[Aspect], patch_domain_aspect)

        return cast(Optional[Aspect], domain_aspect)
|
||||
@ -67,6 +67,9 @@ from datahub.ingestion.transformer.dataset_domain import (
|
||||
PatternAddDatasetDomain,
|
||||
SimpleAddDatasetDomain,
|
||||
)
|
||||
from datahub.ingestion.transformer.dataset_domain_based_on_tags import (
|
||||
DatasetTagDomainMapper,
|
||||
)
|
||||
from datahub.ingestion.transformer.dataset_transformer import DatasetTransformer
|
||||
from datahub.ingestion.transformer.extract_dataset_tags import ExtractDatasetTags
|
||||
from datahub.ingestion.transformer.extract_ownership_from_tags import (
|
||||
@ -3458,3 +3461,193 @@ def test_pattern_cleanup_usage_statistics_user_3(
|
||||
assert output[0].record.aspect
|
||||
assert len(output[0].record.aspect.userCounts) == 2
|
||||
assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts
|
||||
|
||||
|
||||
def test_domain_mapping_based_on_tags_with_valid_tags(mock_datahub_graph):
    """A matching tag replaces the incoming domain under default semantics."""
    mapped_domain = builder.make_domain_urn("acryl.io")
    incoming_domain = builder.make_domain_urn("test.io")
    matching_tag = builder.make_tag_urn("test:tag_1")

    ctx = PipelineContext(run_id="transformer_pipe_line")
    ctx.graph = mock_datahub_graph(DatahubClientConfig())

    # Stand-in for the server: the dataset carries exactly the mapped tag.
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        associations = [TagAssociationClass(tag=matching_tag)]
        return models.GlobalTagsClass(tags=associations)

    ctx.graph.get_tags = fake_get_tags  # type: ignore

    output = run_dataset_transformer_pipeline(
        transformer_type=DatasetTagDomainMapper,
        aspect=models.DomainsClass(domains=[incoming_domain]),
        config={"domain_mapping": {"test:tag_1": mapped_domain}},
        pipeline_context=ctx,
    )

    assert len(output) == 2
    first = output[0]
    assert first is not None
    assert first.record is not None
    assert isinstance(first.record, MetadataChangeProposalWrapper)
    assert first.record.aspect is not None
    assert isinstance(first.record.aspect, models.DomainsClass)

    domains = cast(models.DomainsClass, first.record.aspect).domains
    assert len(domains) == 1
    assert mapped_domain in domains
    assert incoming_domain not in domains
|
||||
|
||||
|
||||
def test_domain_mapping_based_on_tags_with_no_matching_tags(mock_datahub_graph):
    """A dataset whose tags are not in the mapping keeps its incoming domain."""
    mapped_domain = builder.make_domain_urn("acryl.io")
    incoming_domain = builder.make_domain_urn("test.io")
    unmapped_tag = builder.make_tag_urn("nonMatching")

    ctx = PipelineContext(run_id="no_match_pipeline")
    ctx.graph = mock_datahub_graph(DatahubClientConfig())

    # Stand-in for the server: the only tag present is not configured.
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        associations = [TagAssociationClass(tag=unmapped_tag)]
        return models.GlobalTagsClass(tags=associations)

    ctx.graph.get_tags = fake_get_tags  # type: ignore

    transformer_config = {
        "domain_mapping": {"test:tag_1": mapped_domain},
    }
    output = run_dataset_transformer_pipeline(
        transformer_type=DatasetTagDomainMapper,
        aspect=models.DomainsClass(domains=[incoming_domain]),
        config=transformer_config,
        pipeline_context=ctx,
    )

    assert len(output) == 2
    assert isinstance(output[0].record.aspect, models.DomainsClass)
    assert len(output[0].record.aspect.domains) == 1

    domains = cast(models.DomainsClass, output[0].record.aspect).domains
    assert len(domains) == 1
    assert mapped_domain not in domains
    assert incoming_domain in domains
|
||||
|
||||
|
||||
def test_domain_mapping_based_on_tags_with_empty_config(mock_datahub_graph):
    """An empty mapping never adds domains, whatever tags the dataset has."""
    dataset_tag = builder.make_tag_urn("someTag")

    ctx = PipelineContext(run_id="empty_config_pipeline")
    ctx.graph = mock_datahub_graph(DatahubClientConfig())

    # Stand-in for the server: the dataset has a tag, but nothing maps it.
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        associations = [TagAssociationClass(tag=dataset_tag)]
        return models.GlobalTagsClass(tags=associations)

    ctx.graph.get_tags = fake_get_tags  # type: ignore

    output = run_dataset_transformer_pipeline(
        transformer_type=DatasetTagDomainMapper,
        aspect=models.DomainsClass(domains=[]),
        config={"domain_mapping": {}},
        pipeline_context=ctx,
    )

    assert len(output) == 2
    emitted_aspect = output[0].record.aspect
    assert isinstance(emitted_aspect, models.DomainsClass)
    assert len(emitted_aspect.domains) == 0
|
||||
|
||||
|
||||
def test_domain_mapping_based__r_on_tags_with_multiple_tags(mock_datahub_graph):
    """PATCH semantics: domains of all matching tags join the server's domains."""
    existing_domain = builder.make_domain_urn("existing.io")
    finance = builder.make_domain_urn("finance")
    hr = builder.make_domain_urn("hr")
    # Two tags, each matching a different rule of the domain mapping.
    tag_one = builder.make_tag_urn("test:tag_1")
    tag_two = builder.make_tag_urn("test:tag_2")

    ctx = PipelineContext(run_id="multiple_matches_pipeline")
    ctx.graph = mock_datahub_graph(DatahubClientConfig())

    # Stand-in for the server-side tags of the dataset.
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        associations = [
            TagAssociationClass(tag=tag_one),
            TagAssociationClass(tag=tag_two),
        ]
        return models.GlobalTagsClass(tags=associations)

    # Stand-in for the server-side domains of the dataset.
    def fake_get_domain(entity_urn: str) -> models.DomainsClass:
        return models.DomainsClass(domains=[existing_domain])

    ctx.graph.get_tags = fake_get_tags  # type: ignore
    ctx.graph.get_domain = fake_get_domain  # type: ignore

    transformer_config = {
        "domain_mapping": {"test:tag_1": finance, "test:tag_2": hr},
        "semantics": "PATCH",
    }
    output = run_dataset_transformer_pipeline(
        transformer_type=DatasetTagDomainMapper,
        aspect=models.DomainsClass(domains=[existing_domain]),
        config=transformer_config,
        pipeline_context=ctx,
    )

    # The first emitted record must carry a domains aspect.
    assert len(output) == 2
    assert output[0].record is not None
    assert output[0].record.aspect is not None
    assert isinstance(output[0].record.aspect, models.DomainsClass)
    patched = cast(models.DomainsClass, output[0].record.aspect)

    # Server domain plus one domain per matched tag, with no duplicates.
    assert set(output[0].record.aspect.domains) == {existing_domain, finance, hr}
    assert len(patched.domains) == 3
|
||||
|
||||
|
||||
def test_domain_mapping_based_on_tags_with_empty_tags(mock_datahub_graph):
    """An empty tag list on the server leaves the incoming domain untouched."""
    incoming_domain = builder.make_domain_urn("acryl.io")
    mapped_domain = builder.make_domain_urn("test.io")

    ctx = PipelineContext(run_id="empty_config_pipeline")
    ctx.graph = mock_datahub_graph(DatahubClientConfig())

    # Stand-in for the server: a tags aspect exists but holds no tags.
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(tags=[])

    ctx.graph.get_tags = fake_get_tags  # type: ignore

    output = run_dataset_transformer_pipeline(
        transformer_type=DatasetTagDomainMapper,
        aspect=models.DomainsClass(domains=[incoming_domain]),
        config={"domain_mapping": {"test:tag_1": mapped_domain}},
        pipeline_context=ctx,
    )

    assert len(output) == 2
    assert isinstance(output[0].record.aspect, models.DomainsClass)
    assert len(output[0].record.aspect.domains) == 1

    domains = cast(models.DomainsClass, output[0].record.aspect).domains
    assert len(domains) == 1
    assert incoming_domain in domains
    assert mapped_domain not in domains
|
||||
|
||||
|
||||
def test_domain_mapping_based_on_tags_with_no_tags(mock_datahub_graph):
    """A missing tags aspect on the server leaves the incoming domain untouched."""
    incoming_domain = builder.make_domain_urn("acryl.io")
    mapped_domain = builder.make_domain_urn("test.io")

    ctx = PipelineContext(run_id="empty_config_pipeline")
    ctx.graph = mock_datahub_graph(DatahubClientConfig())

    # Stand-in for the server: the dataset has no tags aspect at all.
    def fake_get_tags(entity_urn: str) -> Optional[models.GlobalTagsClass]:
        return None

    ctx.graph.get_tags = fake_get_tags  # type: ignore

    output = run_dataset_transformer_pipeline(
        transformer_type=DatasetTagDomainMapper,
        aspect=models.DomainsClass(domains=[incoming_domain]),
        config={"domain_mapping": {"test:tag_1": mapped_domain}},
        pipeline_context=ctx,
    )

    assert len(output) == 2
    assert isinstance(output[0].record.aspect, models.DomainsClass)
    assert len(output[0].record.aspect.domains) == 1

    domains = cast(models.DomainsClass, output[0].record.aspect).domains
    assert len(domains) == 1
    assert incoming_domain in domains
    assert mapped_domain not in domains
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user