diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery.py b/ingestion/src/metadata/ingestion/source/database/bigquery.py
index f6617d12ff9..322c9a7a492 100644
--- a/ingestion/src/metadata/ingestion/source/database/bigquery.py
+++ b/ingestion/src/metadata/ingestion/source/database/bigquery.py
@@ -229,6 +229,7 @@ class BigquerySource(CommonDbSourceService):
         return raw_data_type.replace(", ", ",").replace(" ", ":").lower()
 
     def close(self):
+        self._create_dbt_lineage()
         super().close()
         if self.temp_credentials:
             os.unlink(self.temp_credentials)
diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py
index 1a9b047eb45..abce8d8e4b3 100644
--- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py
+++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py
@@ -37,7 +37,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
 )
 from metadata.ingestion.api.source import SourceStatus
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
-from metadata.ingestion.source.database.dbt_souce import DBTSource
+from metadata.ingestion.source.database.dbt_source import DBTSource
 from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandler
 from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource
 from metadata.utils.connections import (
@@ -208,6 +208,7 @@ class CommonDbSourceService(DBTSource, SqlColumnHandler, SqlAlchemySource):
         return self._connection
 
     def close(self):
+        self._create_dbt_lineage()
         if self.connection is not None:
             self.connection.close()
 
diff --git a/ingestion/src/metadata/ingestion/source/database/dbt_souce.py b/ingestion/src/metadata/ingestion/source/database/dbt_source.py
similarity index 71%
rename from ingestion/src/metadata/ingestion/source/database/dbt_souce.py
rename to ingestion/src/metadata/ingestion/source/database/dbt_source.py
index 53b9f702c26..68dc07ca349 100644
--- a/ingestion/src/metadata/ingestion/source/database/dbt_souce.py
+++ b/ingestion/src/metadata/ingestion/source/database/dbt_source.py
@@ -14,12 +14,15 @@ DBT source methods.
 import traceback
 from typing import Dict, List
 
+from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
 from metadata.generated.schema.entity.data.table import (
     Column,
     DataModel,
     ModelType,
     Table,
 )
+from metadata.generated.schema.type.entityLineage import EntitiesEdge
+from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.utils import fqn
 from metadata.utils.column_type_parser import ColumnTypeParser
 from metadata.utils.dbt_config import get_dbt_details
@@ -59,19 +62,19 @@ class DBTSource:
             and self.dbt_catalog
         ):
             logger.info("Parsing Data Models")
-            manifest_entities = {
+            self.manifest_entities = {
                 **self.dbt_manifest["nodes"],
                 **self.dbt_manifest["sources"],
             }
-            catalog_entities = {
+            self.catalog_entities = {
                 **self.dbt_catalog["nodes"],
                 **self.dbt_catalog["sources"],
             }
 
-            for key, mnode in manifest_entities.items():
+            for key, mnode in self.manifest_entities.items():
                 try:
                     name = mnode["alias"] if "alias" in mnode.keys() else mnode["name"]
-                    cnode = catalog_entities.get(key)
+                    cnode = self.catalog_entities.get(key)
                     columns = (
                         self._parse_data_model_columns(name, mnode, cnode)
                         if cnode
@@ -99,6 +102,7 @@
                     model_fqn = fqn.build(
                         self.metadata,
                         entity_type=DataModel,
+                        service_name=self.config.serviceName,
                         database_name=database,
                         schema_name=schema,
                         model_name=model_name,
@@ -113,16 +117,17 @@
         if "depends_on" in mnode and "nodes" in mnode["depends_on"]:
             for node in mnode["depends_on"]["nodes"]:
                 try:
-                    _, database, table = node.split(".", 2)
-                    table_fqn = fqn.build(
+                    parent_node = self.manifest_entities[node]
+                    parent_fqn = fqn.build(
                         self.metadata,
                         entity_type=Table,
                         service_name=self.config.serviceName,
-                        database_name=None,  # issue-5093 Read proper schema and db from manifest
-                        schema_name=None,
-                        table_name=table,
+                        database_name=parent_node["database"],
+                        schema_name=parent_node["schema"],
+                        table_name=parent_node["name"],
                     )
-                    upstream_nodes.append(table_fqn)
+                    if parent_fqn:
+                        upstream_nodes.append(parent_fqn)
                 except Exception as err:  # pylint: disable=broad-except
                     logger.error(
                         f"Failed to parse the node {node} to capture lineage {err}"
@@ -159,3 +164,33 @@
                 logger.error(f"Failed to parse column {col_name} due to {err}")
 
         return columns
+
+    def _create_dbt_lineage(self):
+        for data_model_name, data_model in self.data_models.items():
+            for upstream_node in data_model.upstream:
+                try:
+                    from_entity: Table = self.metadata.get_by_name(
+                        entity=Table, fqn=upstream_node
+                    )
+                    to_entity: Table = self.metadata.get_by_name(
+                        entity=Table, fqn=data_model_name
+                    )
+                    if from_entity and to_entity:
+                        lineage = AddLineageRequest(
+                            edge=EntitiesEdge(
+                                fromEntity=EntityReference(
+                                    id=from_entity.id.__root__,
+                                    type="table",
+                                ),
+                                toEntity=EntityReference(
+                                    id=to_entity.id.__root__,
+                                    type="table",
+                                ),
+                            )
+                        )
+                        created_lineage = self.metadata.add_lineage(lineage)
+                        logger.info(f"Successfully added Lineage {created_lineage}")
+                except Exception as err:  # pylint: disable=broad-except
+                    logger.error(
+                        f"Failed to parse the node {upstream_node} to capture lineage {err}"
+                    )
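Editor's note, for reviewers exercising this change: below is a minimal, self-contained sketch of the lineage call pattern that the new _create_dbt_lineage() hook runs when close() is invoked. It is not part of the patch; the helper name add_table_lineage and the example FQNs are hypothetical, and an already-configured OpenMetadata client is assumed. It only reuses the get_by_name / add_lineage calls and the AddLineageRequest shape shown above.

# Reviewer sketch (not part of the patch): table-to-table lineage, one edge at a time.
# Assumes `metadata` is an already-configured OpenMetadata client; FQNs are hypothetical.

from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata


def add_table_lineage(metadata: OpenMetadata, upstream_fqn: str, downstream_fqn: str) -> None:
    """Resolve both tables by FQN and emit a single lineage edge between them."""
    from_entity: Table = metadata.get_by_name(entity=Table, fqn=upstream_fqn)
    to_entity: Table = metadata.get_by_name(entity=Table, fqn=downstream_fqn)
    if not (from_entity and to_entity):
        # One side is not yet ingested into OpenMetadata; nothing to link.
        return
    lineage = AddLineageRequest(
        edge=EntitiesEdge(
            fromEntity=EntityReference(id=from_entity.id.__root__, type="table"),
            toEntity=EntityReference(id=to_entity.id.__root__, type="table"),
        )
    )
    metadata.add_lineage(lineage)


# Hypothetical usage, mirroring a dbt model "dim_customer" built from "raw_customer":
# add_table_lineage(
#     metadata,
#     upstream_fqn="bigquery_service.my_db.my_schema.raw_customer",
#     downstream_fqn="bigquery_service.my_db.my_schema.dim_customer",
# )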