feat: derive downstream lineage from DBT exposures (#21992)

* 🎉 Init

* replace TARGET with EXPOSURE

* refactor and document

* add docs

* handle missing type / non-matching entity

* linter

* update docs

* refactor to use label for communicating the FQN, since the name field cannot contain special characters other than underscores. Storing dots in the name works for now, but it raises a deprecation warning and will fail in the future.

* improve docs

* improve docs

* improve logging

* refactor to use meta.open_metadata_fqn instead of label (see the sketch after this list)

* linting

* update docs

* update docs

* fix docs

* 🎉 Add tests
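The convention these refactors converge on carries the OpenMetadata FQN in `meta.open_metadata_fqn`; a minimal sketch with illustrative values (`sample_looker.orders` stands in for a real entity FQN):

```yaml
exposures:
  - name: orders_dashboard                      # dbt exposure name: underscores only
    type: dashboard
    meta:
      open_metadata_fqn: sample_looker.orders   # dots are fine inside meta
```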
mgorsk1 2025-07-07 16:34:33 +02:00 committed by GitHub
parent 087172e537
commit 3f01be6756
9 changed files with 20219 additions and 8 deletions

View File

@@ -14,11 +14,17 @@ Constants required for dbt
from enum import Enum
from metadata.generated.schema.entity.data.apiEndpoint import APIEndpoint
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.mlmodel import MlModel
DBT_RUN_RESULT_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
# Based on https://schemas.getdbt.com/dbt/manifest/v7/index.html
REQUIRED_MANIFEST_KEYS = ["name", "schema", "resource_type"]
REQUIRED_EXPOSURE_KEYS = ["name", "meta", "type", "resource_type", "sources"]
# Based on https://schemas.getdbt.com/dbt/catalog/v1.json
REQUIRED_CATALOG_KEYS = ["name", "type", "index"]
@@ -84,7 +90,6 @@ REQUIRED_NODE_KEYS = {
"language",
}
NONE_KEYWORDS_LIST = ["none", "null"]
DBT_CATALOG_FILE_NAME = "catalog.json"
@@ -148,6 +153,8 @@ class DbtCommonEnum(Enum):
OWNER = "owner"
NODES = "nodes"
SOURCES = "sources"
EXPOSURE = "exposure"
EXPOSURES = "exposures"
SOURCES_FILE = "sources_file"
SOURCE = "source"
RESOURCETYPE = "resource_type"
@@ -156,3 +163,12 @@
RESULTS = "results"
TEST_SUITE_NAME = "test_suite_name"
DBT_TEST_SUITE = "DBT_TEST_SUITE"
# dbt supports more exposure types, but only these map cleanly to OpenMetadata entities
# https://docs.getdbt.com/docs/build/exposures#available-properties
ExposureTypeMap = {
"dashboard": {"entity_type": Dashboard, "entity_type_name": "dashboard"},
"ml": {"entity_type": MlModel, "entity_type_name": "mlmodel"},
"application": {"entity_type": APIEndpoint, "entity_type_name": "apiEndpoint"},
}
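# Exposures declared with any other dbt type (e.g. "analysis" or "notebook")
# have no OpenMetadata counterpart in this map and are skipped at parse time
# with a warning.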

View File

@@ -81,6 +81,7 @@ class DbtServiceTopology(ServiceTopology):
"process_dbt_data_model",
"process_dbt_entities",
"process_dbt_tests",
"process_dbt_exposures",
],
)
process_dbt_data_model: Annotated[
@@ -151,6 +152,20 @@ class DbtServiceTopology(ServiceTopology):
],
)
process_dbt_exposures: Annotated[
TopologyNode, Field(description="Process dbt exposures")
] = TopologyNode(
producer="get_dbt_exposures",
stages=[
NodeStage(
type_=AddLineageRequest,
processor="create_dbt_exposures_lineage",
consumer=["yield_data_models"],
nullable=True,
),
],
)
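# get_dbt_exposures (producer) yields each exposure spec collected in the
# context; create_dbt_exposures_lineage (processor) turns every yielded spec
# into AddLineageRequest records.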
class DbtServiceSource(TopologyRunnerMixin, Source, ABC):
"""
@@ -175,7 +190,7 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC):
# This step is necessary as the manifest file may not always adhere to the schema definition
# and the presence of other nodes can hinder the ingestion process from progressing any further.
# Therefore, we are only retaining the essential data for further processing.
required_manifest_keys = {"nodes", "sources", "metadata"}
required_manifest_keys = {"nodes", "sources", "metadata", "exposures"}
manifest_dict.update(
{
key: {}
@@ -284,6 +299,12 @@
Method to process DBT lineage from upstream nodes
"""
@abstractmethod
def create_dbt_exposures_lineage(self, exposure_spec: dict) -> AddLineageRequest:
"""
Method to process DBT lineage from exposures
"""
@abstractmethod
def create_dbt_query_lineage(
self, data_model_link: DataModelLink
@@ -311,6 +332,14 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC):
for _, dbt_test in self.context.get().dbt_tests.items():
yield dbt_test
def get_dbt_exposures(self) -> dict:
"""
Prepare the DBT exposures
"""
for _, exposure in self.context.get().exposures.items():
yield exposure
@abstractmethod
def create_dbt_tests_definition(
self, dbt_test: dict

View File

@@ -70,10 +70,12 @@ from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.ingestion.source.database.dbt.constants import (
DBT_RUN_RESULT_DATE_FORMAT,
REQUIRED_CATALOG_KEYS,
REQUIRED_EXPOSURE_KEYS,
REQUIRED_MANIFEST_KEYS,
DbtCommonEnum,
DbtTestFailureEnum,
DbtTestSuccessEnum,
ExposureTypeMap,
SkipResourceTypeEnum,
)
from metadata.ingestion.source.database.dbt.dbt_service import (
@@ -210,6 +212,7 @@ class DbtSource(DbtServiceSource):
manifest_entities = {
**dbt_files.dbt_manifest[DbtCommonEnum.NODES.value],
**dbt_files.dbt_manifest[DbtCommonEnum.SOURCES.value],
**dbt_files.dbt_manifest.get(DbtCommonEnum.EXPOSURES.value, {}),
}
catalog_entities = None
if dbt_files.dbt_catalog:
@@ -223,6 +226,23 @@
]:
continue
if (
manifest_node[DbtCommonEnum.RESOURCETYPE.value]
== DbtCommonEnum.EXPOSURE.value
):
if all(
required_key in manifest_node
for required_key in REQUIRED_EXPOSURE_KEYS
):
logger.debug(f"Successfully Validated DBT Node: {key}")
else:
logger.warning(
f"Error validating DBT Node: {key}\n"
f"Please check if following keys exist for the node: {REQUIRED_EXPOSURE_KEYS}"
)
continue
# Validate if all the required keys are present in the manifest nodes
if all(
required_key in manifest_node
@@ -347,6 +367,19 @@
None,
)
def add_dbt_exposure(self, key: str, manifest_node, manifest_entities):
exposure_entity = self.parse_exposure_node(manifest_node)
if exposure_entity:
self.context.get().exposures[key] = {
DbtCommonEnum.EXPOSURE: exposure_entity,
DbtCommonEnum.MANIFEST_NODE: manifest_node,
}
self.context.get().exposures[key][
DbtCommonEnum.UPSTREAM
] = self.parse_upstream_nodes(manifest_entities, manifest_node)
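# Shape of the context entry built above (illustrative):
# exposures[key] = {
#     DbtCommonEnum.EXPOSURE: <entity resolved from meta.open_metadata_fqn>,
#     DbtCommonEnum.MANIFEST_NODE: <raw manifest exposure node>,
#     DbtCommonEnum.UPSTREAM: <result of parse_upstream_nodes>,
# }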
def add_dbt_sources(
self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects
) -> None:
@@ -440,6 +473,7 @@
manifest_entities = {
**dbt_objects.dbt_manifest.sources,
**dbt_objects.dbt_manifest.nodes,
**dbt_objects.dbt_manifest.exposures,
}
catalog_entities = None
if dbt_objects.dbt_catalog:
@@ -448,6 +482,7 @@
**dbt_objects.dbt_catalog.nodes,
}
self.context.get().data_model_links = []
self.context.get().exposures = {}
self.context.get().dbt_tests = {}
self.context.get().run_results_generate_time = None
# Since we'll be processing multiple run_results for a single project
@@ -491,6 +526,10 @@
dbt_objects=dbt_objects,
)
if resource_type == DbtCommonEnum.EXPOSURE.value:
self.add_dbt_exposure(key, manifest_node, manifest_entities)
continue
# Skip the ephemeral nodes since it is not materialized
if check_ephemeral_node(manifest_node):
logger.debug(f"Skipping ephemeral DBT node: {key}.")
@@ -736,6 +775,57 @@
return columns
def parse_exposure_node(self, exposure_spec) -> Optional[Any]:
"""
Parses the exposure node, verifying that its type is supported and that the provided
meta.open_metadata_fqn matches the FQN of an OpenMetadata entity. Returns the entity
object if both conditions are met.
```yaml
exposures:
- name: orders_dashboard
label: orders
meta:
open_metadata_fqn: sample_looker.orders # OpenMetadata entity FullyQualifiedName
type: dashboard
maturity: high
url: http://localhost:808/looker/dashboard/8/
description: >
Exemplary OM Looker Dashboard.
depends_on:
- ref('fact_sales')
```
"""
exposure_type = exposure_spec.type.value
entity_type = ExposureTypeMap.get(exposure_type, {}).get("entity_type")
if not entity_type:
logger.warning(f"Exposure type [{exposure_spec.type.value}] not supported.")
return None
try:
entity_fqn = exposure_spec.meta["open_metadata_fqn"]
except KeyError:
logger.warning(
f"meta.open_metadata_fqn not found in [{exposure_spec.name}] exposure spec."
)
return None
entity = self.metadata.get_by_name(fqn=entity_fqn, entity=entity_type)
if not entity:
logger.warning(
f"Entity [{entity_fqn}] of [{exposure_type}] type not found in Open Metadata."
)
return None
return entity
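# All three guards above fail soft: an unsupported type, a missing
# meta.open_metadata_fqn, or an FQN with no matching entity each log a warning
# and return None, so the exposure is skipped without failing the run.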
def create_dbt_lineage(
self, data_model_link: DataModelLink
) -> Iterable[Either[AddLineageRequest]]:
@@ -829,6 +919,51 @@
)
)
def create_dbt_exposures_lineage(
self, exposure_spec: dict
) -> Iterable[Either[AddLineageRequest]]:
to_entity = exposure_spec[DbtCommonEnum.EXPOSURE]
upstream = exposure_spec[DbtCommonEnum.UPSTREAM]
manifest_node = exposure_spec[DbtCommonEnum.MANIFEST_NODE]
for upstream_node in upstream:
try:
from_es_result = self.metadata.es_search_from_fqn(
entity_type=Table,
fqn_search_string=upstream_node,
)
from_entity: Optional[
Union[Table, List[Table]]
] = get_entity_from_es_result(
entity_list=from_es_result, fetch_multiple_entities=False
)
if from_entity and to_entity:
yield Either(
right=AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=Uuid(from_entity.id.root),
type="table",
),
toEntity=EntityReference(
id=Uuid(to_entity.id.root),
type=ExposureTypeMap[manifest_node.type.value][
"entity_type_name"
],
),
lineageDetails=LineageDetails(
source=LineageSource.DbtLineage
),
)
)
)
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to parse the node {upstream_node} to capture lineage: {exc}"
)
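# Each upstream node is resolved to a Table via Elasticsearch and linked to
# the exposure entity with LineageSource.DbtLineage; a failure on one upstream
# node is logged and does not abort the remaining edges.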
def process_dbt_meta(self, manifest_meta):
"""
Method to process DBT meta for Tags and GlossaryTerms

View File

@@ -19540,7 +19540,162 @@
"block_contents": "### Welcome!\n\nWelcome to the auto-generated documentation for your dbt project!\n\n### Navigation\n\nYou can use the `Project` and `Database` navigation tabs on the left side of the window to explore the models\nin your project.\n\n#### Project Tab\nThe `Project` tab mirrors the directory structure of your dbt project. In this tab, you can see all of the\nmodels defined in your dbt project, as well as models imported from dbt packages.\n\n#### Database Tab\nThe `Database` tab also exposes your models, but in a format that looks more like a database explorer. This view\nshows relations (tables and views) grouped into database schemas. Note that ephemeral models are _not_ shown\nin this interface, as they do not exist in the database.\n\n### Graph Exploration\nYou can click the blue icon on the bottom-right corner of the page to view the lineage graph of your models.\n\nOn model pages, you'll see the immediate parents and children of the model you're exploring. By clicking the `Expand`\nbutton at the top-right of this lineage pane, you'll be able to see all of the models that are used to build,\nor are built from, the model you're exploring.\n\nOnce expanded, you'll be able to use the `--select` and `--exclude` model selection syntax to filter the\nmodels in the graph. For more information on model selection, check out the [dbt docs](https://docs.getdbt.com/docs/model-selection-syntax).\n\nNote that you can also right-click on models to interactively filter and explore the graph.\n\n---\n\n### More information\n\n- [What is dbt](https://docs.getdbt.com/docs/introduction)?\n- Read the [dbt viewpoint](https://docs.getdbt.com/docs/viewpoint)\n- [Installation](https://docs.getdbt.com/docs/installation)\n- Join the [dbt Community](https://www.getdbt.com/community/) for questions and discussion"
}
},
"exposures": {},
"exposures": {
"exposure.dbt_tutorial.forecast_sales": {
"name": "forecast_sales",
"resource_type": "exposure",
"package_name": "dbt_tutorial",
"path": "warehouse/fact_sales.yml",
"original_file_path": "models/warehouse/fact_sales.yml",
"unique_id": "exposure.dbt_tutorial.forecast_sales",
"fqn": [
"dbt_tutorial",
"warehouse",
"forecast_sales"
],
"type": "dashboard",
"owner": {
"email": "gorskimariusz13@gmail.com",
"name": "Mariusz G\u00f3rski"
},
"description": "Exemplary OM API Endpoint.\n",
"label": "asd",
"maturity": "high",
"meta": {
"open_metadata_fqn": "mlflow_svc.forecast_sales"
},
"tags": [],
"config": {
"enabled": true,
"tags": [],
"meta": {
"open_metadata_fqn": "mlflow_svc.forecast_sales"
}
},
"unrendered_config": {},
"url": "https://docs.open-metadata.org/api/v1/tables",
"depends_on": {
"macros": [],
"nodes": [
"model.dbt_tutorial.fact_sales",
"model.dbt_tutorial.dim_product"
]
},
"refs": [
{
"name": "fact_sales",
"package": null,
"version": null
},
{
"name": "dim_product",
"package": null,
"version": null
}
],
"sources": [],
"metrics": [],
"created_at": 1751007276.544977
},
"exposure.dbt_tutorial.create_table_endpoint": {
"name": "create_table_endpoint",
"resource_type": "exposure",
"package_name": "dbt_tutorial",
"path": "warehouse/fact_sales.yml",
"original_file_path": "models/warehouse/fact_sales.yml",
"unique_id": "exposure.dbt_tutorial.create_table_endpoint",
"fqn": [
"dbt_tutorial",
"warehouse",
"create_table_endpoint"
],
"type": "application",
"owner": {
"email": "gorskimariusz13@gmail.com",
"name": "Mariusz G\u00f3rski"
},
"description": "Exemplary OM API Endpoint.\n",
"label": "asd",
"maturity": "high",
"meta": {
"open_metadata_fqn": "ometa_api_service.tables.createTable"
},
"tags": [],
"config": {
"enabled": true,
"tags": [],
"meta": {
"open_metadata_fqn": "ometa_api_service.tables.createTable"
}
},
"unrendered_config": {},
"url": "https://docs.open-metadata.org/api/v1/tables",
"depends_on": {
"macros": [],
"nodes": [
"model.dbt_tutorial.fact_sales"
]
},
"refs": [
{
"name": "fact_sales",
"package": null,
"version": null
}
],
"sources": [],
"metrics": [],
"created_at": 1751007276.5458581
},
"exposure.dbt_tutorial.looker_orders": {
"name": "looker_orders",
"resource_type": "exposure",
"package_name": "dbt_tutorial",
"path": "warehouse/fact_sales.yml",
"original_file_path": "models/warehouse/fact_sales.yml",
"unique_id": "exposure.dbt_tutorial.looker_orders",
"fqn": [
"dbt_tutorial",
"warehouse",
"looker_orders"
],
"type": "dashboard",
"owner": {
"email": "gorskimariusz13@gmail.com",
"name": "Mariusz G\u00f3rski"
},
"description": "Exemplary OM Looker Dashboard.\n",
"label": "zxc",
"maturity": "high",
"meta": {
"open_metadata_fqn": "dummy"
},
"tags": [],
"config": {
"enabled": true,
"tags": [],
"meta": {}
},
"unrendered_config": {},
"url": "http://localhost:808/looker/dashboard/8/",
"depends_on": {
"macros": [],
"nodes": [
"model.dbt_tutorial.fact_sales"
]
},
"refs": [
{
"name": "fact_sales",
"package": null,
"version": null
}
],
"sources": [],
"metrics": [],
"created_at": 1751007451.077749
}
},
"metrics": {},
"groups": {},
"selectors": {},

File diff suppressed because one or more lines are too long

View File

@@ -15,10 +15,14 @@ from collate_dbt_artifacts_parser.parser import (
)
from pydantic import AnyUrl
from metadata.generated.schema.entity.data.apiEndpoint import APIEndpoint
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.mlmodel import MlModel
from metadata.generated.schema.entity.data.table import Column, DataModel, Table
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type import entityReference
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
from metadata.generated.schema.type.tagLabel import (
@@ -87,11 +91,15 @@ MOCK_SAMPLE_MANIFEST_V8 = "resources/datasets/manifest_v8.json"
MOCK_SAMPLE_MANIFEST_VERSIONLESS = "resources/datasets/manifest_versionless.json"
MOCK_SAMPLE_MANIFEST_VERSIONLESS_BROKEN_EXPOSURES = (
"resources/datasets/manifest_versionless_broken_exposures.json"
)
MOCK_SAMPLE_MANIFEST_NULL_DB = "resources/datasets/manifest_null_db.json"
MOCK_SAMPLE_MANIFEST_TEST_NODE = "resources/datasets/manifest_test_node.json"
EXPECTED_DATA_MODEL_FQNS = [
"dbt_test.dev.dbt_jaffle.customers",
"dbt_test.dev.dbt_jaffle.orders",
@@ -299,6 +307,26 @@ EXPECTED_DATA_MODEL_VERSIONLESS = [
)
]
EXPECTED_EXPOSURE_ENTITIES = [
Dashboard(
id=uuid.uuid4(),
name="looker_dashboard",
service=entityReference.EntityReference(id=uuid.uuid4(), type="dashboard"),
),
MlModel(
id=uuid.uuid4(),
name="mlflow_model",
algorithm="lr",
service=entityReference.EntityReference(id=uuid.uuid4(), type="mlModel"),
),
APIEndpoint(
id=uuid.uuid4(),
name="createTable",
endpointURL="http://localhost:8000",
service=entityReference.EntityReference(id=uuid.uuid4(), type="apiEndpoint"),
),
]
MOCK_OWNER = EntityReferenceList(
root=[
EntityReference(
@@ -324,7 +352,6 @@ MOCK_USER = EntityReference(
fullyQualifiedName="aaron_johnson0",
)
MOCK_TABLE_ENTITIES = [
Table(
id=uuid.uuid4(),
@@ -477,7 +504,7 @@ class DbtUnitTest(TestCase):
]
self.execute_test(
MOCK_SAMPLE_MANIFEST_VERSIONLESS,
expected_records=9,
expected_records=12,
expected_data_models=EXPECTED_DATA_MODEL_VERSIONLESS,
)
@@ -759,3 +786,42 @@
)
self.assertEqual(dbt_meta_tags, MOCK_GLOASSARY_LABELS)
def test_parse_exposure_node_exposure_absent(self):
_, dbt_objects = self.get_dbt_object_files(MOCK_SAMPLE_MANIFEST_V8)
parsed_exposures = [
self.dbt_source_obj.parse_exposure_node(node)
for _, node in dbt_objects.dbt_manifest.exposures.items()
]
assert len(list(filter(lambda x: x is not None, parsed_exposures))) == 0
@patch("metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_name")
def test_parse_exposure_node_exposure_happy_path(self, get_by_name):
get_by_name.side_effect = EXPECTED_EXPOSURE_ENTITIES
_, dbt_objects = self.get_dbt_object_files(MOCK_SAMPLE_MANIFEST_VERSIONLESS)
parsed_exposures = [
self.dbt_source_obj.parse_exposure_node(node)
for _, node in dbt_objects.dbt_manifest.exposures.items()
]
assert len(list(filter(lambda x: x is not None, parsed_exposures))) == 3
@patch("metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_name")
def test_parse_exposure_node_exposure_broken_exposures(self, get_by_name):
"""
Test on data where there is one exposure with missing open_metadata_fqn and one with unsupported type.
"""
get_by_name.side_effect = EXPECTED_EXPOSURE_ENTITIES
_, dbt_objects = self.get_dbt_object_files(
MOCK_SAMPLE_MANIFEST_VERSIONLESS_BROKEN_EXPOSURES
)
parsed_exposures = [
self.dbt_source_obj.parse_exposure_node(node)
for _, node in dbt_objects.dbt_manifest.exposures.items()
]
assert len(list(filter(lambda x: x is not None, parsed_exposures))) == 0

View File

@@ -240,9 +240,11 @@ dbt test # Generate run_results.json
## What Gets Ingested
- **Model Definitions**: Queries, configurations, and relationships
- **Lineage**: Table-to-table and column-level lineage
- **Lineage**:
  - Table-to-table lineage, with column-level lineage (column-level lineage is supported only in dbt Cloud)
  - Table-to-dashboard, ML model, and API endpoint lineage through [dbt exposures](https://docs.getdbt.com/docs/build/exposures), as shown in the example below
- **Documentation**: Model and column descriptions
- **Data Quality**: dbt test definitions and results
- **Data Quality**: dbt test definitions and results, including [dbt freshness](https://docs.getdbt.com/reference/resource-properties/freshness) tests
- **Tags & Classification**: Model and column tags
- **Ownership**: Model owners and team assignments
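To have an exposure contribute lineage, point `meta.open_metadata_fqn` at an existing OpenMetadata entity. A minimal sketch (illustrative names; `sample_looker.orders` is assumed to be the FQN of an already-ingested dashboard):

```yaml
exposures:
  - name: orders_dashboard      # dbt exposure name: letters, digits, underscores
    type: dashboard             # supported types: dashboard, ml, application
    meta:
      open_metadata_fqn: sample_looker.orders   # FQN of the OpenMetadata entity
    depends_on:
      - ref('fact_sales')       # upstream model; becomes the lineage source
```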

View File

@@ -47,6 +47,7 @@ Configure the auto dbt ingestion for dbt-core.
| dbt Owner | {% icon iconName="check" /%} |
| dbt Descriptions | {% icon iconName="check" /%} |
| dbt Tests | {% icon iconName="check" /%} |
| dbt Exposures | {% icon iconName="check" /%} |
| Supported dbt Core Versions | `v1.2` `v1.3` `v1.5` `v1.6` `v1.7` `v1.8` `v1.9` |
{% /multiTablesWrapper %}

View File

@@ -47,6 +47,7 @@ Configure the auto dbt ingestion for dbt-core.
| dbt Owner | {% icon iconName="check" /%} |
| dbt Descriptions | {% icon iconName="check" /%} |
| dbt Tests | {% icon iconName="check" /%} |
| dbt Exposures | {% icon iconName="check" /%} |
| Supported dbt Core Versions | `v1.2` `v1.3` `v1.5` `v1.6` `v1.7` `v1.8` `v1.9` |
{% /multiTablesWrapper %}