Harshal Sheth 28310bb64b
feat(ingest): support full urns without owner_type in meta mapping (#11298)
Co-authored-by: Tamas Nemeth <treff7es@gmail.com>
2024-09-09 11:18:07 -07:00

423 lines
13 KiB
Python

from typing import Any, Dict
from datahub.metadata.com.linkedin.pegasus2avro.common import GlobalTags
from datahub.metadata.schema_classes import (
DomainsClass,
GlobalTagsClass,
GlossaryTermsClass,
InstitutionalMemoryClass,
OwnerClass,
OwnershipClass,
OwnershipSourceTypeClass,
OwnershipTypeClass,
)
from datahub.utilities.mapping import OperationProcessor
def get_operation_defs() -> Dict[str, Any]:
    """Build the shared operation definitions used by the tests in this module.

    Each entry maps a property key to a rule with a ``match`` value, an
    ``operation`` name and that operation's ``config``. A fresh structure is
    built on every call.
    """

    def owner_rule(owner_type: str) -> Dict[str, Any]:
        # Catch-all owner rule; only the owner type varies between entries.
        return {
            "match": ".*",
            "operation": "add_owner",
            "config": {"owner_type": owner_type},
        }

    def tag_rule(match: Any, tag: str) -> Dict[str, Any]:
        return {"match": match, "operation": "add_tag", "config": {"tag": tag}}

    def term_rule(match: Any, term: str) -> Dict[str, Any]:
        return {"match": match, "operation": "add_term", "config": {"term": term}}

    # Insertion order mirrors the order in which the rules were declared.
    operation_defs: Dict[str, Any] = {}
    operation_defs["user_owner"] = owner_rule("user")
    operation_defs["user_owner_2"] = owner_rule("user")
    operation_defs["multi_user"] = owner_rule("user")
    operation_defs["group.owner"] = owner_rule("group")
    operation_defs["pii"] = tag_rule(True, "has_pii_test")
    operation_defs["int_property"] = tag_rule(1, "int_property")
    operation_defs["double_property"] = term_rule(2.5, "double_property")
    operation_defs["governance.team_owner"] = term_rule("Finan.*", "Finance.test")
    operation_defs["tag"] = tag_rule(".*", "{{ $match }}")
    return operation_defs
def test_operation_processor_not_matching():
    """No aspect is produced when no property matches the rules."""
    # Every entry either uses a key that has no rule, or a value that differs
    # from the rule's configured match.
    unmatched_props = {
        "user_owner_test": "test_user@abc.com",
        "group.owner_test": "test.group@abc.co.in",
        "governance.team_owner": "Binance",
        "pii": False,
        "int_property": 3,
        "double_property": 25,
    }

    aspects = OperationProcessor(get_operation_defs()).process(unmatched_props)

    for operation in ("add_tag", "add_term", "add_owner"):
        assert operation not in aspects
def test_operation_processor_matching():
    """Matching properties yield tag, term and owner aspects with the expected urns."""
    props = {
        "user_owner": "test_user@abc.com",
        "multi_user": "sales_member1@abc.com, sales_member2@abc.com",
        "user_owner_2": "test_user_2",
        "group.owner": "test.group@abc.co.in",
        "governance.team_owner": "Finance",
        "pii": True,
        "int_property": 1,
        "double_property": 2.5,
        "tag": "Finance",
    }
    processor = OperationProcessor(
        operation_defs=get_operation_defs(),
        owner_source_type="SOURCE_CONTROL",
        strip_owner_email_id=True,
    )

    aspects = processor.process(props)
    for operation in ("add_tag", "add_term", "add_owner"):
        assert operation in aspects

    # Tags: exactly the three expected urns.
    tag_aspect: GlobalTags = aspects["add_tag"]
    tag_urns = [association.tag for association in tag_aspect.tags]
    assert len(tag_urns) == 3
    for expected_tag in (
        "urn:li:tag:has_pii_test",
        "urn:li:tag:int_property",
        "urn:li:tag:Finance",
    ):
        assert expected_tag in tag_urns

    # Glossary terms: exactly the two expected urns.
    term_aspect: GlossaryTermsClass = aspects["add_term"]
    term_urns = [association.urn for association in term_aspect.terms]
    assert len(term_urns) == 2
    for expected_term in (
        "urn:li:glossaryTerm:Finance.test",
        "urn:li:glossaryTerm:double_property",
    ):
        assert expected_term in term_urns

    # Owners: five entries, all with the e-mail domain stripped and the
    # configured source type attached.
    ownership: OwnershipClass = aspects["add_owner"]
    assert len(ownership.owners) == 5
    expected_owner_urns = {
        "urn:li:corpuser:test_user",
        "urn:li:corpuser:test_user_2",
        "urn:li:corpGroup:test.group",
        "urn:li:corpuser:sales_member1",
        "urn:li:corpuser:sales_member2",
    }
    for owner in ownership.owners:
        assert owner.owner in expected_owner_urns
        assert owner.source
        assert owner.source.type == OwnershipSourceTypeClass.SOURCE_CONTROL
def test_operation_processor_no_email_strip_source_type_not_null():
    """With ``strip_owner_email_id=False`` the owner urn keeps the full address."""
    processor = OperationProcessor(
        operation_defs=get_operation_defs(),
        owner_source_type="SERVICE",
        strip_owner_email_id=False,
    )

    aspects = processor.process({"user_owner": "test_user@abc.com"})
    assert "add_owner" in aspects

    ownership: OwnershipClass = aspects["add_owner"]
    owners = ownership.owners
    assert len(owners) == 1

    sole_owner: OwnerClass = owners[0]
    assert sole_owner.owner == "urn:li:corpuser:test_user@abc.com"
    # The configured source type must be carried onto the owner.
    assert sole_owner.source
    assert sole_owner.source.type == "SERVICE"
def test_operation_processor_advanced_matching_owners():
    """A capture-group rule produces an owner urn built from the text after ``@``."""
    group_rule = {
        "match": "^@(.*)",
        "operation": "add_owner",
        "config": {"owner_type": "group"},
    }
    processor = OperationProcessor(
        operation_defs={"user_owner": group_rule},
        owner_source_type="SOURCE_CONTROL",
    )

    aspects = processor.process({"user_owner": "@test_user@abc.com"})
    assert "add_owner" in aspects

    ownership: OwnershipClass = aspects["add_owner"]
    owners = ownership.owners
    assert len(owners) == 1

    group_owner: OwnerClass = owners[0]
    # The leading "@" is gone; the rest of the value is kept as-is.
    assert group_owner.owner == "urn:li:corpGroup:test_user@abc.com"
    assert group_owner.source
    assert group_owner.source.type == "SOURCE_CONTROL"
def test_operation_processor_ownership_category():
    """``owner_category`` is applied per rule, for both enum values and custom urns."""
    props = {
        "user_owner": "@test_user",
        "business_owner": "alice,urn:li:corpGroup:biz-data-team",
        "architect": "bob",
        "producer": "urn:li:corpGroup:producer-group",
    }
    rules = {
        "user_owner": {
            "match": "^@(.*)",
            "operation": "add_owner",
            "config": {
                "owner_type": "group",
                "owner_category": OwnershipTypeClass.DATA_STEWARD,
            },
        },
        "business_owner": {
            "match": ".*",
            "operation": "add_owner",
            "config": {
                "owner_type": "user",
                "owner_category": OwnershipTypeClass.BUSINESS_OWNER,
            },
        },
        "architect": {
            "match": ".*",
            "operation": "add_owner",
            "config": {
                "owner_type": "user",
                "owner_category": "urn:li:ownershipType:architect",
            },
        },
        "producer": {
            "match": ".*",
            "operation": "add_owner",
            "config": {
                # Testing using full urns without any owner_type set.
                "owner_category": OwnershipTypeClass.PRODUCER,
            },
        },
    }
    processor = OperationProcessor(
        operation_defs=rules,
        owner_source_type="SOURCE_CONTROL",
    )

    aspects = processor.process(props)
    assert "add_owner" in aspects

    ownership: OwnershipClass = aspects["add_owner"]
    owners = ownership.owners
    assert len(owners) == 5
    assert all(
        owner.source and owner.source.type == "SOURCE_CONTROL" for owner in owners
    )

    # The first four owners, in the order they appear in the aspect, carry a
    # built-in ownership type.
    expected_builtin = [
        ("urn:li:corpGroup:biz-data-team", OwnershipTypeClass.BUSINESS_OWNER),
        ("urn:li:corpGroup:producer-group", OwnershipTypeClass.PRODUCER),
        ("urn:li:corpGroup:test_user", OwnershipTypeClass.DATA_STEWARD),
        ("urn:li:corpuser:alice", OwnershipTypeClass.BUSINESS_OWNER),
    ]
    for actual, (expected_urn, expected_type) in zip(owners, expected_builtin):
        assert actual.owner == expected_urn
        assert actual.type
        assert actual.type == expected_type

    # The last owner uses a custom ownership-type urn.
    custom_owner: OwnerClass = owners[4]
    assert custom_owner.owner == "urn:li:corpuser:bob"
    assert custom_owner.type == OwnershipTypeClass.CUSTOM
    assert custom_owner.typeUrn == "urn:li:ownershipType:architect"
def test_operation_processor_advanced_matching_tags():
    """The ``{{ $match }}`` placeholder in a tag template is filled from the regex match."""
    case_rule = {
        "match": "^PLT-(.*)",
        "operation": "add_tag",
        "config": {"tag": "case_{{ $match }}"},
    }
    processor = OperationProcessor(
        operation_defs={"case": case_rule},
        owner_source_type="SOURCE_CONTROL",
    )

    aspects = processor.process({"case": "PLT-4567"})
    assert "add_tag" in aspects

    tag_aspect: GlobalTagsClass = aspects["add_tag"]
    tags = tag_aspect.tags
    assert len(tags) == 1
    # Only the captured "4567" ends up in the tag name, not the "PLT-" prefix.
    assert tags[0].tag == "urn:li:tag:case_4567"
def test_operation_processor_institutional_memory():
    """``add_doc_link`` stores the matched url and the configured description."""
    doc_link_rule = {
        "match": r"(?:https?)?\:\/\/\w*[^#]*",
        "operation": "add_doc_link",
        "config": {"link": "{{ $match }}", "description": "test"},
    }
    processor = OperationProcessor(
        operation_defs={"documentation_link": doc_link_rule},
    )

    aspects = processor.process(
        {"documentation_link": "https://test.com/documentation#ignore-this"}
    )
    assert "add_doc_link" in aspects

    memory: InstitutionalMemoryClass = aspects["add_doc_link"]
    first_link = memory.elements[0]
    # The "#ignore-this" fragment is not part of the stored url.
    assert first_link.url == "https://test.com/documentation"
    assert first_link.description == "test"
def test_operation_processor_institutional_memory_no_description():
    """An ``add_doc_link`` rule without a ``description`` must produce no aspects."""
    # The value must satisfy the rule's regex, which requires "://". A value
    # without a scheme separator never matches, so the rule would be skipped
    # and this test would pass without exercising the missing-description
    # case it is meant to cover.
    raw_props = {
        "documentation_link": "https://test.com/documentation#ignore-this",
    }
    processor = OperationProcessor(
        operation_defs={
            "documentation_link": {
                "match": r"(?:https?)?\:\/\/\w*[^#]*",
                "operation": "add_doc_link",
                # Deliberately no "description" entry.
                "config": {"link": "{{ $match }}"},
            },
        },
    )
    # we require a description, so this should stay empty
    aspect_map = processor.process(raw_props)
    assert aspect_map == {}
def test_operation_processor_matching_nested_props():
    """With ``match_nested_props=True`` a dotted rule key matches a nested property."""
    nested_props = {"gdpr": {"pii": True}}
    pii_rule = {
        "match": True,
        "operation": "add_tag",
        "config": {"tag": "pii"},
    }
    processor = OperationProcessor(
        operation_defs={"gdpr.pii": pii_rule},
        owner_source_type="SOURCE_CONTROL",
        match_nested_props=True,
    )

    aspects = processor.process(nested_props)
    assert "add_tag" in aspects

    tag_aspect: GlobalTagsClass = aspects["add_tag"]
    tags = tag_aspect.tags
    assert len(tags) == 1
    assert tags[0].tag == "urn:li:tag:pii"
def test_operation_processor_matching_dot_props():
    """A flat property whose key contains a dot still matches with ``match_nested_props=True``."""
    flat_props = {"gdpr.pii": True}
    pii_rule = {
        "match": True,
        "operation": "add_tag",
        "config": {"tag": "pii"},
    }
    processor = OperationProcessor(
        operation_defs={"gdpr.pii": pii_rule},
        owner_source_type="SOURCE_CONTROL",
        match_nested_props=True,
    )

    aspects = processor.process(flat_props)
    assert "add_tag" in aspects

    tag_aspect: GlobalTagsClass = aspects["add_tag"]
    tags = tag_aspect.tags
    assert len(tags) == 1
    assert tags[0].tag == "urn:li:tag:pii"
def test_operation_processor_datahub_props():
    """A ``datahub`` property block yields owner, tag, term and domain aspects
    even when no operation defs are configured."""
    steward_urn = "urn:li:ownershipType:steward"
    datahub_block = {
        "tags": ["tag1", "tag2"],
        "terms": ["term1", "term2"],
        # Owners are given both as bare strings and as dicts with an
        # explicit ownership type, using plain ids as well as full urns.
        "owners": [
            "owner1",
            "urn:li:corpGroup:group1",
            {
                "owner": "owner2",
                "owner_type": steward_urn,
            },
            {
                "owner": "urn:li:corpGroup:group2",
                "owner_type": steward_urn,
            },
        ],
        "domain": "domain1",
    }
    processor = OperationProcessor(
        operation_defs={},
        owner_source_type="SOURCE_CONTROL",
    )

    aspects = processor.process({"datahub": datahub_block})

    ownership = aspects["add_owner"]
    assert isinstance(ownership, OwnershipClass)
    owner_rows = [
        (entry.owner, entry.type, entry.typeUrn) for entry in ownership.owners
    ]
    assert owner_rows == [
        ("urn:li:corpGroup:group1", "DATAOWNER", None),
        ("urn:li:corpGroup:group2", "CUSTOM", "urn:li:ownershipType:steward"),
        ("urn:li:corpuser:owner1", "DATAOWNER", None),
        ("urn:li:corpuser:owner2", "CUSTOM", "urn:li:ownershipType:steward"),
    ]

    global_tags = aspects["add_tag"]
    assert isinstance(global_tags, GlobalTagsClass)
    tag_urns = [association.tag for association in global_tags.tags]
    assert tag_urns == [
        "urn:li:tag:tag1",
        "urn:li:tag:tag2",
    ]

    glossary_terms = aspects["add_term"]
    assert isinstance(glossary_terms, GlossaryTermsClass)
    term_urns = [association.urn for association in glossary_terms.terms]
    assert term_urns == ["urn:li:glossaryTerm:term1", "urn:li:glossaryTerm:term2"]

    domains = aspects["add_domain"]
    assert isinstance(domains, DomainsClass)
    assert domains.domains == ["urn:li:domain:domain1"]