Added dbt cloud multi projects and jobs filter (#18801)

* Added dbt cloud multi project and jobs filter

* added tests

* change to array type

* updated yaml config

* added migrations
This commit is contained in:
Suman Maharana 2024-11-28 16:10:34 +05:30 committed by GitHub
parent b9e49fc35d
commit 9a21e77e15
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 173 additions and 47 deletions

View File

@ -1747,3 +1747,29 @@ WHERE JSON_EXTRACT(json, '$.pipelineType') = 'metadata';
UPDATE ingestion_pipeline_entity UPDATE ingestion_pipeline_entity
SET json = JSON_REMOVE(json, '$.sourceConfig.config.processPiiSensitive', '$.sourceConfig.config.confidence', '$.sourceConfig.config.generateSampleData') SET json = JSON_REMOVE(json, '$.sourceConfig.config.processPiiSensitive', '$.sourceConfig.config.confidence', '$.sourceConfig.config.generateSampleData')
WHERE JSON_EXTRACT(json, '$.pipelineType') = 'profiler'; WHERE JSON_EXTRACT(json, '$.pipelineType') = 'profiler';
-- Rename 'jobId' to 'jobIds', set 'jobId' as type array in 'jobIds' , add 'projectIds' for dbt cloud
UPDATE pipeline_service_entity
SET json = JSON_SET(
JSON_REMOVE(
json,
'$.connection.config.jobId'
),
'$.connection.config.jobIds',
IF(
JSON_CONTAINS_PATH(json, 'one', '$.connection.config.jobIds'),
JSON_EXTRACT(json, '$.connection.config.jobIds'),
IF(
JSON_EXTRACT(json, '$.connection.config.jobId') IS NOT NULL,
JSON_ARRAY(JSON_UNQUOTE(JSON_EXTRACT(json, '$.connection.config.jobId'))),
JSON_ARRAY()
)
),
'$.connection.config.projectIds',
IF(
JSON_CONTAINS_PATH(json, 'one', '$.connection.config.projectIds'),
JSON_EXTRACT(json, '$.connection.config.projectIds'),
JSON_ARRAY()
)
)
WHERE serviceType = 'DBTCloud';

View File

@ -1734,3 +1734,25 @@ WHERE json #>> '{pipelineType}' = 'metadata';
UPDATE ingestion_pipeline_entity UPDATE ingestion_pipeline_entity
SET json = json::jsonb #- '{sourceConfig,config,processPiiSensitive}' #- '{sourceConfig,config,confidence}' #- '{sourceConfig,config,generateSampleData}' SET json = json::jsonb #- '{sourceConfig,config,processPiiSensitive}' #- '{sourceConfig,config,confidence}' #- '{sourceConfig,config,generateSampleData}'
WHERE json #>> '{pipelineType}' = 'profiler'; WHERE json #>> '{pipelineType}' = 'profiler';
-- set value of 'jobId' as an array into 'jobIds' for dbt cloud
UPDATE pipeline_service_entity
SET json = (case when json#>>'{connection, config, jobId}' IS NOT null
then
jsonb_set(json, '{connection, config, jobIds}', to_jsonb(ARRAY[json#>>'{connection, config, jobId}']), true)
else
jsonb_set(json, '{connection, config, jobIds}', '[]', true)
end
)
WHERE servicetype = 'DBTCloud';
-- remove 'jobId' after setting 'jobIds' for dbt cloud
UPDATE pipeline_service_entity
SET json = json::jsonb #- '{connection,config,jobId}'
WHERE json#>>'{connection, config, jobId}' IS NOT null
and servicetype = 'DBTCloud';
-- add 'projectIds' for dbt cloud
UPDATE pipeline_service_entity
SET json = jsonb_set(json, '{connection, config, projectIds}', '[]', true)
WHERE servicetype = 'DBTCloud';

View File

@ -7,7 +7,8 @@ source:
host: https://account_prefix.account_region.dbt.com host: https://account_prefix.account_region.dbt.com
discoveryAPI: https://metadata.cloud.getdbt.com/graphql discoveryAPI: https://metadata.cloud.getdbt.com/graphql
accountId: "numeric_account_id" accountId: "numeric_account_id"
# jobId: "numeric_job_id" # jobIds: ["job_id_1", "job_id_2", "job_id_3"]
# projectIds: ["project_id_1", "project_id_2", "project_id_3"]
token: auth_token token: auth_token
sourceConfig: sourceConfig:
config: config:

View File

@ -35,6 +35,7 @@ from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ometa_logger from metadata.utils.logger import ometa_logger
logger = ometa_logger() logger = ometa_logger()
API_VERSION = "api/v2"
class DBTCloudClient: class DBTCloudClient:
@ -44,9 +45,13 @@ class DBTCloudClient:
def __init__(self, config: DBTCloudConnection): def __init__(self, config: DBTCloudConnection):
self.config = config self.config = config
self.job_ids = self.config.jobIds
self.project_ids = self.config.projectIds
client_config: ClientConfig = ClientConfig( client_config: ClientConfig = ClientConfig(
base_url=clean_uri(self.config.host), base_url=clean_uri(self.config.host),
api_version="api/v2", api_version=API_VERSION,
auth_header=AUTHORIZATION_HEADER, auth_header=AUTHORIZATION_HEADER,
auth_token=lambda: (self.config.token.get_secret_value(), 0), auth_token=lambda: (self.config.token.get_secret_value(), 0),
allow_redirects=True, allow_redirects=True,
@ -63,27 +68,43 @@ class DBTCloudClient:
self.client = REST(client_config) self.client = REST(client_config)
self.graphql_client = REST(graphql_client_config) self.graphql_client = REST(graphql_client_config)
def test_get_jobs(self) -> Optional[List[DBTJob]]: def _get_jobs(
self, job_id: str = None, project_id: str = None
) -> Optional[List[DBTJob]]:
""" """
test fetch jobs for an account in dbt cloud fetch jobs for an account in dbt cloud
""" """
job_path = f"{self.config.jobId}/" if self.config.jobId else "" job_list = []
result = self.client.get(f"/accounts/{self.config.accountId}/jobs/{job_path}") try:
job_path = f"{job_id}/" if job_id else ""
project_path = f"?project_id={project_id}" if project_id else ""
result = self.client.get(
f"/accounts/{self.config.accountId}/jobs/{job_path}{project_path}"
)
job_list = ( job_list = (
[DBTJob.model_validate(result["data"])] [DBTJob.model_validate(result["data"])]
if self.config.jobId if job_id
else DBTJobList.model_validate(result).Jobs else DBTJobList.model_validate(result).Jobs
) )
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Failed to get job info for project_id: `{project_id}` or job_id: `{job_id}` : {exc}"
)
return job_list return job_list
def test_get_runs(self, job_id: int) -> Optional[List[DBTRun]]: def test_get_jobs(self) -> List[DBTJob]:
"""
test fetch jobs for an account in dbt cloud
"""
job_list = self.client.get(f"/accounts/{self.config.accountId}/jobs/")
return DBTJobList.model_validate(job_list).Jobs
def test_get_runs(self) -> List[DBTRun]:
""" """
test fetch runs for a job in dbt cloud test fetch runs for a job in dbt cloud
""" """
result = self.client.get( result = self.client.get(f"/accounts/{self.config.accountId}/runs/")
f"/accounts/{self.config.accountId}/runs/",
data={"job_definition_id": job_id},
)
run_list = DBTRunList.model_validate(result).Runs run_list = DBTRunList.model_validate(result).Runs
return run_list return run_list
@ -92,17 +113,29 @@ class DBTCloudClient:
list jobs for an account in dbt cloud list jobs for an account in dbt cloud
""" """
try: try:
job_path = f"{self.config.jobId}/" if self.config.jobId else "" jobs = []
result = self.client.get( # case when job_ids are specified and project_ids are not
f"/accounts/{self.config.accountId}/jobs/{job_path}" if self.job_ids and not self.project_ids:
for job_id in self.job_ids:
jobs.extend(self._get_jobs(job_id=job_id))
# case when project_ids are specified or both are specified
elif self.project_ids:
for project_id in self.project_ids:
results = self._get_jobs(project_id=project_id)
if self.job_ids:
jobs.extend(
[
result
for result in results
if str(result.id) in self.job_ids
]
) )
if result: else:
job_list = ( jobs.extend(results)
[DBTJob.model_validate(result.get("data"))] else:
if self.config.jobId results = self._get_jobs()
else DBTJobList.model_validate(result).Jobs jobs.extend(results)
) return jobs
return job_list
except Exception as exc: except Exception as exc:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
logger.error(f"Unable to get job info :{exc}") logger.error(f"Unable to get job info :{exc}")
@ -141,6 +174,9 @@ class DBTCloudClient:
if result.get("data") and result["data"].get("job"): if result.get("data") and result["data"].get("job"):
model_list = DBTModelList.model_validate(result["data"]["job"]).models model_list = DBTModelList.model_validate(result["data"]["job"]).models
logger.debug(
f"Successfully fetched models from dbt for job_id:{job_id} run_id:{run_id}: {model_list}"
)
return model_list return model_list
except Exception as exc: except Exception as exc:
@ -150,7 +186,7 @@ class DBTCloudClient:
def get_models_and_seeds_details(self, job_id: int, run_id: int): def get_models_and_seeds_details(self, job_id: int, run_id: int):
""" """
get model details for a job in dbt cloud for lineage get parent model details for a job in dbt cloud for lineage
""" """
try: try:
query_params = { query_params = {
@ -163,9 +199,12 @@ class DBTCloudClient:
if result.get("data") and result["data"].get("job"): if result.get("data") and result["data"].get("job"):
result = DBTModelList.model_validate(result["data"]["job"]) result = DBTModelList.model_validate(result["data"]["job"])
parents_list = result.models + result.seeds parents_list = result.models + result.seeds
logger.debug(
f"Successfully fetched parent models from dbt for job_id:{job_id} run_id:{run_id}: {parents_list}"
)
return parents_list return parents_list
except Exception as exc: except Exception as exc:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
logger.warning(f"Unable to get model info :{exc}") logger.warning(f"Unable to get parents model info :{exc}")
return None return None

View File

@ -50,11 +50,9 @@ def test_connection(
of a metadata workflow or during an Automation Workflow of a metadata workflow or during an Automation Workflow
""" """
job_id = int(service_connection.jobId) if service_connection.jobId else 0
test_fn = { test_fn = {
"GetJobs": client.test_get_jobs, "GetJobs": client.test_get_jobs,
"GetRuns": partial(client.test_get_runs, job_id=job_id), "GetRuns": partial(client.test_get_runs),
} }
return test_connection_steps( return test_connection_steps(

View File

@ -35,7 +35,7 @@ class DBTJob(BaseModel):
class DBTJobList(BaseModel): class DBTJobList(BaseModel):
Jobs: Optional[List[DBTJob]] = Field([], alias="data") Jobs: List[DBTJob] = Field(alias="data")
class DBTRun(BaseModel): class DBTRun(BaseModel):

View File

@ -386,7 +386,8 @@ mock_dbtcloud_config = {
"host": "https://abc12.us1.dbt.com", "host": "https://abc12.us1.dbt.com",
"discoveryAPI": "https://metadata.cloud.getdbt.com/graphql", "discoveryAPI": "https://metadata.cloud.getdbt.com/graphql",
"accountId": "70403103922125", "accountId": "70403103922125",
"jobId": "70403103922125", "jobIds": ["70403103922125", "70403103922126"],
"projectIds": ["70403103922127", "70403103922128"],
"token": "dbt_token", "token": "dbt_token",
} }
}, },
@ -510,6 +511,10 @@ MOCK_PIPELINE = Pipeline(
sourceHash=None, sourceHash=None,
) )
EXPECTED_JOB_FILTERS = ["70403103922125", "70403103922126"]
EXPECTED_PROJECT_FILTERS = ["70403103922127", "70403103922128"]
EXPECTED_PIPELINE_NAME = str(MOCK_JOB_RESULT["data"][0]["name"]) EXPECTED_PIPELINE_NAME = str(MOCK_JOB_RESULT["data"][0]["name"])
@ -547,6 +552,10 @@ class DBTCloudUnitTest(TestCase):
== EXPECTED_PIPELINE_NAME == EXPECTED_PIPELINE_NAME
) )
def test_filters_to_list(self):
assert self.dbtcloud.client.job_ids == EXPECTED_JOB_FILTERS
assert self.dbtcloud.client.project_ids == EXPECTED_PROJECT_FILTERS
def test_pipelines(self): def test_pipelines(self):
pipeline = list(self.dbtcloud.yield_pipeline(EXPECTED_JOB_DETAILS))[0].right pipeline = list(self.dbtcloud.yield_pipeline(EXPECTED_JOB_DETAILS))[0].right
assert pipeline == EXPECTED_CREATED_PIPELINES assert pipeline == EXPECTED_CREATED_PIPELINES

View File

@ -69,7 +69,11 @@ To know more about permissions required refer [here](https://docs.getdbt.com/doc
- **Account Id** : The Account ID of your DBT cloud Project. Go to your dbt cloud account settings to know your Account Id. This will be a numeric value but in openmetadata we parse it as a string. - **Account Id** : The Account ID of your DBT cloud Project. Go to your dbt cloud account settings to know your Account Id. This will be a numeric value but in openmetadata we parse it as a string.
- **Job Id** : Optional. The Job ID of your DBT cloud Job in your Project to fetch metadata for. Look for the segment after "jobs" in the URL. For instance, in a URL like `https://cloud.getdbt.com/accounts/123/projects/87477/jobs/73659994`, the job ID is `73659994`. This will be a numeric value but in openmetadata we parse it as a string. If not passed all Jobs under the Account id will be ingested. - **Job Ids** : Optional. Job IDs of your DBT cloud Jobs in your Project to fetch metadata for. Look for the segment after "jobs" in the URL. For instance, in a URL like `https://cloud.getdbt.com/accounts/123/projects/87477/jobs/73659994`, the job ID is `73659994`. This will be a numeric value but in openmetadata we parse it as a string. If not passed all Jobs under the Account id will be ingested.
- **Project Ids** : Optional. Project IDs of your DBT cloud Account to fetch metadata for. Look for the segment after "projects" in the URL. For instance, in a URL like `https://cloud.getdbt.com/accounts/123/projects/87477/jobs/73659994`, the job ID is `87477`. This will be a numeric value but in openmetadata we parse it as a string. If not passed all Projects under the Account id will be ingested.
Note that if both `Job Ids` and `Project Ids` are passed then it will filter out the jobs from the passed projects. any `Job Ids` not belonging to the `Project Ids` will also be filtered out.
- **Token** : The Authentication Token of your DBT cloud API Account. To get your access token you can follow the docs [here](https://docs.getdbt.com/docs/dbt-cloud-apis/authentication). - **Token** : The Authentication Token of your DBT cloud API Account. To get your access token you can follow the docs [here](https://docs.getdbt.com/docs/dbt-cloud-apis/authentication).
Make sure you have the necessary permissions on the token to run graphql queries and get job and run details. Make sure you have the necessary permissions on the token to run graphql queries and get job and run details.

View File

@ -70,12 +70,20 @@ This is a sample config for dbt Cloud:
{% codeInfo srNumber=4 %} {% codeInfo srNumber=4 %}
**jobId**: Optional. The Job ID of your DBT cloud Job in your Project to fetch metadata for. Look for the segment after "jobs" in the URL. For instance, in a URL like `https://cloud.getdbt.com/accounts/123/projects/87477/jobs/73659994`, the job ID is `73659994`. This will be a numeric value but in openmetadata we parse it as a string. If not passed all Jobs under the Account id will be ingested. **jobIds**: Optional. Job IDs of your DBT cloud Jobs in your Project to fetch metadata for. Look for the segment after "jobs" in the URL. For instance, in a URL like `https://cloud.getdbt.com/accounts/123/projects/87477/jobs/73659994`, the job ID is `73659994`. This will be a numeric value but in openmetadata we parse it as a string. If not passed all Jobs under the Account id will be ingested.
{% /codeInfo %} {% /codeInfo %}
{% codeInfo srNumber=5 %} {% codeInfo srNumber=5 %}
**projectIds**: Optional. Project IDs of your DBT cloud Account to fetch metadata for. Look for the segment after "projects" in the URL. For instance, in a URL like `https://cloud.getdbt.com/accounts/123/projects/87477/jobs/73659994`, the job ID is `87477`. This will be a numeric value but in openmetadata we parse it as a string. If not passed all Projects under the Account id will be ingested.
Note that if both `Job Ids` and `Project Ids` are passed then it will filter out the jobs from the passed projects. any `Job Ids` not belonging to the `Project Ids` will also be filtered out.
{% /codeInfo %}
{% codeInfo srNumber=6 %}
**token**: The Authentication Token of your DBT cloud API Account. To get your access token you can follow the docs [here](https://docs.getdbt.com/docs/dbt-cloud-apis/authentication). **token**: The Authentication Token of your DBT cloud API Account. To get your access token you can follow the docs [here](https://docs.getdbt.com/docs/dbt-cloud-apis/authentication).
Make sure you have the necessary permissions on the token to run graphql queries and get job and run details. Make sure you have the necessary permissions on the token to run graphql queries and get job and run details.
@ -111,9 +119,12 @@ source:
accountId: "numeric_account_id" accountId: "numeric_account_id"
``` ```
```yaml {% srNumber=4 %} ```yaml {% srNumber=4 %}
# jobId: "numeric_job_id" # jobIds: ["job_id_1", "job_id_2", "job_id_3"]
``` ```
```yaml {% srNumber=5 %} ```yaml {% srNumber=5 %}
# projectIds: ["project_id_1", "project_id_2", "project_id_3"]
```
```yaml {% srNumber=6 %}
token: auth_token token: auth_token
``` ```

View File

@ -39,11 +39,21 @@
"description": "ID of your DBT cloud account", "description": "ID of your DBT cloud account",
"type": "string" "type": "string"
}, },
"jobId": { "jobIds": {
"title": "Job Id", "title": "Job Ids",
"description": "ID of your DBT cloud job", "description": "List of IDs of your DBT cloud jobs seperated by comma `,`",
"type": "string", "type": "array",
"default": null "items": {
"type": "string"
}
},
"projectIds": {
"title": "Project Ids",
"description": "List of IDs of your DBT cloud projects seperated by comma `,`",
"type": "array",
"items": {
"type": "string"
}
}, },
"token": { "token": {
"title": "Token", "title": "Token",
@ -55,4 +65,3 @@
"additionalProperties": false, "additionalProperties": false,
"required": ["host", "discoveryAPI", "accountId", "token"] "required": ["host", "discoveryAPI", "accountId", "token"]
} }

View File

@ -26,8 +26,15 @@ The Account ID of your DBT cloud Project. Go to your dbt cloud account settings
$$ $$
$$section $$section
### Job Id $(id="jobId") ### Job Ids $(id="jobIds")
The Job ID of your DBT cloud Job in your Project to fetch metadata for. Look for the segment after "jobs" in the URL. For instance, in a URL like `https://cloud.getdbt.com/accounts/123/projects/87477/jobs/73659994`, the job ID is `73659994`. This will be a numeric value but in openmetadata we parse it as a string. If not passed all Jobs under the Account id will be ingested. `Optional` Job IDs of your DBT cloud Jobs in your Project to fetch metadata for. Look for the segment after "jobs" in the URL. For instance, in a URL like `https://cloud.getdbt.com/accounts/123/projects/87477/jobs/73659994`, the job ID is `73659994`. This will be a numeric value but in openmetadata we parse it as a string. If not passed all Jobs under the Account Id will be ingested. `Optional`
$$
$$section
### Project Ids $(id="projectIds")
Project IDs of your DBT cloud Account to fetch metadata for. Look for the segment after "projects" in the URL. For instance, in a URL like `https://cloud.getdbt.com/accounts/123/projects/87477/jobs/73659994`, the job ID is `87477`. This will be a numeric value but in openmetadata we parse it as a string. If not passed all Projects under the Account Id will be ingested. `Optional`
Note that if both `Job Ids` and `Project Ids` are passed then it will filter out the jobs from the passed projects. any `Job Ids` not belonging to the `Project Ids` will also be filtered out.
$$ $$
$$section $$section