DBT Models Lineage Fix (#5252)

DBT Models Lineage Fix (#5252)
This commit is contained in:
Onkar Ravgan 2022-06-01 22:05:51 +05:30 committed by GitHub
parent 63fe4f7b9a
commit c1fa31eacc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 11 deletions

View File

@ -229,6 +229,7 @@ class BigquerySource(CommonDbSourceService):
return raw_data_type.replace(", ", ",").replace(" ", ":").lower()
def close(self):
self._create_dbt_lineage()
super().close()
if self.temp_credentials:
os.unlink(self.temp_credentials)

View File

@ -37,7 +37,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.ingestion.api.source import SourceStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.dbt_souce import DBTSource
from metadata.ingestion.source.database.dbt_source import DBTSource
from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandler
from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource
from metadata.utils.connections import (
@ -208,6 +208,7 @@ class CommonDbSourceService(DBTSource, SqlColumnHandler, SqlAlchemySource):
return self._connection
def close(self):
self._create_dbt_lineage()
if self.connection is not None:
self.connection.close()

View File

@ -14,12 +14,15 @@ DBT source methods.
import traceback
from typing import Dict, List
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import (
Column,
DataModel,
ModelType,
Table,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.utils import fqn
from metadata.utils.column_type_parser import ColumnTypeParser
from metadata.utils.dbt_config import get_dbt_details
@ -59,19 +62,19 @@ class DBTSource:
and self.dbt_catalog
):
logger.info("Parsing Data Models")
manifest_entities = {
self.manifest_entities = {
**self.dbt_manifest["nodes"],
**self.dbt_manifest["sources"],
}
catalog_entities = {
self.catalog_entities = {
**self.dbt_catalog["nodes"],
**self.dbt_catalog["sources"],
}
for key, mnode in manifest_entities.items():
for key, mnode in self.manifest_entities.items():
try:
name = mnode["alias"] if "alias" in mnode.keys() else mnode["name"]
cnode = catalog_entities.get(key)
cnode = self.catalog_entities.get(key)
columns = (
self._parse_data_model_columns(name, mnode, cnode)
if cnode
@ -99,6 +102,7 @@ class DBTSource:
model_fqn = fqn.build(
self.metadata,
entity_type=DataModel,
service_name=self.config.serviceName,
database_name=database,
schema_name=schema,
model_name=model_name,
@ -113,16 +117,17 @@ class DBTSource:
if "depends_on" in mnode and "nodes" in mnode["depends_on"]:
for node in mnode["depends_on"]["nodes"]:
try:
_, database, table = node.split(".", 2)
table_fqn = fqn.build(
parent_node = self.manifest_entities[node]
parent_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.config.serviceName,
database_name=None, # issue-5093 Read proper schema and db from manifest
schema_name=None,
table_name=table,
database_name=parent_node["database"],
schema_name=parent_node["schema"],
table_name=parent_node["name"],
)
upstream_nodes.append(table_fqn)
if parent_fqn:
upstream_nodes.append(parent_fqn)
except Exception as err: # pylint: disable=broad-except
logger.error(
f"Failed to parse the node {node} to capture lineage {err}"
@ -159,3 +164,33 @@ class DBTSource:
logger.error(f"Failed to parse column {col_name} due to {err}")
return columns
def _create_dbt_lineage(self):
for data_model_name, data_model in self.data_models.items():
for upstream_node in data_model.upstream:
try:
from_entity: Table = self.metadata.get_by_name(
entity=Table, fqn=upstream_node
)
to_entity: Table = self.metadata.get_by_name(
entity=Table, fqn=data_model_name
)
if from_entity and to_entity:
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=from_entity.id.__root__,
type="table",
),
toEntity=EntityReference(
id=to_entity.id.__root__,
type="table",
),
)
)
created_lineage = self.metadata.add_lineage(lineage)
logger.info(f"Successfully added Lineage {created_lineage}")
except Exception as err: # pylint: disable=broad-except
logger.error(
f"Failed to parse the node {upstream_node} to capture lineage {err}"
)