mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-04 23:57:03 +00:00
423 lines
13 KiB
Python
423 lines
13 KiB
Python
from typing import Any, Dict
|
|
|
|
from datahub.metadata.com.linkedin.pegasus2avro.common import GlobalTags
|
|
from datahub.metadata.schema_classes import (
|
|
DomainsClass,
|
|
GlobalTagsClass,
|
|
GlossaryTermsClass,
|
|
InstitutionalMemoryClass,
|
|
OwnerClass,
|
|
OwnershipClass,
|
|
OwnershipSourceTypeClass,
|
|
OwnershipTypeClass,
|
|
)
|
|
from datahub.utilities.mapping import OperationProcessor
|
|
|
|
|
|
def get_operation_defs() -> Dict[str, Any]:
|
|
return {
|
|
"user_owner": {
|
|
"match": ".*",
|
|
"operation": "add_owner",
|
|
"config": {"owner_type": "user"},
|
|
},
|
|
"user_owner_2": {
|
|
"match": ".*",
|
|
"operation": "add_owner",
|
|
"config": {"owner_type": "user"},
|
|
},
|
|
"multi_user": {
|
|
"match": ".*",
|
|
"operation": "add_owner",
|
|
"config": {"owner_type": "user"},
|
|
},
|
|
"group.owner": {
|
|
"match": ".*",
|
|
"operation": "add_owner",
|
|
"config": {"owner_type": "group"},
|
|
},
|
|
"pii": {
|
|
"match": True,
|
|
"operation": "add_tag",
|
|
"config": {"tag": "has_pii_test"},
|
|
},
|
|
"int_property": {
|
|
"match": 1,
|
|
"operation": "add_tag",
|
|
"config": {"tag": "int_property"},
|
|
},
|
|
"double_property": {
|
|
"match": 2.5,
|
|
"operation": "add_term",
|
|
"config": {"term": "double_property"},
|
|
},
|
|
"governance.team_owner": {
|
|
"match": "Finan.*",
|
|
"operation": "add_term",
|
|
"config": {"term": "Finance.test"},
|
|
},
|
|
"tag": {
|
|
"match": ".*",
|
|
"operation": "add_tag",
|
|
"config": {"tag": "{{ $match }}"},
|
|
},
|
|
}
|
|
|
|
|
|
def test_operation_processor_not_matching():
|
|
# no property matches to the rules
|
|
raw_props = {
|
|
"user_owner_test": "test_user@abc.com",
|
|
"group.owner_test": "test.group@abc.co.in",
|
|
"governance.team_owner": "Binance",
|
|
"pii": False,
|
|
"int_property": 3,
|
|
"double_property": 25,
|
|
}
|
|
processor = OperationProcessor(get_operation_defs())
|
|
aspect_map = processor.process(raw_props)
|
|
assert "add_tag" not in aspect_map
|
|
assert "add_term" not in aspect_map
|
|
assert "add_owner" not in aspect_map
|
|
|
|
|
|
def test_operation_processor_matching():
|
|
raw_props = {
|
|
"user_owner": "test_user@abc.com",
|
|
"multi_user": "sales_member1@abc.com, sales_member2@abc.com",
|
|
"user_owner_2": "test_user_2",
|
|
"group.owner": "test.group@abc.co.in",
|
|
"governance.team_owner": "Finance",
|
|
"pii": True,
|
|
"int_property": 1,
|
|
"double_property": 2.5,
|
|
"tag": "Finance",
|
|
}
|
|
|
|
processor = OperationProcessor(
|
|
operation_defs=get_operation_defs(),
|
|
owner_source_type="SOURCE_CONTROL",
|
|
strip_owner_email_id=True,
|
|
)
|
|
aspect_map = processor.process(raw_props)
|
|
assert "add_tag" in aspect_map
|
|
assert "add_term" in aspect_map
|
|
assert "add_owner" in aspect_map
|
|
tag_aspect: GlobalTags = aspect_map["add_tag"]
|
|
tags_added = [
|
|
tag_association_class.tag for tag_association_class in tag_aspect.tags
|
|
]
|
|
term_aspect: GlossaryTermsClass = aspect_map["add_term"]
|
|
terms_added = [
|
|
term_association_class.urn for term_association_class in term_aspect.terms
|
|
]
|
|
assert (
|
|
len(tags_added) == 3
|
|
and "urn:li:tag:has_pii_test" in tags_added
|
|
and "urn:li:tag:int_property" in tags_added
|
|
and "urn:li:tag:Finance" in tags_added
|
|
)
|
|
assert (
|
|
len(terms_added) == 2
|
|
and "urn:li:glossaryTerm:Finance.test" in terms_added
|
|
and "urn:li:glossaryTerm:double_property" in terms_added
|
|
)
|
|
|
|
ownership_aspect: OwnershipClass = aspect_map["add_owner"]
|
|
assert len(ownership_aspect.owners) == 5
|
|
owner_set = {
|
|
"urn:li:corpuser:test_user",
|
|
"urn:li:corpuser:test_user_2",
|
|
"urn:li:corpGroup:test.group",
|
|
"urn:li:corpuser:sales_member1",
|
|
"urn:li:corpuser:sales_member2",
|
|
}
|
|
for single_owner in ownership_aspect.owners:
|
|
assert single_owner.owner in owner_set
|
|
assert (
|
|
single_owner.source
|
|
and single_owner.source.type == OwnershipSourceTypeClass.SOURCE_CONTROL
|
|
)
|
|
|
|
|
|
def test_operation_processor_no_email_strip_source_type_not_null():
|
|
raw_props = {
|
|
"user_owner": "test_user@abc.com",
|
|
}
|
|
processor = OperationProcessor(
|
|
operation_defs=get_operation_defs(),
|
|
owner_source_type="SERVICE",
|
|
strip_owner_email_id=False,
|
|
)
|
|
aspect_map = processor.process(raw_props)
|
|
assert "add_owner" in aspect_map
|
|
|
|
ownership_aspect: OwnershipClass = aspect_map["add_owner"]
|
|
assert len(ownership_aspect.owners) == 1
|
|
new_owner: OwnerClass = ownership_aspect.owners[0]
|
|
assert new_owner.owner == "urn:li:corpuser:test_user@abc.com"
|
|
assert new_owner.source and new_owner.source.type == "SERVICE"
|
|
|
|
|
|
def test_operation_processor_advanced_matching_owners():
|
|
raw_props = {
|
|
"user_owner": "@test_user@abc.com",
|
|
}
|
|
processor = OperationProcessor(
|
|
operation_defs={
|
|
"user_owner": {
|
|
"match": "^@(.*)",
|
|
"operation": "add_owner",
|
|
"config": {"owner_type": "group"},
|
|
},
|
|
},
|
|
owner_source_type="SOURCE_CONTROL",
|
|
)
|
|
aspect_map = processor.process(raw_props)
|
|
assert "add_owner" in aspect_map
|
|
|
|
ownership_aspect: OwnershipClass = aspect_map["add_owner"]
|
|
assert len(ownership_aspect.owners) == 1
|
|
new_owner: OwnerClass = ownership_aspect.owners[0]
|
|
assert new_owner.owner == "urn:li:corpGroup:test_user@abc.com"
|
|
assert new_owner.source and new_owner.source.type == "SOURCE_CONTROL"
|
|
|
|
|
|
def test_operation_processor_ownership_category():
|
|
raw_props = {
|
|
"user_owner": "@test_user",
|
|
"business_owner": "alice,urn:li:corpGroup:biz-data-team",
|
|
"architect": "bob",
|
|
"producer": "urn:li:corpGroup:producer-group",
|
|
}
|
|
processor = OperationProcessor(
|
|
operation_defs={
|
|
"user_owner": {
|
|
"match": "^@(.*)",
|
|
"operation": "add_owner",
|
|
"config": {
|
|
"owner_type": "group",
|
|
"owner_category": OwnershipTypeClass.DATA_STEWARD,
|
|
},
|
|
},
|
|
"business_owner": {
|
|
"match": ".*",
|
|
"operation": "add_owner",
|
|
"config": {
|
|
"owner_type": "user",
|
|
"owner_category": OwnershipTypeClass.BUSINESS_OWNER,
|
|
},
|
|
},
|
|
"architect": {
|
|
"match": ".*",
|
|
"operation": "add_owner",
|
|
"config": {
|
|
"owner_type": "user",
|
|
"owner_category": "urn:li:ownershipType:architect",
|
|
},
|
|
},
|
|
"producer": {
|
|
"match": ".*",
|
|
"operation": "add_owner",
|
|
"config": {
|
|
# Testing using full urns without any owner_type set.
|
|
"owner_category": OwnershipTypeClass.PRODUCER,
|
|
},
|
|
},
|
|
},
|
|
owner_source_type="SOURCE_CONTROL",
|
|
)
|
|
aspect_map = processor.process(raw_props)
|
|
assert "add_owner" in aspect_map
|
|
|
|
ownership_aspect: OwnershipClass = aspect_map["add_owner"]
|
|
assert len(ownership_aspect.owners) == 5
|
|
assert all(
|
|
new_owner.source and new_owner.source.type == "SOURCE_CONTROL"
|
|
for new_owner in ownership_aspect.owners
|
|
)
|
|
|
|
new_owner: OwnerClass = ownership_aspect.owners[0]
|
|
assert new_owner.owner == "urn:li:corpGroup:biz-data-team"
|
|
assert new_owner.type and new_owner.type == OwnershipTypeClass.BUSINESS_OWNER
|
|
|
|
new_owner = ownership_aspect.owners[1]
|
|
assert new_owner.owner == "urn:li:corpGroup:producer-group"
|
|
assert new_owner.type and new_owner.type == OwnershipTypeClass.PRODUCER
|
|
|
|
new_owner = ownership_aspect.owners[2]
|
|
assert new_owner.owner == "urn:li:corpGroup:test_user"
|
|
assert new_owner.type and new_owner.type == OwnershipTypeClass.DATA_STEWARD
|
|
|
|
new_owner = ownership_aspect.owners[3]
|
|
assert new_owner.owner == "urn:li:corpuser:alice"
|
|
assert new_owner.type and new_owner.type == OwnershipTypeClass.BUSINESS_OWNER
|
|
|
|
new_owner = ownership_aspect.owners[4]
|
|
assert new_owner.owner == "urn:li:corpuser:bob"
|
|
assert new_owner.type == OwnershipTypeClass.CUSTOM
|
|
assert new_owner.typeUrn == "urn:li:ownershipType:architect"
|
|
|
|
|
|
def test_operation_processor_advanced_matching_tags():
|
|
raw_props = {
|
|
"case": "PLT-4567",
|
|
}
|
|
processor = OperationProcessor(
|
|
operation_defs={
|
|
"case": {
|
|
"match": "^PLT-(.*)",
|
|
"operation": "add_tag",
|
|
"config": {"tag": "case_{{ $match }}"},
|
|
},
|
|
},
|
|
owner_source_type="SOURCE_CONTROL",
|
|
)
|
|
aspect_map = processor.process(raw_props)
|
|
assert "add_tag" in aspect_map
|
|
|
|
tag_aspect: GlobalTagsClass = aspect_map["add_tag"]
|
|
assert len(tag_aspect.tags) == 1
|
|
assert tag_aspect.tags[0].tag == "urn:li:tag:case_4567"
|
|
|
|
|
|
def test_operation_processor_institutional_memory():
|
|
raw_props = {
|
|
"documentation_link": "https://test.com/documentation#ignore-this",
|
|
}
|
|
processor = OperationProcessor(
|
|
operation_defs={
|
|
"documentation_link": {
|
|
"match": r"(?:https?)?\:\/\/\w*[^#]*",
|
|
"operation": "add_doc_link",
|
|
"config": {"link": "{{ $match }}", "description": "test"},
|
|
},
|
|
},
|
|
)
|
|
aspect_map = processor.process(raw_props)
|
|
assert "add_doc_link" in aspect_map
|
|
|
|
doc_link_aspect: InstitutionalMemoryClass = aspect_map["add_doc_link"]
|
|
|
|
assert doc_link_aspect.elements[0].url == "https://test.com/documentation"
|
|
assert doc_link_aspect.elements[0].description == "test"
|
|
|
|
|
|
def test_operation_processor_institutional_memory_no_description():
|
|
raw_props = {
|
|
"documentation_link": "test.com/documentation#ignore-this",
|
|
}
|
|
processor = OperationProcessor(
|
|
operation_defs={
|
|
"documentation_link": {
|
|
"match": r"(?:https?)?\:\/\/\w*[^#]*",
|
|
"operation": "add_doc_link",
|
|
"config": {"link": "{{ $match }}"},
|
|
},
|
|
},
|
|
)
|
|
# we require a description, so this should stay empty
|
|
aspect_map = processor.process(raw_props)
|
|
assert aspect_map == {}
|
|
|
|
|
|
def test_operation_processor_matching_nested_props():
|
|
raw_props = {
|
|
"gdpr": {
|
|
"pii": True,
|
|
},
|
|
}
|
|
processor = OperationProcessor(
|
|
operation_defs={
|
|
"gdpr.pii": {
|
|
"match": True,
|
|
"operation": "add_tag",
|
|
"config": {"tag": "pii"},
|
|
},
|
|
},
|
|
owner_source_type="SOURCE_CONTROL",
|
|
match_nested_props=True,
|
|
)
|
|
aspect_map = processor.process(raw_props)
|
|
assert "add_tag" in aspect_map
|
|
|
|
tag_aspect: GlobalTagsClass = aspect_map["add_tag"]
|
|
assert len(tag_aspect.tags) == 1
|
|
assert tag_aspect.tags[0].tag == "urn:li:tag:pii"
|
|
|
|
|
|
def test_operation_processor_matching_dot_props():
|
|
raw_props = {
|
|
"gdpr.pii": True,
|
|
}
|
|
processor = OperationProcessor(
|
|
operation_defs={
|
|
"gdpr.pii": {
|
|
"match": True,
|
|
"operation": "add_tag",
|
|
"config": {"tag": "pii"},
|
|
},
|
|
},
|
|
owner_source_type="SOURCE_CONTROL",
|
|
match_nested_props=True,
|
|
)
|
|
aspect_map = processor.process(raw_props)
|
|
assert "add_tag" in aspect_map
|
|
|
|
tag_aspect: GlobalTagsClass = aspect_map["add_tag"]
|
|
assert len(tag_aspect.tags) == 1
|
|
assert tag_aspect.tags[0].tag == "urn:li:tag:pii"
|
|
|
|
|
|
def test_operation_processor_datahub_props():
|
|
raw_props = {
|
|
"datahub": {
|
|
"tags": ["tag1", "tag2"],
|
|
"terms": ["term1", "term2"],
|
|
"owners": [
|
|
"owner1",
|
|
"urn:li:corpGroup:group1",
|
|
{
|
|
"owner": "owner2",
|
|
"owner_type": "urn:li:ownershipType:steward",
|
|
},
|
|
{
|
|
"owner": "urn:li:corpGroup:group2",
|
|
"owner_type": "urn:li:ownershipType:steward",
|
|
},
|
|
],
|
|
"domain": "domain1",
|
|
}
|
|
}
|
|
|
|
processor = OperationProcessor(
|
|
operation_defs={},
|
|
owner_source_type="SOURCE_CONTROL",
|
|
)
|
|
aspect_map = processor.process(raw_props)
|
|
|
|
assert isinstance(aspect_map["add_owner"], OwnershipClass)
|
|
assert [
|
|
(owner.owner, owner.type, owner.typeUrn)
|
|
for owner in aspect_map["add_owner"].owners
|
|
] == [
|
|
("urn:li:corpGroup:group1", "DATAOWNER", None),
|
|
("urn:li:corpGroup:group2", "CUSTOM", "urn:li:ownershipType:steward"),
|
|
("urn:li:corpuser:owner1", "DATAOWNER", None),
|
|
("urn:li:corpuser:owner2", "CUSTOM", "urn:li:ownershipType:steward"),
|
|
]
|
|
|
|
assert isinstance(aspect_map["add_tag"], GlobalTagsClass)
|
|
assert [tag_association.tag for tag_association in aspect_map["add_tag"].tags] == [
|
|
"urn:li:tag:tag1",
|
|
"urn:li:tag:tag2",
|
|
]
|
|
|
|
assert isinstance(aspect_map["add_term"], GlossaryTermsClass)
|
|
assert [
|
|
term_association.urn for term_association in aspect_map["add_term"].terms
|
|
] == ["urn:li:glossaryTerm:term1", "urn:li:glossaryTerm:term2"]
|
|
|
|
assert isinstance(aspect_map["add_domain"], DomainsClass)
|
|
assert aspect_map["add_domain"].domains == ["urn:li:domain:domain1"]
|