diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index d8b1a47690..e973ff629e 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -113,6 +113,11 @@ classification_lib = {
     "numpy<2",
 }
 
+dbt_common = {
+    *sqlglot_lib,
+    "more_itertools",
+}
+
 sql_common = (
     {
         # Required for all SQL sources.
@@ -352,8 +357,8 @@ plugins: Dict[str, Set[str]] = {
     "datahub-lineage-file": set(),
     "datahub-business-glossary": set(),
     "delta-lake": {*data_lake_profiling, *delta_lake},
-    "dbt": {"requests"} | sqlglot_lib | aws_common,
-    "dbt-cloud": {"requests"} | sqlglot_lib,
+    "dbt": {"requests"} | dbt_common | aws_common,
+    "dbt-cloud": {"requests"} | dbt_common,
     "druid": sql_common | {"pydruid>=0.6.2"},
     "dynamodb": aws_common | classification_lib,
     # Starting with 7.14.0 python client is checking if it is connected to elasticsearch client. If its not it throws
diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py
index 31c7518152..a069498366 100644
--- a/metadata-ingestion/src/datahub/emitter/mce_builder.py
+++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -23,6 +23,7 @@ from typing import (
 )
 
 import typing_inspect
+from avrogen.dict_wrapper import DictWrapper
 
 from datahub.configuration.source_common import DEFAULT_ENV as DEFAULT_ENV_CONFIGURATION
 from datahub.metadata.schema_classes import (
@@ -412,9 +413,9 @@ def make_lineage_mce(
     return mce
 
 
-def can_add_aspect(mce: MetadataChangeEventClass, AspectType: Type[Aspect]) -> bool:
-    SnapshotType = type(mce.proposedSnapshot)
-
+def can_add_aspect_to_snapshot(
+    SnapshotType: Type[DictWrapper], AspectType: Type[Aspect]
+) -> bool:
     constructor_annotations = get_type_hints(SnapshotType.__init__)
     aspect_list_union = typing_inspect.get_args(constructor_annotations["aspects"])[0]
 
@@ -423,6 +424,12 @@ def can_add_aspect(mce: MetadataChangeEventClass, AspectType: Type[Aspect]) -> b
     return issubclass(AspectType, supported_aspect_types)
 
 
+def can_add_aspect(mce: MetadataChangeEventClass, AspectType: Type[Aspect]) -> bool:
+    SnapshotType = type(mce.proposedSnapshot)
+
+    return can_add_aspect_to_snapshot(SnapshotType, AspectType)
+
+
 def assert_can_add_aspect(
     mce: MetadataChangeEventClass, AspectType: Type[Aspect]
 ) -> None:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
index f56784ab1f..9972e9e463 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
@@ -7,6 +7,7 @@ from datetime import datetime
 from enum import auto
 from typing import Any, Dict, Iterable, List, Optional, Tuple
 
+import more_itertools
 import pydantic
 from pydantic import root_validator, validator
 from pydantic.fields import Field
@@ -1309,8 +1310,23 @@ class DBTSourceBase(StatefulIngestionSourceBase):
                 aspect=self._make_data_platform_instance_aspect(),
             ).as_workunit()
 
+            standalone_aspects, snapshot_aspects = more_itertools.partition(
+                (
+                    lambda aspect: mce_builder.can_add_aspect_to_snapshot(
+                        DatasetSnapshot, type(aspect)
+                    )
+                ),
+                aspects,
+            )
+            for aspect in standalone_aspects:
+                # The domains aspect, and some others, may not support being added to the snapshot.
+                yield MetadataChangeProposalWrapper(
+                    entityUrn=node_datahub_urn,
+                    aspect=aspect,
+                ).as_workunit()
+
             dataset_snapshot = DatasetSnapshot(
-                urn=node_datahub_urn, aspects=aspects
+                urn=node_datahub_urn, aspects=list(snapshot_aspects)
             )
             mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
             if self.config.write_semantics == "PATCH":
@@ -1588,6 +1604,10 @@ class DBTSourceBase(StatefulIngestionSourceBase):
         ):
             aspects.append(meta_aspects.get(Constants.ADD_TERM_OPERATION))
 
+        # add meta domains aspect
+        if meta_aspects.get(Constants.ADD_DOMAIN_OPERATION):
+            aspects.append(meta_aspects.get(Constants.ADD_DOMAIN_OPERATION))
+
         # add meta links aspect
         meta_links_aspect = meta_aspects.get(Constants.ADD_DOC_LINK_OPERATION)
         if meta_links_aspect and self.config.enable_meta_mapping:
diff --git a/metadata-ingestion/src/datahub/utilities/mapping.py b/metadata-ingestion/src/datahub/utilities/mapping.py
index 391657f841..ac10e26f76 100644
--- a/metadata-ingestion/src/datahub/utilities/mapping.py
+++ b/metadata-ingestion/src/datahub/utilities/mapping.py
@@ -15,6 +15,7 @@ from datahub.emitter.mce_builder import (
 )
 from datahub.metadata.schema_classes import (
     AuditStampClass,
+    DomainsClass,
     InstitutionalMemoryClass,
     InstitutionalMemoryMetadataClass,
     OwnerClass,
@@ -70,6 +71,8 @@ class Constants:
     ADD_TERM_OPERATION = "add_term"
     ADD_TERMS_OPERATION = "add_terms"
     ADD_OWNER_OPERATION = "add_owner"
+    ADD_DOMAIN_OPERATION = "add_domain"
+
     OPERATION = "operation"
     OPERATION_CONFIG = "config"
     TAG = "tag"
@@ -94,9 +97,15 @@ class _MappingOwner(ConfigModel):
 
 
 class _DatahubProps(ConfigModel):
-    owners: List[Union[str, _MappingOwner]]
+    tags: Optional[List[str]] = None
+    terms: Optional[List[str]] = None
+    owners: Optional[List[Union[str, _MappingOwner]]] = None
+    domain: Optional[str] = None
 
     def make_owner_category_list(self) -> List[Dict]:
+        if self.owners is None:
+            return []
+
         res = []
         for owner in self.owners:
             if isinstance(owner, str):
@@ -176,26 +185,29 @@
         # Process the special "datahub" property, which supports tags, terms, and owners.
         operations_map: Dict[str, list] = {}
         try:
-            datahub_prop = raw_props.get("datahub")
-            if datahub_prop and isinstance(datahub_prop, dict):
-                if datahub_prop.get("tags"):
+            raw_datahub_prop = raw_props.get("datahub")
+            if raw_datahub_prop:
+                datahub_prop = _DatahubProps.parse_obj_allow_extras(raw_datahub_prop)
+                if datahub_prop.tags:
                     # Note that tags get converted to urns later because we need to support the tag prefix.
-                    tags = datahub_prop["tags"]
                     operations_map.setdefault(Constants.ADD_TAG_OPERATION, []).extend(
-                        tags
+                        datahub_prop.tags
                     )
 
-                if datahub_prop.get("terms"):
-                    terms = datahub_prop["terms"]
+                if datahub_prop.terms:
                     operations_map.setdefault(Constants.ADD_TERM_OPERATION, []).extend(
-                        mce_builder.make_term_urn(term) for term in terms
+                        mce_builder.make_term_urn(term) for term in datahub_prop.terms
                     )
 
-                if datahub_prop.get("owners"):
-                    owners = _DatahubProps.parse_obj_allow_extras(datahub_prop)
+                if datahub_prop.owners:
                     operations_map.setdefault(Constants.ADD_OWNER_OPERATION, []).extend(
-                        owners.make_owner_category_list()
+                        datahub_prop.make_owner_category_list()
                     )
+
+                if datahub_prop.domain:
+                    operations_map.setdefault(
+                        Constants.ADD_DOMAIN_OPERATION, []
+                    ).append(mce_builder.make_domain_urn(datahub_prop.domain))
         except Exception as e:
             logger.error(f"Error while processing datahub property: {e}")
 
@@ -299,6 +311,15 @@
             )
             aspect_map[Constants.ADD_TERM_OPERATION] = term_aspect
 
+        if Constants.ADD_DOMAIN_OPERATION in operation_map:
+            domain_aspect = DomainsClass(
+                domains=[
+                    mce_builder.make_domain_urn(domain)
+                    for domain in operation_map[Constants.ADD_DOMAIN_OPERATION]
+                ]
+            )
+            aspect_map[Constants.ADD_DOMAIN_OPERATION] = domain_aspect
+
         if Constants.ADD_DOC_LINK_OPERATION in operation_map:
             try:
                 if len(
diff --git a/metadata-ingestion/tests/unit/test_mapping.py b/metadata-ingestion/tests/unit/test_mapping.py
index 42b13f6dbe..0e176710bb 100644
--- a/metadata-ingestion/tests/unit/test_mapping.py
+++ b/metadata-ingestion/tests/unit/test_mapping.py
@@ -2,6 +2,7 @@ from typing import Any, Dict
 
 from datahub.metadata.com.linkedin.pegasus2avro.common import GlobalTags
 from datahub.metadata.schema_classes import (
+    DomainsClass,
     GlobalTagsClass,
     GlossaryTermsClass,
     InstitutionalMemoryClass,
@@ -366,6 +367,7 @@ def test_operation_processor_datahub_props():
                 "owner_type": "urn:li:ownershipType:steward",
             },
         ],
+            "domain": "domain1",
         }
     }
 
@@ -396,3 +398,6 @@
     assert [
         term_association.urn for term_association in aspect_map["add_term"].terms
     ] == ["urn:li:glossaryTerm:term1", "urn:li:glossaryTerm:term2"]
+
+    assert isinstance(aspect_map["add_domain"], DomainsClass)
+    assert aspect_map["add_domain"].domains == ["urn:li:domain:domain1"]
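
Note (illustration, not part of the patch): with this change, a dbt node whose meta block carries the special "datahub" property with a "domain" key (e.g. {"datahub": {"domain": "domain1"}}, as in the updated unit test) is mapped to the new add_domain operation and materialized as a DomainsClass aspect. Because DatasetSnapshot does not accept DomainsClass, the dbt source now partitions aspects and emits such aspects as standalone MCPs. The sketch below shows that behavior using only helpers touched or used by this patch; the dataset name and domain value are made up.

    from datahub.emitter import mce_builder
    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    from datahub.metadata.schema_classes import (
        DatasetSnapshotClass,
        DomainsClass,
        GlobalTagsClass,
        TagAssociationClass,
    )

    # Hypothetical dataset and domain, for illustration only.
    dataset_urn = mce_builder.make_dataset_urn("dbt", "analytics.public.my_model")
    domains_aspect = DomainsClass(domains=[mce_builder.make_domain_urn("domain1")])
    tags_aspect = GlobalTagsClass(
        tags=[TagAssociationClass(tag=mce_builder.make_tag_urn("tag1"))]
    )

    # The new helper reports which aspect types a snapshot can hold.
    assert mce_builder.can_add_aspect_to_snapshot(DatasetSnapshotClass, GlobalTagsClass)
    assert not mce_builder.can_add_aspect_to_snapshot(DatasetSnapshotClass, DomainsClass)

    # Aspects the snapshot rejects are emitted individually, mirroring the
    # standalone_aspects loop added to DBTSourceBase above.
    mcp = MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=domains_aspect)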