fix(ingest/transformer): generate ownership aspect from handle_end_of_stream (#9720)

This commit is contained in:
sid-acryl 2024-02-01 03:55:08 +05:30 committed by GitHub
parent f3afdf9e0c
commit 874c683f2a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 59 additions and 18 deletions

View File

@ -20,8 +20,8 @@ log = logging.getLogger(__name__)
def _update_work_unit_id(
    envelope: RecordEnvelope, urn: str, aspect_name: str
) -> Dict[Any, Any]:
    """Build record metadata whose ``workunit_id`` identifies a transformed aspect.

    Args:
        envelope: Envelope whose ``metadata`` mapping is copied as the base.
        urn: String form of the entity URN; parsed with ``Urn.from_string``.
        aspect_name: Aspect name appended as the last part of the work unit id.

    Returns:
        A shallow copy of ``envelope.metadata`` with ``workunit_id`` set to
        ``txform-<entity ids joined by "-">-<aspect_name>``.
    """
    # Parse the URN once. An earlier create_from_string()/get_entity_id() pair
    # computed the same two values and was immediately overwritten, so only
    # the from_string()/entity_ids form is kept.
    structured_urn = Urn.from_string(urn)
    simple_name = "-".join(structured_urn.entity_ids)
    # Work on a copy so the envelope's own metadata mapping is not mutated.
    record_metadata = envelope.metadata.copy()
    record_metadata.update({"workunit_id": f"txform-{simple_name}-{aspect_name}"})
    return record_metadata

View File

@ -1,11 +1,14 @@
import logging
import re
from functools import lru_cache
from typing import List, Optional, cast
from typing import List, Optional, Sequence, Union, cast
from datahub.configuration.common import TransformerSemanticsConfigModel
from datahub.emitter.mce_builder import Aspect
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import DatasetTagsTransformer
from datahub.metadata._schema_classes import MetadataChangeProposalClass
from datahub.metadata.schema_classes import (
GlobalTagsClass,
OwnerClass,
@ -16,6 +19,8 @@ from datahub.utilities.urns.corp_group_urn import CorpGroupUrn
from datahub.utilities.urns.corpuser_urn import CorpuserUrn
from datahub.utilities.urns.tag_urn import TagUrn
logger = logging.getLogger(__name__)
class ExtractOwnersFromTagsConfig(TransformerSemanticsConfigModel):
tag_prefix: str
@ -38,11 +43,13 @@ class ExtractOwnersFromTagsTransformer(DatasetTagsTransformer):
ctx: PipelineContext
config: ExtractOwnersFromTagsConfig
owner_mcps: List[MetadataChangeProposalWrapper]
def __init__(self, config: ExtractOwnersFromTagsConfig, ctx: PipelineContext):
    """Initialise the base class, then store ``config`` and ``ctx``.

    ``owner_mcps`` starts out as an empty list.
    """
    super().__init__()
    self.owner_mcps = []
    self.config = config
    self.ctx = ctx
@classmethod
def create(
@ -56,6 +63,12 @@ class ExtractOwnersFromTagsTransformer(DatasetTagsTransformer):
return owner_str + "@" + self.config.email_domain
return owner_str
def handle_end_of_stream(
    self,
) -> Sequence[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]:
    """Return the proposals currently held in ``self.owner_mcps``.

    The list object itself is returned, not a copy.
    """
    collected_owner_mcps = self.owner_mcps
    return collected_owner_mcps
def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
) -> Optional[Aspect]:
@ -64,28 +77,39 @@ class ExtractOwnersFromTagsTransformer(DatasetTagsTransformer):
return None
tags = in_tags_aspect.tags
owners: List[OwnerClass] = []
for tag_class in tags:
tag_urn = TagUrn.from_string(tag_class.tag)
tag_str = tag_urn.get_entity_id()[0]
tag_str = tag_urn.entity_ids[0]
re_match = re.search(self.config.tag_prefix, tag_str)
if re_match:
owner_str = tag_str[re_match.end() :].strip()
owner_urn_str = self.get_owner_urn(owner_str)
if self.config.is_user:
owner_urn = str(CorpuserUrn.create_from_id(owner_urn_str))
owner_urn = str(CorpuserUrn(owner_urn_str))
else:
owner_urn = str(CorpGroupUrn.create_from_id(owner_urn_str))
owner_urn = str(CorpGroupUrn(owner_urn_str))
owner_type = get_owner_type(self.config.owner_type)
if owner_type == OwnershipTypeClass.CUSTOM:
assert (
self.config.owner_type_urn is not None
), "owner_type_urn must be set if owner_type is CUSTOM"
owner = OwnerClass(
owner=owner_urn,
type=owner_type,
typeUrn=self.config.owner_type_urn,
)
owners.append(owner)
owner_aspect = OwnershipClass(owners=owners)
return cast(Aspect, owner_aspect)
owners.append(
OwnerClass(
owner=owner_urn,
type=owner_type,
typeUrn=self.config.owner_type_urn,
)
)
self.owner_mcps.append(
MetadataChangeProposalWrapper(
entityUrn=entity_urn,
aspect=OwnershipClass(
owners=owners,
),
)
)
return None

View File

@ -648,22 +648,35 @@ def test_extract_owners_from_tags():
)
]
)
transformer = ExtractOwnersFromTagsTransformer.create(
config,
PipelineContext(run_id="test"),
)
transformed = list(
record_envelops: List[RecordEnvelope] = list(
transformer.transform(
[
RecordEnvelope(dataset, metadata={}),
RecordEnvelope(record=EndOfStream(), metadata={}),
]
)
)
owners_aspect = transformed[0].record.proposedSnapshot.aspects[0]
assert len(record_envelops) == 3
mcp: MetadataChangeProposalWrapper = record_envelops[1].record
owners_aspect = cast(OwnershipClass, mcp.aspect)
owners = owners_aspect.owners
owner = owners[0]
if expected_owner_type is not None:
assert owner.type == expected_owner_type
assert expected_owner_type is not None
assert owner.type == expected_owner_type
assert owner.owner == expected_owner
_test_owner(
@ -672,6 +685,7 @@ def test_extract_owners_from_tags():
"tag_prefix": "owner:",
},
expected_owner="urn:li:corpuser:foo",
expected_owner_type=OwnershipTypeClass.TECHNICAL_OWNER,
)
_test_owner(
tag="abcdef-owner:foo",
@ -679,6 +693,7 @@ def test_extract_owners_from_tags():
"tag_prefix": ".*owner:",
},
expected_owner="urn:li:corpuser:foo",
expected_owner_type=OwnershipTypeClass.TECHNICAL_OWNER,
)
_test_owner(
tag="owner:foo",
@ -687,6 +702,7 @@ def test_extract_owners_from_tags():
"is_user": False,
},
expected_owner="urn:li:corpGroup:foo",
expected_owner_type=OwnershipTypeClass.TECHNICAL_OWNER,
)
_test_owner(
tag="owner:foo",
@ -695,6 +711,7 @@ def test_extract_owners_from_tags():
"email_domain": "example.com",
},
expected_owner="urn:li:corpuser:foo@example.com",
expected_owner_type=OwnershipTypeClass.TECHNICAL_OWNER,
)
_test_owner(
tag="owner:foo",