diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py index 462444ec0d..1cd5ed8164 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py @@ -104,23 +104,20 @@ _DBT_GRAPHQL_MODEL_SEED_SNAPSHOT_FIELDS = """ compiledCode """ -_DBT_GRAPHQL_QUERY = f""" -query DatahubMetadataQuery($jobId: Int!, $runId: Int) {{ - models(jobId: $jobId, runId: $runId) {{ +_DBT_FIELDS_BY_TYPE = { + "models": f""" { _DBT_GRAPHQL_COMMON_FIELDS } { _DBT_GRAPHQL_NODE_COMMON_FIELDS } { _DBT_GRAPHQL_MODEL_SEED_SNAPSHOT_FIELDS } dependsOn materializedType - }} - - seeds(jobId: $jobId, runId: $runId) {{ +""", + "seeds": f""" { _DBT_GRAPHQL_COMMON_FIELDS } { _DBT_GRAPHQL_NODE_COMMON_FIELDS } { _DBT_GRAPHQL_MODEL_SEED_SNAPSHOT_FIELDS } - }} - - sources(jobId: $jobId, runId: $runId) {{ +""", + "sources": f""" { _DBT_GRAPHQL_COMMON_FIELDS } { _DBT_GRAPHQL_NODE_COMMON_FIELDS } identifier @@ -131,9 +128,8 @@ query DatahubMetadataQuery($jobId: Int!, $runId: Int) {{ state freshnessChecked loader - }} - - snapshots(jobId: $jobId, runId: $runId) {{ +""", + "snapshots": f""" { _DBT_GRAPHQL_COMMON_FIELDS } { _DBT_GRAPHQL_NODE_COMMON_FIELDS } { _DBT_GRAPHQL_MODEL_SEED_SNAPSHOT_FIELDS } @@ -143,9 +139,8 @@ query DatahubMetadataQuery($jobId: Int!, $runId: Int) {{ parentsModels {{ uniqueId }} - }} - - tests(jobId: $jobId, runId: $runId) {{ +""", + "tests": f""" { _DBT_GRAPHQL_COMMON_FIELDS } state columnName @@ -159,12 +154,18 @@ query DatahubMetadataQuery($jobId: Int!, $runId: Int) {{ rawCode compiledSql compiledCode - }} +""", + # Currently unsupported dbt node types: + # - metrics + # - snapshots + # - exposures +} - # Currently unsupported dbt node types: - # - metrics - # - snapshots - # - exposures +_DBT_GRAPHQL_QUERY = """ +query DatahubMetadataQuery_{type}($jobId: Int!, $runId: Int) {{ + {type}(jobId: $jobId, runId: $runId) {{ +{fields} + }} }} """ @@ -206,15 +207,36 @@ class DBTCloudSource(DBTSourceBase): # Additionally, we'd like to model dbt Cloud jobs/runs in DataHub # as DataProcesses or DataJobs. - logger.debug("Sending graphql request to the dbt Cloud metadata API") - response = requests.post( - self.config.metadata_endpoint, - json={ - "query": _DBT_GRAPHQL_QUERY, - "variables": { + raw_nodes = [] + for node_type, fields in _DBT_FIELDS_BY_TYPE.items(): + logger.info(f"Fetching {node_type} from dbt Cloud") + data = self._send_graphql_query( + query=_DBT_GRAPHQL_QUERY.format(type=node_type, fields=fields), + variables={ "jobId": self.config.job_id, "runId": self.config.run_id, }, + ) + + raw_nodes.extend(data[node_type]) + + nodes = [self._parse_into_dbt_node(node) for node in raw_nodes] + + additional_metadata: Dict[str, Optional[str]] = { + "project_id": str(self.config.project_id), + "account_id": str(self.config.account_id), + "job_id": str(self.config.job_id), + } + + return nodes, additional_metadata + + def _send_graphql_query(self, query: str, variables: Dict) -> Dict: + logger.debug(f"Sending GraphQL query to dbt Cloud: {query}") + response = requests.post( + self.config.metadata_endpoint, + json={ + "query": query, + "variables": variables, }, headers={ "Authorization": f"Bearer {self.config.token}", @@ -233,23 +255,7 @@ class DBTCloudSource(DBTSourceBase): response.raise_for_status() raise e - raw_nodes = [ - *data["models"], - *data["seeds"], - *data["sources"], - *data["snapshots"], - *data["tests"], - ] - - nodes = [self._parse_into_dbt_node(node) for node in raw_nodes] - - additional_metadata: Dict[str, Optional[str]] = { - "project_id": str(self.config.project_id), - "account_id": str(self.config.account_id), - "job_id": str(self.config.job_id), - } - - return nodes, additional_metadata + return data def _parse_into_dbt_node(self, node: Dict) -> DBTNode: key = node["uniqueId"]