feat(ingest): support domains in meta -> "datahub" section (#10967)

This commit is contained in:
Harshal Sheth 2024-07-25 09:31:19 -07:00 committed by GitHub
parent f4fb89e799
commit 1fa7998ed3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 76 additions and 18 deletions

View File

@@ -113,6 +113,11 @@ classification_lib = {
"numpy<2", "numpy<2",
} }
dbt_common = {
*sqlglot_lib,
"more_itertools",
}
sql_common = ( sql_common = (
{ {
# Required for all SQL sources. # Required for all SQL sources.
@@ -352,8 +357,8 @@ plugins: Dict[str, Set[str]] = {
"datahub-lineage-file": set(), "datahub-lineage-file": set(),
"datahub-business-glossary": set(), "datahub-business-glossary": set(),
"delta-lake": {*data_lake_profiling, *delta_lake}, "delta-lake": {*data_lake_profiling, *delta_lake},
"dbt": {"requests"} | sqlglot_lib | aws_common, "dbt": {"requests"} | dbt_common | aws_common,
"dbt-cloud": {"requests"} | sqlglot_lib, "dbt-cloud": {"requests"} | dbt_common,
"druid": sql_common | {"pydruid>=0.6.2"}, "druid": sql_common | {"pydruid>=0.6.2"},
"dynamodb": aws_common | classification_lib, "dynamodb": aws_common | classification_lib,
# Starting with 7.14.0 python client is checking if it is connected to elasticsearch client. If its not it throws # Starting with 7.14.0 python client is checking if it is connected to elasticsearch client. If its not it throws

View File

@@ -23,6 +23,7 @@ from typing import (
) )
import typing_inspect import typing_inspect
from avrogen.dict_wrapper import DictWrapper
from datahub.configuration.source_common import DEFAULT_ENV as DEFAULT_ENV_CONFIGURATION from datahub.configuration.source_common import DEFAULT_ENV as DEFAULT_ENV_CONFIGURATION
from datahub.metadata.schema_classes import ( from datahub.metadata.schema_classes import (
@@ -412,9 +413,9 @@ def make_lineage_mce(
return mce return mce
def can_add_aspect(mce: MetadataChangeEventClass, AspectType: Type[Aspect]) -> bool: def can_add_aspect_to_snapshot(
SnapshotType = type(mce.proposedSnapshot) SnapshotType: Type[DictWrapper], AspectType: Type[Aspect]
) -> bool:
constructor_annotations = get_type_hints(SnapshotType.__init__) constructor_annotations = get_type_hints(SnapshotType.__init__)
aspect_list_union = typing_inspect.get_args(constructor_annotations["aspects"])[0] aspect_list_union = typing_inspect.get_args(constructor_annotations["aspects"])[0]
@@ -423,6 +424,12 @@ def can_add_aspect(mce: MetadataChangeEventClass, AspectType: Type[Aspect]) -> b
return issubclass(AspectType, supported_aspect_types) return issubclass(AspectType, supported_aspect_types)
def can_add_aspect(mce: MetadataChangeEventClass, AspectType: Type[Aspect]) -> bool:
SnapshotType = type(mce.proposedSnapshot)
return can_add_aspect_to_snapshot(SnapshotType, AspectType)
def assert_can_add_aspect( def assert_can_add_aspect(
mce: MetadataChangeEventClass, AspectType: Type[Aspect] mce: MetadataChangeEventClass, AspectType: Type[Aspect]
) -> None: ) -> None:

View File

@@ -7,6 +7,7 @@ from datetime import datetime
from enum import auto from enum import auto
from typing import Any, Dict, Iterable, List, Optional, Tuple from typing import Any, Dict, Iterable, List, Optional, Tuple
import more_itertools
import pydantic import pydantic
from pydantic import root_validator, validator from pydantic import root_validator, validator
from pydantic.fields import Field from pydantic.fields import Field
@@ -1309,8 +1310,23 @@ class DBTSourceBase(StatefulIngestionSourceBase):
aspect=self._make_data_platform_instance_aspect(), aspect=self._make_data_platform_instance_aspect(),
).as_workunit() ).as_workunit()
standalone_aspects, snapshot_aspects = more_itertools.partition(
(
lambda aspect: mce_builder.can_add_aspect_to_snapshot(
DatasetSnapshot, type(aspect)
)
),
aspects,
)
for aspect in standalone_aspects:
# The domains aspect, and some others, may not support being added to the snapshot.
yield MetadataChangeProposalWrapper(
entityUrn=node_datahub_urn,
aspect=aspect,
).as_workunit()
dataset_snapshot = DatasetSnapshot( dataset_snapshot = DatasetSnapshot(
urn=node_datahub_urn, aspects=aspects urn=node_datahub_urn, aspects=list(snapshot_aspects)
) )
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot) mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
if self.config.write_semantics == "PATCH": if self.config.write_semantics == "PATCH":
@@ -1588,6 +1604,10 @@ class DBTSourceBase(StatefulIngestionSourceBase):
): ):
aspects.append(meta_aspects.get(Constants.ADD_TERM_OPERATION)) aspects.append(meta_aspects.get(Constants.ADD_TERM_OPERATION))
# add meta domains aspect
if meta_aspects.get(Constants.ADD_DOMAIN_OPERATION):
aspects.append(meta_aspects.get(Constants.ADD_DOMAIN_OPERATION))
# add meta links aspect # add meta links aspect
meta_links_aspect = meta_aspects.get(Constants.ADD_DOC_LINK_OPERATION) meta_links_aspect = meta_aspects.get(Constants.ADD_DOC_LINK_OPERATION)
if meta_links_aspect and self.config.enable_meta_mapping: if meta_links_aspect and self.config.enable_meta_mapping:

View File

@@ -15,6 +15,7 @@ from datahub.emitter.mce_builder import (
) )
from datahub.metadata.schema_classes import ( from datahub.metadata.schema_classes import (
AuditStampClass, AuditStampClass,
DomainsClass,
InstitutionalMemoryClass, InstitutionalMemoryClass,
InstitutionalMemoryMetadataClass, InstitutionalMemoryMetadataClass,
OwnerClass, OwnerClass,
@@ -70,6 +71,8 @@ class Constants:
ADD_TERM_OPERATION = "add_term" ADD_TERM_OPERATION = "add_term"
ADD_TERMS_OPERATION = "add_terms" ADD_TERMS_OPERATION = "add_terms"
ADD_OWNER_OPERATION = "add_owner" ADD_OWNER_OPERATION = "add_owner"
ADD_DOMAIN_OPERATION = "add_domain"
OPERATION = "operation" OPERATION = "operation"
OPERATION_CONFIG = "config" OPERATION_CONFIG = "config"
TAG = "tag" TAG = "tag"
@@ -94,9 +97,15 @@ class _MappingOwner(ConfigModel):
class _DatahubProps(ConfigModel): class _DatahubProps(ConfigModel):
owners: List[Union[str, _MappingOwner]] tags: Optional[List[str]] = None
terms: Optional[List[str]] = None
owners: Optional[List[Union[str, _MappingOwner]]] = None
domain: Optional[str] = None
def make_owner_category_list(self) -> List[Dict]: def make_owner_category_list(self) -> List[Dict]:
if self.owners is None:
return []
res = [] res = []
for owner in self.owners: for owner in self.owners:
if isinstance(owner, str): if isinstance(owner, str):
@@ -176,26 +185,29 @@ class OperationProcessor:
# Process the special "datahub" property, which supports tags, terms, and owners. # Process the special "datahub" property, which supports tags, terms, and owners.
operations_map: Dict[str, list] = {} operations_map: Dict[str, list] = {}
try: try:
datahub_prop = raw_props.get("datahub") raw_datahub_prop = raw_props.get("datahub")
if datahub_prop and isinstance(datahub_prop, dict): if raw_datahub_prop:
if datahub_prop.get("tags"): datahub_prop = _DatahubProps.parse_obj_allow_extras(raw_datahub_prop)
if datahub_prop.tags:
# Note that tags get converted to urns later because we need to support the tag prefix. # Note that tags get converted to urns later because we need to support the tag prefix.
tags = datahub_prop["tags"]
operations_map.setdefault(Constants.ADD_TAG_OPERATION, []).extend( operations_map.setdefault(Constants.ADD_TAG_OPERATION, []).extend(
tags datahub_prop.tags
) )
if datahub_prop.get("terms"): if datahub_prop.terms:
terms = datahub_prop["terms"]
operations_map.setdefault(Constants.ADD_TERM_OPERATION, []).extend( operations_map.setdefault(Constants.ADD_TERM_OPERATION, []).extend(
mce_builder.make_term_urn(term) for term in terms mce_builder.make_term_urn(term) for term in datahub_prop.terms
) )
if datahub_prop.get("owners"): if datahub_prop.owners:
owners = _DatahubProps.parse_obj_allow_extras(datahub_prop)
operations_map.setdefault(Constants.ADD_OWNER_OPERATION, []).extend( operations_map.setdefault(Constants.ADD_OWNER_OPERATION, []).extend(
owners.make_owner_category_list() datahub_prop.make_owner_category_list()
) )
if datahub_prop.domain:
operations_map.setdefault(
Constants.ADD_DOMAIN_OPERATION, []
).append(mce_builder.make_domain_urn(datahub_prop.domain))
except Exception as e: except Exception as e:
logger.error(f"Error while processing datahub property: {e}") logger.error(f"Error while processing datahub property: {e}")
@@ -299,6 +311,15 @@ class OperationProcessor:
) )
aspect_map[Constants.ADD_TERM_OPERATION] = term_aspect aspect_map[Constants.ADD_TERM_OPERATION] = term_aspect
if Constants.ADD_DOMAIN_OPERATION in operation_map:
domain_aspect = DomainsClass(
domains=[
mce_builder.make_domain_urn(domain)
for domain in operation_map[Constants.ADD_DOMAIN_OPERATION]
]
)
aspect_map[Constants.ADD_DOMAIN_OPERATION] = domain_aspect
if Constants.ADD_DOC_LINK_OPERATION in operation_map: if Constants.ADD_DOC_LINK_OPERATION in operation_map:
try: try:
if len( if len(

View File

@@ -2,6 +2,7 @@ from typing import Any, Dict
from datahub.metadata.com.linkedin.pegasus2avro.common import GlobalTags from datahub.metadata.com.linkedin.pegasus2avro.common import GlobalTags
from datahub.metadata.schema_classes import ( from datahub.metadata.schema_classes import (
DomainsClass,
GlobalTagsClass, GlobalTagsClass,
GlossaryTermsClass, GlossaryTermsClass,
InstitutionalMemoryClass, InstitutionalMemoryClass,
@@ -366,6 +367,7 @@ def test_operation_processor_datahub_props():
"owner_type": "urn:li:ownershipType:steward", "owner_type": "urn:li:ownershipType:steward",
}, },
], ],
"domain": "domain1",
} }
} }
@@ -396,3 +398,6 @@
assert [ assert [
term_association.urn for term_association in aspect_map["add_term"].terms term_association.urn for term_association in aspect_map["add_term"].terms
] == ["urn:li:glossaryTerm:term1", "urn:li:glossaryTerm:term2"] ] == ["urn:li:glossaryTerm:term1", "urn:li:glossaryTerm:term2"]
assert isinstance(aspect_map["add_domain"], DomainsClass)
assert aspect_map["add_domain"].domains == ["urn:li:domain:domain1"]