From 9a21e77e1587380bd828c5ddd03f6ee337fd8d6a Mon Sep 17 00:00:00 2001 From: Suman Maharana Date: Thu, 28 Nov 2024 16:10:34 +0530 Subject: [PATCH] Added dbt cloud multi projects and jobs filter (#18801) * Added dbt cloud multi project and jobs filter * added tests * change to array type * updated yaml config * added migrations --- .../native/1.6.0/mysql/schemaChanges.sql | 26 +++++ .../native/1.6.0/postgres/schemaChanges.sql | 24 ++++- .../metadata/examples/workflows/dbtcloud.yaml | 3 +- .../source/pipeline/dbtcloud/client.py | 95 +++++++++++++------ .../source/pipeline/dbtcloud/connection.py | 4 +- .../source/pipeline/dbtcloud/models.py | 2 +- .../unit/topology/pipeline/test_dbtcloud.py | 11 ++- .../connectors/pipeline/dbtcloud/index.md | 6 +- .../connectors/pipeline/dbtcloud/yaml.md | 15 ++- .../pipeline/dbtCloudConnection.json | 23 +++-- .../public/locales/en-US/Pipeline/DBTCloud.md | 11 ++- 11 files changed, 173 insertions(+), 47 deletions(-) diff --git a/bootstrap/sql/migrations/native/1.6.0/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.6.0/mysql/schemaChanges.sql index b033d86c816..9203128c0b2 100644 --- a/bootstrap/sql/migrations/native/1.6.0/mysql/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.6.0/mysql/schemaChanges.sql @@ -1747,3 +1747,29 @@ WHERE JSON_EXTRACT(json, '$.pipelineType') = 'metadata'; UPDATE ingestion_pipeline_entity SET json = JSON_REMOVE(json, '$.sourceConfig.config.processPiiSensitive', '$.sourceConfig.config.confidence', '$.sourceConfig.config.generateSampleData') WHERE JSON_EXTRACT(json, '$.pipelineType') = 'profiler'; + +-- Rename 'jobId' to 'jobIds', set 'jobId' as type array in 'jobIds' , add 'projectIds' for dbt cloud +UPDATE pipeline_service_entity +SET json = JSON_SET( + JSON_REMOVE( + json, + '$.connection.config.jobId' + ), + '$.connection.config.jobIds', + IF( + JSON_CONTAINS_PATH(json, 'one', '$.connection.config.jobIds'), + JSON_EXTRACT(json, '$.connection.config.jobIds'), + IF( + JSON_EXTRACT(json, '$.connection.config.jobId') IS NOT NULL, + JSON_ARRAY(JSON_UNQUOTE(JSON_EXTRACT(json, '$.connection.config.jobId'))), + JSON_ARRAY() + ) + ), + '$.connection.config.projectIds', + IF( + JSON_CONTAINS_PATH(json, 'one', '$.connection.config.projectIds'), + JSON_EXTRACT(json, '$.connection.config.projectIds'), + JSON_ARRAY() + ) +) +WHERE serviceType = 'DBTCloud'; diff --git a/bootstrap/sql/migrations/native/1.6.0/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.6.0/postgres/schemaChanges.sql index c5711a31741..38fb01676a2 100644 --- a/bootstrap/sql/migrations/native/1.6.0/postgres/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.6.0/postgres/schemaChanges.sql @@ -1733,4 +1733,26 @@ WHERE json #>> '{pipelineType}' = 'metadata'; -- classification and sampling configs from the profiler pipelines UPDATE ingestion_pipeline_entity SET json = json::jsonb #- '{sourceConfig,config,processPiiSensitive}' #- '{sourceConfig,config,confidence}' #- '{sourceConfig,config,generateSampleData}' -WHERE json #>> '{pipelineType}' = 'profiler'; \ No newline at end of file +WHERE json #>> '{pipelineType}' = 'profiler'; + +-- set value of 'jobId' as an array into 'jobIds' for dbt cloud +UPDATE pipeline_service_entity +SET json = (case when json#>>'{connection, config, jobId}' IS NOT null +then + jsonb_set(json, '{connection, config, jobIds}', to_jsonb(ARRAY[json#>>'{connection, config, jobId}']), true) +else + jsonb_set(json, '{connection, config, jobIds}', '[]', true) +end +) +WHERE servicetype = 'DBTCloud'; + +-- remove 'jobId' after setting 'jobIds' for dbt cloud +UPDATE pipeline_service_entity +SET json = json::jsonb #- '{connection,config,jobId}' +WHERE json#>>'{connection, config, jobId}' IS NOT null +and servicetype = 'DBTCloud'; + +-- add 'projectIds' for dbt cloud +UPDATE pipeline_service_entity +SET json = jsonb_set(json, '{connection, config, projectIds}', '[]', true) +WHERE servicetype = 'DBTCloud'; diff --git a/ingestion/src/metadata/examples/workflows/dbtcloud.yaml b/ingestion/src/metadata/examples/workflows/dbtcloud.yaml index 9ac76d2b980..5bc25d6d59a 100644 --- a/ingestion/src/metadata/examples/workflows/dbtcloud.yaml +++ b/ingestion/src/metadata/examples/workflows/dbtcloud.yaml @@ -7,7 +7,8 @@ source: host: https://account_prefix.account_region.dbt.com discoveryAPI: https://metadata.cloud.getdbt.com/graphql accountId: "numeric_account_id" - # jobId: "numeric_job_id" + # jobIds: ["job_id_1", "job_id_2", "job_id_3"] + # projectIds: ["project_id_1", "project_id_2", "project_id_3"] token: auth_token sourceConfig: config: diff --git a/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/client.py b/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/client.py index 2cd1d21279d..3d1ddf9b270 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/client.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/client.py @@ -35,6 +35,7 @@ from metadata.utils.helpers import clean_uri from metadata.utils.logger import ometa_logger logger = ometa_logger() +API_VERSION = "api/v2" class DBTCloudClient: @@ -44,9 +45,13 @@ class DBTCloudClient: def __init__(self, config: DBTCloudConnection): self.config = config + + self.job_ids = self.config.jobIds + self.project_ids = self.config.projectIds + client_config: ClientConfig = ClientConfig( base_url=clean_uri(self.config.host), - api_version="api/v2", + api_version=API_VERSION, auth_header=AUTHORIZATION_HEADER, auth_token=lambda: (self.config.token.get_secret_value(), 0), allow_redirects=True, @@ -63,27 +68,43 @@ class DBTCloudClient: self.client = REST(client_config) self.graphql_client = REST(graphql_client_config) - def test_get_jobs(self) -> Optional[List[DBTJob]]: + def _get_jobs( + self, job_id: str = None, project_id: str = None + ) -> Optional[List[DBTJob]]: + """ + fetch jobs for an account in dbt cloud + """ + job_list = [] + try: + job_path = f"{job_id}/" if job_id else "" + project_path = f"?project_id={project_id}" if project_id else "" + result = self.client.get( + f"/accounts/{self.config.accountId}/jobs/{job_path}{project_path}" + ) + job_list = ( + [DBTJob.model_validate(result["data"])] + if job_id + else DBTJobList.model_validate(result).Jobs + ) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error( + f"Failed to get job info for project_id: `{project_id}` or job_id: `{job_id}` : {exc}" + ) + return job_list + + def test_get_jobs(self) -> List[DBTJob]: """ test fetch jobs for an account in dbt cloud """ - job_path = f"{self.config.jobId}/" if self.config.jobId else "" - result = self.client.get(f"/accounts/{self.config.accountId}/jobs/{job_path}") - job_list = ( - [DBTJob.model_validate(result["data"])] - if self.config.jobId - else DBTJobList.model_validate(result).Jobs - ) - return job_list + job_list = self.client.get(f"/accounts/{self.config.accountId}/jobs/") + return DBTJobList.model_validate(job_list).Jobs - def test_get_runs(self, job_id: int) -> Optional[List[DBTRun]]: + def test_get_runs(self) -> List[DBTRun]: """ test fetch runs for a job in dbt cloud """ - result = self.client.get( - f"/accounts/{self.config.accountId}/runs/", - data={"job_definition_id": job_id}, - ) + result = self.client.get(f"/accounts/{self.config.accountId}/runs/") run_list = DBTRunList.model_validate(result).Runs return run_list @@ -92,17 +113,29 @@ class DBTCloudClient: list jobs for an account in dbt cloud """ try: - job_path = f"{self.config.jobId}/" if self.config.jobId else "" - result = self.client.get( - f"/accounts/{self.config.accountId}/jobs/{job_path}" - ) - if result: - job_list = ( - [DBTJob.model_validate(result.get("data"))] - if self.config.jobId - else DBTJobList.model_validate(result).Jobs - ) - return job_list + jobs = [] + # case when job_ids are specified and project_ids are not + if self.job_ids and not self.project_ids: + for job_id in self.job_ids: + jobs.extend(self._get_jobs(job_id=job_id)) + # case when project_ids are specified or both are specified + elif self.project_ids: + for project_id in self.project_ids: + results = self._get_jobs(project_id=project_id) + if self.job_ids: + jobs.extend( + [ + result + for result in results + if str(result.id) in self.job_ids + ] + ) + else: + jobs.extend(results) + else: + results = self._get_jobs() + jobs.extend(results) + return jobs except Exception as exc: logger.debug(traceback.format_exc()) logger.error(f"Unable to get job info :{exc}") @@ -141,6 +174,9 @@ class DBTCloudClient: if result.get("data") and result["data"].get("job"): model_list = DBTModelList.model_validate(result["data"]["job"]).models + logger.debug( + f"Successfully fetched models from dbt for job_id:{job_id} run_id:{run_id}: {model_list}" + ) return model_list except Exception as exc: @@ -150,7 +186,7 @@ class DBTCloudClient: def get_models_and_seeds_details(self, job_id: int, run_id: int): """ - get model details for a job in dbt cloud for lineage + get parent model details for a job in dbt cloud for lineage """ try: query_params = { @@ -163,9 +199,12 @@ class DBTCloudClient: if result.get("data") and result["data"].get("job"): result = DBTModelList.model_validate(result["data"]["job"]) parents_list = result.models + result.seeds + logger.debug( + f"Successfully fetched parent models from dbt for job_id:{job_id} run_id:{run_id}: {parents_list}" + ) return parents_list except Exception as exc: logger.debug(traceback.format_exc()) - logger.warning(f"Unable to get model info :{exc}") + logger.warning(f"Unable to get parents model info :{exc}") return None diff --git a/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/connection.py b/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/connection.py index 40d73ce7d78..ee54d3444d3 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/connection.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/connection.py @@ -50,11 +50,9 @@ def test_connection( of a metadata workflow or during an Automation Workflow """ - job_id = int(service_connection.jobId) if service_connection.jobId else 0 - test_fn = { "GetJobs": client.test_get_jobs, - "GetRuns": partial(client.test_get_runs, job_id=job_id), + "GetRuns": partial(client.test_get_runs), } return test_connection_steps( diff --git a/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/models.py b/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/models.py index 6273deacd92..b9deebc2d5c 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/models.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/models.py @@ -35,7 +35,7 @@ class DBTJob(BaseModel): class DBTJobList(BaseModel): - Jobs: Optional[List[DBTJob]] = Field([], alias="data") + Jobs: List[DBTJob] = Field(alias="data") class DBTRun(BaseModel): diff --git a/ingestion/tests/unit/topology/pipeline/test_dbtcloud.py b/ingestion/tests/unit/topology/pipeline/test_dbtcloud.py index e8a99344fbe..e1cfead56a6 100644 --- a/ingestion/tests/unit/topology/pipeline/test_dbtcloud.py +++ b/ingestion/tests/unit/topology/pipeline/test_dbtcloud.py @@ -386,7 +386,8 @@ mock_dbtcloud_config = { "host": "https://abc12.us1.dbt.com", "discoveryAPI": "https://metadata.cloud.getdbt.com/graphql", "accountId": "70403103922125", - "jobId": "70403103922125", + "jobIds": ["70403103922125", "70403103922126"], + "projectIds": ["70403103922127", "70403103922128"], "token": "dbt_token", } }, @@ -510,6 +511,10 @@ MOCK_PIPELINE = Pipeline( sourceHash=None, ) +EXPECTED_JOB_FILTERS = ["70403103922125", "70403103922126"] + +EXPECTED_PROJECT_FILTERS = ["70403103922127", "70403103922128"] + EXPECTED_PIPELINE_NAME = str(MOCK_JOB_RESULT["data"][0]["name"]) @@ -547,6 +552,10 @@ class DBTCloudUnitTest(TestCase): == EXPECTED_PIPELINE_NAME ) + def test_filters_to_list(self): + assert self.dbtcloud.client.job_ids == EXPECTED_JOB_FILTERS + assert self.dbtcloud.client.project_ids == EXPECTED_PROJECT_FILTERS + def test_pipelines(self): pipeline = list(self.dbtcloud.yield_pipeline(EXPECTED_JOB_DETAILS))[0].right assert pipeline == EXPECTED_CREATED_PIPELINES diff --git a/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/pipeline/dbtcloud/index.md b/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/pipeline/dbtcloud/index.md index a332fe24f4e..fa40313f8d3 100644 --- a/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/pipeline/dbtcloud/index.md +++ b/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/pipeline/dbtcloud/index.md @@ -69,7 +69,11 @@ To know more about permissions required refer [here](https://docs.getdbt.com/doc - **Account Id** : The Account ID of your DBT cloud Project. Go to your dbt cloud account settings to know your Account Id. This will be a numeric value but in openmetadata we parse it as a string. -- **Job Id** : Optional. The Job ID of your DBT cloud Job in your Project to fetch metadata for. Look for the segment after "jobs" in the URL. For instance, in a URL like `https://cloud.getdbt.com/accounts/123/projects/87477/jobs/73659994`, the job ID is `73659994`. This will be a numeric value but in openmetadata we parse it as a string. If not passed all Jobs under the Account id will be ingested. +- **Job Ids** : Optional. Job IDs of your DBT cloud Jobs in your Project to fetch metadata for. Look for the segment after "jobs" in the URL. For instance, in a URL like `https://cloud.getdbt.com/accounts/123/projects/87477/jobs/73659994`, the job ID is `73659994`. This will be a numeric value but in openmetadata we parse it as a string. If not passed all Jobs under the Account id will be ingested. + +- **Project Ids** : Optional. Project IDs of your DBT cloud Account to fetch metadata for. Look for the segment after "projects" in the URL. For instance, in a URL like `https://cloud.getdbt.com/accounts/123/projects/87477/jobs/73659994`, the job ID is `87477`. This will be a numeric value but in openmetadata we parse it as a string. If not passed all Projects under the Account id will be ingested. + +Note that if both `Job Ids` and `Project Ids` are passed then it will filter out the jobs from the passed projects. any `Job Ids` not belonging to the `Project Ids` will also be filtered out. - **Token** : The Authentication Token of your DBT cloud API Account. To get your access token you can follow the docs [here](https://docs.getdbt.com/docs/dbt-cloud-apis/authentication). Make sure you have the necessary permissions on the token to run graphql queries and get job and run details. diff --git a/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/pipeline/dbtcloud/yaml.md b/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/pipeline/dbtcloud/yaml.md index d4846048abd..1a9d52f0429 100644 --- a/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/pipeline/dbtcloud/yaml.md +++ b/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/pipeline/dbtcloud/yaml.md @@ -70,12 +70,20 @@ This is a sample config for dbt Cloud: {% codeInfo srNumber=4 %} -**jobId**: Optional. The Job ID of your DBT cloud Job in your Project to fetch metadata for. Look for the segment after "jobs" in the URL. For instance, in a URL like `https://cloud.getdbt.com/accounts/123/projects/87477/jobs/73659994`, the job ID is `73659994`. This will be a numeric value but in openmetadata we parse it as a string. If not passed all Jobs under the Account id will be ingested. +**jobIds**: Optional. Job IDs of your DBT cloud Jobs in your Project to fetch metadata for. Look for the segment after "jobs" in the URL. For instance, in a URL like `https://cloud.getdbt.com/accounts/123/projects/87477/jobs/73659994`, the job ID is `73659994`. This will be a numeric value but in openmetadata we parse it as a string. If not passed all Jobs under the Account id will be ingested. {% /codeInfo %} {% codeInfo srNumber=5 %} +**projectIds**: Optional. Project IDs of your DBT cloud Account to fetch metadata for. Look for the segment after "projects" in the URL. For instance, in a URL like `https://cloud.getdbt.com/accounts/123/projects/87477/jobs/73659994`, the job ID is `87477`. This will be a numeric value but in openmetadata we parse it as a string. If not passed all Projects under the Account id will be ingested. + +Note that if both `Job Ids` and `Project Ids` are passed then it will filter out the jobs from the passed projects. any `Job Ids` not belonging to the `Project Ids` will also be filtered out. + +{% /codeInfo %} + +{% codeInfo srNumber=6 %} + **token**: The Authentication Token of your DBT cloud API Account. To get your access token you can follow the docs [here](https://docs.getdbt.com/docs/dbt-cloud-apis/authentication). Make sure you have the necessary permissions on the token to run graphql queries and get job and run details. @@ -111,9 +119,12 @@ source: accountId: "numeric_account_id" ``` ```yaml {% srNumber=4 %} - # jobId: "numeric_job_id" + # jobIds: ["job_id_1", "job_id_2", "job_id_3"] ``` ```yaml {% srNumber=5 %} + # projectIds: ["project_id_1", "project_id_2", "project_id_3"] +``` +```yaml {% srNumber=6 %} token: auth_token ``` diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/dbtCloudConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/dbtCloudConnection.json index 13b41b6b483..e9979dfe5c9 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/dbtCloudConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/dbtCloudConnection.json @@ -39,11 +39,21 @@ "description": "ID of your DBT cloud account", "type": "string" }, - "jobId": { - "title": "Job Id", - "description": "ID of your DBT cloud job", - "type": "string", - "default": null + "jobIds": { + "title": "Job Ids", + "description": "List of IDs of your DBT cloud jobs seperated by comma `,`", + "type": "array", + "items": { + "type": "string" + } + }, + "projectIds": { + "title": "Project Ids", + "description": "List of IDs of your DBT cloud projects seperated by comma `,`", + "type": "array", + "items": { + "type": "string" + } }, "token": { "title": "Token", @@ -54,5 +64,4 @@ }, "additionalProperties": false, "required": ["host", "discoveryAPI", "accountId", "token"] - } - \ No newline at end of file + } \ No newline at end of file diff --git a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Pipeline/DBTCloud.md b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Pipeline/DBTCloud.md index 56d0186704b..e7965a34e93 100644 --- a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Pipeline/DBTCloud.md +++ b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Pipeline/DBTCloud.md @@ -26,8 +26,15 @@ The Account ID of your DBT cloud Project. Go to your dbt cloud account settings $$ $$section -### Job Id $(id="jobId") -The Job ID of your DBT cloud Job in your Project to fetch metadata for. Look for the segment after "jobs" in the URL. For instance, in a URL like `https://cloud.getdbt.com/accounts/123/projects/87477/jobs/73659994`, the job ID is `73659994`. This will be a numeric value but in openmetadata we parse it as a string. If not passed all Jobs under the Account id will be ingested. `Optional` +### Job Ids $(id="jobIds") +Job IDs of your DBT cloud Jobs in your Project to fetch metadata for. Look for the segment after "jobs" in the URL. For instance, in a URL like `https://cloud.getdbt.com/accounts/123/projects/87477/jobs/73659994`, the job ID is `73659994`. This will be a numeric value but in openmetadata we parse it as a string. If not passed all Jobs under the Account Id will be ingested. `Optional` +$$ + +$$section +### Project Ids $(id="projectIds") +Project IDs of your DBT cloud Account to fetch metadata for. Look for the segment after "projects" in the URL. For instance, in a URL like `https://cloud.getdbt.com/accounts/123/projects/87477/jobs/73659994`, the job ID is `87477`. This will be a numeric value but in openmetadata we parse it as a string. If not passed all Projects under the Account Id will be ingested. `Optional` + +Note that if both `Job Ids` and `Project Ids` are passed then it will filter out the jobs from the passed projects. any `Job Ids` not belonging to the `Project Ids` will also be filtered out. $$ $$section