Fixes: DBT Cloud lineage not showing (#17395)

This commit is contained in:
Suman Maharana 2024-08-12 23:48:40 +05:30 committed by GitHub
parent f7f30799d4
commit 1a2fd24c10
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 91 additions and 47 deletions

View File

@ -26,7 +26,10 @@ from metadata.ingestion.source.pipeline.dbtcloud.models import (
DBTRun,
DBTRunList,
)
from metadata.ingestion.source.pipeline.dbtcloud.queries import DBT_QUERY
from metadata.ingestion.source.pipeline.dbtcloud.queries import (
DBT_GET_MODEL_DEPENDS_ON,
DBT_GET_MODELS_SEEDS,
)
from metadata.utils.constants import AUTHORIZATION_HEADER
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ometa_logger
@ -102,7 +105,7 @@ class DBTCloudClient:
"""
try:
query_params = {
"query": DBT_QUERY,
"query": DBT_GET_MODEL_DEPENDS_ON,
"variables": {"jobId": job_id, "runId": run_id},
}
@ -116,3 +119,25 @@ class DBTCloudClient:
logger.debug(traceback.format_exc())
logger.warning(f"Unable to get model info :{exc}")
return None
def get_models_and_seeds_details(self, job_id: int, run_id: int):
"""
get model details for a job in dbt cloud for lineage
"""
try:
query_params = {
"query": DBT_GET_MODELS_SEEDS,
"variables": {"jobId": job_id, "runId": run_id},
}
result = self.graphql_client.post("", json=query_params)
if result.get("data") and result["data"].get("job"):
result = DBTModelList(**result["data"]["job"])
parents_list = result.models + result.seeds
return parents_list
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unable to get model info :{exc}")
return None

View File

@ -177,6 +177,10 @@ class DbtcloudSource(PipelineServiceSource):
job_id=pipeline_details.id, run_id=self.context.get().latest_run_id
)
dbt_parents = self.client.get_models_and_seeds_details(
job_id=pipeline_details.id, run_id=self.context.get().latest_run_id
)
for model in dbt_models or []:
for dbservicename in (
self.source_config.lineageInformation.dbServiceNames or []
@ -186,7 +190,7 @@ class DbtcloudSource(PipelineServiceSource):
fqn=fqn.build(
metadata=self.metadata,
entity_type=Table,
table_name=model.alias,
table_name=model.name,
database_name=model.database,
schema_name=model.dbtschema,
service_name=dbservicename,
@ -196,15 +200,19 @@ class DbtcloudSource(PipelineServiceSource):
if to_entity is None:
continue
for dest in model.parentsSources or []:
for unique_id in model.dependsOn or []:
parents = [
d for d in dbt_parents if d.uniqueId == unique_id
]
if parents:
from_entity = self.metadata.get_by_name(
entity=Table,
fqn=fqn.build(
metadata=self.metadata,
entity_type=Table,
table_name=dest.name,
database_name=dest.database,
schema_name=dest.dbtschema,
table_name=parents[0].name,
database_name=parents[0].database,
schema_name=parents[0].dbtschema,
service_name=dbservicename,
),
)

View File

@ -35,7 +35,7 @@ class DBTJob(BaseModel):
class DBTJobList(BaseModel):
Jobs: Optional[List[DBTJob]] = Field(None, alias="data")
Jobs: Optional[List[DBTJob]] = Field([], alias="data")
class DBTRun(BaseModel):
@ -50,7 +50,7 @@ class DBTRun(BaseModel):
class DBTRunList(BaseModel):
Runs: Optional[List[DBTRun]] = Field(None, alias="data")
Runs: Optional[List[DBTRun]] = Field([], alias="data")
class DBTSources(BaseModel):
@ -60,13 +60,13 @@ class DBTSources(BaseModel):
class DBTModel(BaseModel):
uniqueId: Optional[str] = None
name: Optional[str] = None
alias: Optional[str] = None
dbtschema: Optional[str] = Field(None, alias="schema")
database: Optional[str] = None
rawSql: Optional[str] = None
parentsSources: Optional[List[DBTSources]] = None
dependsOn: Optional[List[str]] = None
class DBTModelList(BaseModel):
models: Optional[List[DBTModel]] = None
models: Optional[List[DBTModel]] = []
seeds: Optional[List[DBTModel]] = []

View File

@ -12,22 +12,33 @@
GraphQL Queries used during ingestion
"""
DBT_QUERY = """
DBT_GET_MODEL_DEPENDS_ON = """
query Query($jobId: BigInt!, $runId: BigInt) {
job(id: $jobId, runId: $runId) {
models {
name #destinationTable
alias
database
schema
rawSql
materializedType
parentsSources {
database
name
database
schema
sourceName #sourceTable
}
dependsOn
}
}
}
"""
DBT_GET_MODELS_SEEDS = """
query Query($jobId: BigInt!, $runId: BigInt) {
job(id: $jobId, runId: $runId) {
models {
uniqueId
name
database
schema
}
seeds {
uniqueId
name
schema
database
}
}
}