Fix: dbt cloud latest run execution (#20573)

* Fix: dbt cloud latest run execution

* update latest run id

* set default to 100
This commit is contained in:
Suman Maharana 2025-04-03 11:13:17 +05:30 committed by SumanMaharana
parent 7ec8552742
commit 18bcf9eaaf
6 changed files with 87 additions and 16 deletions

View File

@ -75,17 +75,37 @@ class DBTCloudClient:
fetch jobs for an account in dbt cloud
"""
job_list = []
# we will get 100 jobs at a time
query_params = {"offset": 0, "limit": 100}
try:
job_path = f"{job_id}/" if job_id else ""
project_path = f"?project_id={project_id}" if project_id else ""
result = self.client.get(
f"/accounts/{self.config.accountId}/jobs/{job_path}{project_path}"
)
job_list = (
[DBTJob.model_validate(result["data"])]
if job_id
else DBTJobList.model_validate(result).Jobs
f"/accounts/{self.config.accountId}/jobs/{job_path}{project_path}",
data=query_params,
)
if job_id:
job_list = [DBTJob.model_validate(result["data"])]
else:
job_list_response = DBTJobList.model_validate(result)
job_list = job_list_response.Jobs
while job_list_response.extra and job_list_response.extra.pagination:
total_count = job_list_response.extra.pagination.total_count
current_count = job_list_response.extra.pagination.count
if current_count >= total_count:
break
query_params["offset"] += query_params["limit"]
result = self.client.get(
f"/accounts/{self.config.accountId}/jobs/{job_path}{project_path}",
data=query_params,
)
job_list_response = DBTJobList.model_validate(result)
job_list.extend(job_list_response.Jobs)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
@ -147,13 +167,42 @@ class DBTCloudClient:
list runs for a job in dbt cloud
"""
try:
number_of_runs = self.config.numberOfRuns
runs = []
# fetch at most 100 runs per request, ordered by created_at in descending order
query_params = {
"job_definition_id": job_id,
"offset": 0,
"limit": min(100, number_of_runs) if number_of_runs else 100,
"order_by": "-created_at",
}
result = self.client.get(
f"/accounts/{self.config.accountId}/runs/",
data={"job_definition_id": job_id},
f"/accounts/{self.config.accountId}/runs/", data=query_params
)
if result:
run_list = DBTRunList.model_validate(result).Runs
return run_list
run_list_response = DBTRunList.model_validate(result)
runs.extend(run_list_response.Runs)
while (
(number_of_runs is None or len(runs) < number_of_runs)
and run_list_response.extra
and run_list_response.extra.pagination
):
total_count = run_list_response.extra.pagination.total_count
current_count = run_list_response.extra.pagination.count
if current_count >= total_count:
break
query_params["offset"] += query_params["limit"]
result = self.client.get(
f"/accounts/{self.config.accountId}/runs/",
data=query_params,
)
run_list_response = DBTRunList.model_validate(result)
runs.extend(run_list_response.Runs)
return runs[:number_of_runs] if number_of_runs is not None else runs
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unable to get run info :{exc}")

View File

@ -105,7 +105,7 @@ class DbtcloudSource(PipelineServiceSource):
)
task_list.append(task)
self.context.get().latest_run_id = (
task_list[-1].name if task_list else None
task_list[0].name if task_list else None
)
return task_list or None
except Exception as exc:

View File

@ -34,8 +34,18 @@ class DBTJob(BaseModel):
project_id: int
class Pagination(BaseModel):
count: int
total_count: int
class Extra(BaseModel):
pagination: Optional[Pagination] = None
class DBTJobList(BaseModel):
Jobs: List[DBTJob] = Field(alias="data")
extra: Optional[Extra] = None
class DBTRun(BaseModel):
@ -51,12 +61,14 @@ class DBTRun(BaseModel):
class DBTRunList(BaseModel):
Runs: Optional[List[DBTRun]] = Field([], alias="data")
extra: Optional[Extra] = None
class DBTSources(BaseModel):
name: Optional[str] = None
dbtschema: Optional[str] = Field(None, alias="schema")
database: Optional[str] = None
extra: Optional[Extra] = None
class DBTModel(BaseModel):
@ -70,3 +82,4 @@ class DBTModel(BaseModel):
class DBTModelList(BaseModel):
models: Optional[List[DBTModel]] = []
seeds: Optional[List[DBTModel]] = []
extra: Optional[Extra] = None

View File

@ -388,6 +388,7 @@ mock_dbtcloud_config = {
"accountId": "70403103922125",
"jobIds": ["70403103922125", "70403103922126"],
"projectIds": ["70403103922127", "70403103922128"],
"numberOfRuns": 10,
"token": "dbt_token",
}
},

View File

@ -55,6 +55,12 @@
"type": "string"
}
},
"numberOfRuns": {
"title": "Number of Runs",
"description": "Number of runs to fetch from DBT cloud",
"type": "integer",
"default": 100
},
"token": {
"title": "Token",
"description": "Generated Token to connect to DBTCloud.",

View File

@ -1,5 +1,5 @@
/*
* Copyright 2024 Collate.
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@ -10,9 +10,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
/**
* DBTCloud Connection Config
*/
export interface DbtCloudConnection {
@ -32,6 +30,10 @@ export interface DbtCloudConnection {
* List of IDs of your DBT cloud jobs separated by comma `,`
*/
jobIds?: string[];
/**
* Number of runs to fetch from DBT cloud
*/
numberOfRuns?: number;
/**
* List of IDs of your DBT cloud projects separated by comma `,`
*/