From 1a2fd24c10af7fce1fa07f4cd916734c5ffe42a6 Mon Sep 17 00:00:00 2001 From: Suman Maharana Date: Mon, 12 Aug 2024 23:48:40 +0530 Subject: [PATCH] Fixes: DBT Cloud lineage not showing (#17395) --- .../source/pipeline/dbtcloud/client.py | 29 ++++++++- .../source/pipeline/dbtcloud/metadata.py | 64 +++++++++++-------- .../source/pipeline/dbtcloud/models.py | 12 ++-- .../source/pipeline/dbtcloud/queries.py | 33 ++++++---- 4 files changed, 91 insertions(+), 47 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/client.py b/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/client.py index 0a11a270094..1abccb49db8 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/client.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/client.py @@ -26,7 +26,10 @@ from metadata.ingestion.source.pipeline.dbtcloud.models import ( DBTRun, DBTRunList, ) -from metadata.ingestion.source.pipeline.dbtcloud.queries import DBT_QUERY +from metadata.ingestion.source.pipeline.dbtcloud.queries import ( + DBT_GET_MODEL_DEPENDS_ON, + DBT_GET_MODELS_SEEDS, +) from metadata.utils.constants import AUTHORIZATION_HEADER from metadata.utils.helpers import clean_uri from metadata.utils.logger import ometa_logger @@ -102,7 +105,7 @@ class DBTCloudClient: """ try: query_params = { - "query": DBT_QUERY, + "query": DBT_GET_MODEL_DEPENDS_ON, "variables": {"jobId": job_id, "runId": run_id}, } @@ -116,3 +119,25 @@ class DBTCloudClient: logger.debug(traceback.format_exc()) logger.warning(f"Unable to get model info :{exc}") return None + + def get_models_and_seeds_details(self, job_id: int, run_id: int): + """ + get model details for a job in dbt cloud for lineage + """ + try: + query_params = { + "query": DBT_GET_MODELS_SEEDS, + "variables": {"jobId": job_id, "runId": run_id}, + } + + result = self.graphql_client.post("", json=query_params) + + if result.get("data") and result["data"].get("job"): + result = DBTModelList(**result["data"]["job"]) + parents_list = result.models + result.seeds + return parents_list + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Unable to get model info :{exc}") + return None diff --git a/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/metadata.py index 8915af595b4..44d61a7422e 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/metadata.py @@ -177,6 +177,10 @@ class DbtcloudSource(PipelineServiceSource): job_id=pipeline_details.id, run_id=self.context.get().latest_run_id ) + dbt_parents = self.client.get_models_and_seeds_details( + job_id=pipeline_details.id, run_id=self.context.get().latest_run_id + ) + for model in dbt_models or []: for dbservicename in ( self.source_config.lineageInformation.dbServiceNames or [] @@ -186,7 +190,7 @@ class DbtcloudSource(PipelineServiceSource): fqn=fqn.build( metadata=self.metadata, entity_type=Table, - table_name=model.alias, + table_name=model.name, database_name=model.database, schema_name=model.dbtschema, service_name=dbservicename, @@ -196,37 +200,41 @@ class DbtcloudSource(PipelineServiceSource): if to_entity is None: continue - for dest in model.parentsSources or []: - from_entity = self.metadata.get_by_name( - entity=Table, - fqn=fqn.build( - metadata=self.metadata, - entity_type=Table, - table_name=dest.name, - database_name=dest.database, - schema_name=dest.dbtschema, - service_name=dbservicename, - ), - ) + for unique_id in model.dependsOn or []: + parents = [ + d for d in dbt_parents if d.uniqueId == unique_id + ] + if parents: + from_entity = self.metadata.get_by_name( + entity=Table, + fqn=fqn.build( + metadata=self.metadata, + entity_type=Table, + table_name=parents[0].name, + database_name=parents[0].database, + schema_name=parents[0].dbtschema, + service_name=dbservicename, + ), + ) - if from_entity is None: - continue + if from_entity is None: + continue - yield Either( - right=AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference( - id=from_entity.id, - type="table", - ), - toEntity=EntityReference( - id=to_entity.id, - type="table", - ), - lineageDetails=lineage_details, + yield Either( + right=AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=from_entity.id, + type="table", + ), + toEntity=EntityReference( + id=to_entity.id, + type="table", + ), + lineageDetails=lineage_details, + ) ) ) - ) except Exception as exc: yield Either( diff --git a/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/models.py b/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/models.py index 1ba9f90e3e5..6273deacd92 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/models.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/models.py @@ -35,7 +35,7 @@ class DBTJob(BaseModel): class DBTJobList(BaseModel): - Jobs: Optional[List[DBTJob]] = Field(None, alias="data") + Jobs: Optional[List[DBTJob]] = Field([], alias="data") class DBTRun(BaseModel): @@ -50,7 +50,7 @@ class DBTRun(BaseModel): class DBTRunList(BaseModel): - Runs: Optional[List[DBTRun]] = Field(None, alias="data") + Runs: Optional[List[DBTRun]] = Field([], alias="data") class DBTSources(BaseModel): @@ -60,13 +60,13 @@ class DBTSources(BaseModel): class DBTModel(BaseModel): + uniqueId: Optional[str] = None name: Optional[str] = None - alias: Optional[str] = None dbtschema: Optional[str] = Field(None, alias="schema") database: Optional[str] = None - rawSql: Optional[str] = None - parentsSources: Optional[List[DBTSources]] = None + dependsOn: Optional[List[str]] = None class DBTModelList(BaseModel): - models: Optional[List[DBTModel]] = None + models: Optional[List[DBTModel]] = [] + seeds: Optional[List[DBTModel]] = [] diff --git a/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/queries.py b/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/queries.py index 42ae31a997e..1c3260a6ccb 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/queries.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/queries.py @@ -12,22 +12,33 @@ GraphQL Queries used during ingestion """ -DBT_QUERY = """ +DBT_GET_MODEL_DEPENDS_ON = """ query Query($jobId: BigInt!, $runId: BigInt) { job(id: $jobId, runId: $runId) { models { - name #destinationTable - alias + name database schema - rawSql - materializedType - parentsSources { - database - name - schema - sourceName #sourceTable - } + dependsOn + } + } +} +""" + +DBT_GET_MODELS_SEEDS = """ +query Query($jobId: BigInt!, $runId: BigInt) { + job(id: $jobId, runId: $runId) { + models { + uniqueId + name + database + schema + } + seeds { + uniqueId + name + schema + database } } }