mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-28 02:17:53 +00:00
fix(ingestion/transformer): replace the externalUrl container (#11013)
This commit is contained in:
parent
a6eb1f47c2
commit
da72ba2113
@ -953,7 +953,7 @@ Then define your class to return a list of custom properties, for example:
|
||||
add_properties_resolver_class: "<your_module>.<your_class>"
|
||||
```
|
||||
|
||||
## Replace ExternalUrl
|
||||
## Replace ExternalUrl Dataset
|
||||
### Config Details
|
||||
| Field | Required | Type | Default | Description |
|
||||
|-----------------------------|----------|---------|---------------|---------------------------------------------|
|
||||
@ -971,6 +971,24 @@ transformers:
|
||||
replacement: "sub"
|
||||
```
|
||||
|
||||
## Replace ExternalUrl Container
|
||||
### Config Details
|
||||
| Field | Required | Type | Default | Description |
|
||||
|-----------------------------|----------|---------|---------------|---------------------------------------------|
|
||||
| `input_pattern` | ✅ | string | | String or pattern to replace |
|
||||
| `replacement` | ✅ | string | | Replacement string |
|
||||
|
||||
|
||||
Matches the full or partial string in the externalUrl of the container properties and replaces it with the replacement string.
|
||||
|
||||
```yaml
|
||||
transformers:
|
||||
- type: "replace_external_url_container"
|
||||
config:
|
||||
input_pattern: '\b\w*hub\b'
|
||||
replacement: "sub"
|
||||
```
|
||||
|
||||
## Clean User URN in DatasetUsageStatistics Aspect
|
||||
### Config Details
|
||||
| Field | Required | Type | Default | Description |
|
||||
|
||||
@ -750,7 +750,8 @@ entry_points = {
|
||||
"add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:AddDatasetDataProduct",
|
||||
"simple_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:SimpleAddDatasetDataProduct",
|
||||
"pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct",
|
||||
"replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl",
|
||||
"replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrlDataset",
|
||||
"replace_external_url_container = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrlContainer",
|
||||
"pattern_cleanup_dataset_usage_user = datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user:PatternCleanupDatasetUsageUser",
|
||||
"domain_mapping_based_on_tags = datahub.ingestion.transformer.dataset_domain_based_on_tags:DatasetTagDomainMapper",
|
||||
"tags_to_term = datahub.ingestion.transformer.tags_to_terms:TagsToTermMapper",
|
||||
|
||||
@ -37,6 +37,16 @@ class TagTransformer(BaseTransformer, SingleAspectTransformer, metaclass=ABCMeta
|
||||
return ["dataset", "container"]
|
||||
|
||||
|
||||
class ContainerTransformer(BaseTransformer, SingleAspectTransformer, metaclass=ABCMeta):
    """Base class for single-aspect transformers that apply to container entities."""

    def __init__(self):
        super().__init__()

    def entity_types(self) -> List[str]:
        """Return the entity types this transformer handles: only ``container``."""
        return ["container"]
|
||||
|
||||
|
||||
class DatasetOwnershipTransformer(DatasetTransformer, metaclass=ABCMeta):
|
||||
def aspect_name(self) -> str:
|
||||
return "ownership"
|
||||
@ -143,3 +153,8 @@ class DatasetUsageStatisticsTransformer(DatasetTransformer, metaclass=ABCMeta):
|
||||
class TagsToTermTransformer(TagTransformer, metaclass=ABCMeta):
|
||||
def aspect_name(self) -> str:
|
||||
return "glossaryTerms"
|
||||
|
||||
|
||||
class ContainerPropertiesTransformer(ContainerTransformer, metaclass=ABCMeta):
    """Container transformer bound to the ``containerProperties`` aspect."""

    def aspect_name(self) -> str:
        """Return the name of the aspect this transformer operates on."""
        name: str = "containerProperties"
        return name
|
||||
|
||||
@ -6,9 +6,13 @@ from datahub.configuration.common import ConfigModel
|
||||
from datahub.emitter.mce_builder import Aspect
|
||||
from datahub.ingestion.api.common import PipelineContext
|
||||
from datahub.ingestion.transformer.dataset_transformer import (
|
||||
ContainerPropertiesTransformer,
|
||||
DatasetPropertiesTransformer,
|
||||
)
|
||||
from datahub.metadata.schema_classes import DatasetPropertiesClass
|
||||
from datahub.metadata.schema_classes import (
|
||||
ContainerPropertiesClass,
|
||||
DatasetPropertiesClass,
|
||||
)
|
||||
|
||||
|
||||
class ReplaceExternalUrlConfig(ConfigModel):
|
||||
@ -16,8 +20,14 @@ class ReplaceExternalUrlConfig(ConfigModel):
|
||||
replacement: str
|
||||
|
||||
|
||||
class ReplaceExternalUrl(DatasetPropertiesTransformer):
|
||||
"""Transformer that clean the ownership URN."""
|
||||
class ReplaceUrl:
    """Mixin that provides regex-based rewriting of a URL string."""

    def replace_url(self, pattern: str, replacement: str, external_url: str) -> str:
        """Return ``external_url`` with every match of ``pattern`` replaced.

        Args:
            pattern: Regular expression to search for in ``external_url``.
            replacement: Text substituted for each match; interpreted with the
                same rules as the ``repl`` argument of ``re.sub``.
            external_url: The URL string to rewrite.

        Returns:
            The rewritten URL; unchanged when ``pattern`` does not match.

        Raises:
            re.error: If ``pattern`` is not a valid regular expression.
        """
        # Call ``sub`` on the compiled pattern directly instead of handing the
        # compiled object back to the module-level ``re.sub``.
        return re.compile(pattern).sub(replacement, external_url)
|
||||
|
||||
|
||||
class ReplaceExternalUrlDataset(DatasetPropertiesTransformer, ReplaceUrl):
|
||||
"""Transformer that replace the external URL for dataset properties."""
|
||||
|
||||
ctx: PipelineContext
|
||||
config: ReplaceExternalUrlConfig
|
||||
@ -34,7 +44,9 @@ class ReplaceExternalUrl(DatasetPropertiesTransformer):
|
||||
self.resolver_args = resolver_args
|
||||
|
||||
@classmethod
|
||||
def create(cls, config_dict: dict, ctx: PipelineContext) -> "ReplaceExternalUrl":
|
||||
def create(
|
||||
cls, config_dict: dict, ctx: PipelineContext
|
||||
) -> "ReplaceExternalUrlDataset":
|
||||
config = ReplaceExternalUrlConfig.parse_obj(config_dict)
|
||||
return cls(config, ctx)
|
||||
|
||||
@ -55,11 +67,60 @@ class ReplaceExternalUrl(DatasetPropertiesTransformer):
|
||||
in_dataset_properties_aspect
|
||||
)
|
||||
|
||||
pattern = re.compile(self.config.input_pattern)
|
||||
replacement = self.config.replacement
|
||||
|
||||
out_dataset_properties_aspect.externalUrl = re.sub(
|
||||
pattern, replacement, in_dataset_properties_aspect.externalUrl
|
||||
out_dataset_properties_aspect.externalUrl = self.replace_url(
|
||||
self.config.input_pattern,
|
||||
self.config.replacement,
|
||||
in_dataset_properties_aspect.externalUrl,
|
||||
)
|
||||
|
||||
return cast(Aspect, out_dataset_properties_aspect)
|
||||
|
||||
|
||||
class ReplaceExternalUrlContainer(ContainerPropertiesTransformer, ReplaceUrl):
    """Rewrite the ``externalUrl`` of container properties using the configured pattern."""

    ctx: PipelineContext
    config: ReplaceExternalUrlConfig

    def __init__(
        self,
        config: ReplaceExternalUrlConfig,
        ctx: PipelineContext,
        **resolver_args: Dict[str, Any],
    ):
        super().__init__()
        self.config = config
        self.ctx = ctx
        self.resolver_args = resolver_args

    @classmethod
    def create(
        cls, config_dict: dict, ctx: PipelineContext
    ) -> "ReplaceExternalUrlContainer":
        """Build the transformer from a raw configuration dictionary."""
        return cls(ReplaceExternalUrlConfig.parse_obj(config_dict), ctx)

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
    ) -> Optional[Aspect]:
        """Return the aspect with ``externalUrl`` rewritten via ``replace_url``.

        The incoming aspect is returned as-is when it has no ``externalUrl``
        attribute or the value is falsy; otherwise a deep copy carrying the
        rewritten URL is returned and the input is left unmodified.
        """
        source_props: ContainerPropertiesClass = cast(ContainerPropertiesClass, aspect)

        # Nothing to rewrite: hand the original aspect back untouched.
        if not getattr(source_props, "externalUrl", None):
            return cast(Aspect, source_props)

        rewritten_props: ContainerPropertiesClass = copy.deepcopy(source_props)
        rewritten_props.externalUrl = self.replace_url(
            self.config.input_pattern,
            self.config.replacement,
            source_props.externalUrl,
        )
        return cast(Aspect, rewritten_props)
|
||||
|
||||
@ -71,6 +71,7 @@ from datahub.ingestion.transformer.dataset_domain_based_on_tags import (
|
||||
DatasetTagDomainMapper,
|
||||
)
|
||||
from datahub.ingestion.transformer.dataset_transformer import (
|
||||
ContainerTransformer,
|
||||
DatasetTransformer,
|
||||
TagTransformer,
|
||||
)
|
||||
@ -88,7 +89,10 @@ from datahub.ingestion.transformer.pattern_cleanup_ownership import (
|
||||
from datahub.ingestion.transformer.remove_dataset_ownership import (
|
||||
SimpleRemoveDatasetOwnership,
|
||||
)
|
||||
from datahub.ingestion.transformer.replace_external_url import ReplaceExternalUrl
|
||||
from datahub.ingestion.transformer.replace_external_url import (
|
||||
ReplaceExternalUrlContainer,
|
||||
ReplaceExternalUrlDataset,
|
||||
)
|
||||
from datahub.ingestion.transformer.tags_to_terms import TagsToTermMapper
|
||||
from datahub.metadata.schema_classes import (
|
||||
BrowsePathsClass,
|
||||
@ -134,6 +138,22 @@ def make_generic_dataset_mcp(
|
||||
)
|
||||
|
||||
|
||||
def make_generic_container_mcp(
    entity_urn: str = "urn:li:container:6338f55439c7ae58243a62c4d6fbffeee",
    aspect_name: str = "status",
    aspect: Any = None,
) -> MetadataChangeProposalWrapper:
    """Wrap ``aspect`` in an UPSERT change proposal for ``entity_urn``.

    When ``aspect`` is ``None`` a ``StatusClass(removed=False)`` is used. The
    entity type is derived from the URN itself.
    """
    payload = models.StatusClass(removed=False) if aspect is None else aspect
    entity_type = Urn.create_from_string(entity_urn).get_type()
    return MetadataChangeProposalWrapper(
        entityUrn=entity_urn,
        entityType=entity_type,
        aspectName=aspect_name,
        changeType="UPSERT",
        aspect=payload,
    )
|
||||
|
||||
|
||||
def create_and_run_test_pipeline(
|
||||
events: List[Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]],
|
||||
transformers: List[Dict[str, Any]],
|
||||
@ -1929,6 +1949,41 @@ def run_dataset_transformer_pipeline(
|
||||
return outputs
|
||||
|
||||
|
||||
def run_container_transformer_pipeline(
    transformer_type: Type[ContainerTransformer],
    aspect: Optional[builder.Aspect],
    config: dict,
    pipeline_context: Optional[PipelineContext] = None,
    use_mce: bool = False,
) -> List[RecordEnvelope]:
    """Run one container record followed by ``EndOfStream`` through a transformer.

    The transformer is created from ``config`` via ``transformer_type.create``.
    With ``use_mce`` the record is an MCE carrying an empty snapshot; otherwise
    ``aspect`` (required in that case) is wrapped with
    ``make_generic_container_mcp``. Returns everything the transformer emits.
    """
    ctx = (
        PipelineContext(run_id="transformer_pipe_line")
        if pipeline_context is None
        else pipeline_context
    )
    transformer: ContainerTransformer = cast(
        ContainerTransformer, transformer_type.create(config, ctx)
    )

    container: Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]
    if not use_mce:
        assert aspect
        container = make_generic_container_mcp(
            aspect=aspect, aspect_name=transformer.aspect_name()
        )
    else:
        container = MetadataChangeEventClass(
            proposedSnapshot=models.DatasetSnapshotClass(
                urn="urn:li:container:6338f55439c7ae58243a62c4d6fbffde",
                aspects=[],
            )
        )

    envelopes = [
        RecordEnvelope(record, metadata={}) for record in (container, EndOfStream())
    ]
    return list(transformer.transform(envelopes))
|
||||
|
||||
|
||||
def test_simple_add_dataset_domain_aspect_name(mock_datahub_graph):
|
||||
pipeline_context: PipelineContext = PipelineContext(
|
||||
run_id="test_simple_add_dataset_domain"
|
||||
@ -3235,7 +3290,7 @@ def test_replace_external_url_word_replace(
|
||||
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)
|
||||
|
||||
output = run_dataset_transformer_pipeline(
|
||||
transformer_type=ReplaceExternalUrl,
|
||||
transformer_type=ReplaceExternalUrlDataset,
|
||||
aspect=models.DatasetPropertiesClass(
|
||||
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
|
||||
customProperties=EXISTING_PROPERTIES.copy(),
|
||||
@ -3262,7 +3317,7 @@ def test_replace_external_regex_replace_1(
|
||||
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)
|
||||
|
||||
output = run_dataset_transformer_pipeline(
|
||||
transformer_type=ReplaceExternalUrl,
|
||||
transformer_type=ReplaceExternalUrlDataset,
|
||||
aspect=models.DatasetPropertiesClass(
|
||||
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
|
||||
customProperties=EXISTING_PROPERTIES.copy(),
|
||||
@ -3289,7 +3344,7 @@ def test_replace_external_regex_replace_2(
|
||||
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)
|
||||
|
||||
output = run_dataset_transformer_pipeline(
|
||||
transformer_type=ReplaceExternalUrl,
|
||||
transformer_type=ReplaceExternalUrlDataset,
|
||||
aspect=models.DatasetPropertiesClass(
|
||||
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
|
||||
customProperties=EXISTING_PROPERTIES.copy(),
|
||||
@ -3867,3 +3922,87 @@ def test_tags_to_terms_with_partial_match(mock_datahub_graph):
|
||||
assert isinstance(terms_aspect, models.GlossaryTermsClass)
|
||||
assert len(terms_aspect.terms) == 1
|
||||
assert terms_aspect.terms[0].urn == "urn:li:glossaryTerm:example1"
|
||||
|
||||
|
||||
def test_replace_external_url_container_word_replace(
    mock_datahub_graph,
):
    """A literal ``input_pattern`` is substituted inside the container's externalUrl."""
    ctx: PipelineContext = PipelineContext(
        run_id="test_replace_external_url_container"
    )
    ctx.graph = mock_datahub_graph(DatahubClientConfig)

    container_props = models.ContainerPropertiesClass(
        externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
        customProperties=EXISTING_PROPERTIES.copy(),
        name="sample_test",
    )
    output = run_container_transformer_pipeline(
        transformer_type=ReplaceExternalUrlContainer,
        aspect=container_props,
        config={"input_pattern": "datahub", "replacement": "starhub"},
        pipeline_context=ctx,
    )

    # One transformed record plus the end-of-stream marker.
    assert len(output) == 2
    first_record = output[0].record
    assert first_record
    assert first_record.aspect
    assert (
        first_record.aspect.externalUrl
        == "https://github.com/starhub/looker-demo/blob/master/foo.view.lkml"
    )
|
||||
|
||||
|
||||
def test_replace_external_regex_container_replace_1(
    mock_datahub_graph,
):
    """A greedy regex spanning several path segments is replaced as a single match."""
    ctx: PipelineContext = PipelineContext(
        run_id="test_replace_external_url_container"
    )
    ctx.graph = mock_datahub_graph(DatahubClientConfig)

    container_props = models.ContainerPropertiesClass(
        externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
        customProperties=EXISTING_PROPERTIES.copy(),
        name="sample_test",
    )
    output = run_container_transformer_pipeline(
        transformer_type=ReplaceExternalUrlContainer,
        aspect=container_props,
        config={"input_pattern": r"datahub/.*/", "replacement": "starhub/test/"},
        pipeline_context=ctx,
    )

    # One transformed record plus the end-of-stream marker.
    assert len(output) == 2
    first_record = output[0].record
    assert first_record
    assert first_record.aspect
    assert (
        first_record.aspect.externalUrl
        == "https://github.com/starhub/test/foo.view.lkml"
    )
|
||||
|
||||
|
||||
def test_replace_external_regex_container_replace_2(
    mock_datahub_graph,
):
    """A word-boundary regex replaces every word ending in ``hub`` in the URL."""
    ctx: PipelineContext = PipelineContext(
        run_id="test_replace_external_url_container"
    )
    ctx.graph = mock_datahub_graph(DatahubClientConfig)

    container_props = models.ContainerPropertiesClass(
        externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
        customProperties=EXISTING_PROPERTIES.copy(),
        name="sample_test",
    )
    output = run_container_transformer_pipeline(
        transformer_type=ReplaceExternalUrlContainer,
        aspect=container_props,
        config={"input_pattern": r"\b\w*hub\b", "replacement": "test"},
        pipeline_context=ctx,
    )

    # One transformed record plus the end-of-stream marker.
    assert len(output) == 2
    first_record = output[0].record
    assert first_record
    assert first_record.aspect
    assert (
        first_record.aspect.externalUrl
        == "https://test.com/test/looker-demo/blob/master/foo.view.lkml"
    )
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user