diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/constants.py b/ingestion/src/metadata/ingestion/source/database/dbt/constants.py index c5c356beba3..63731473b72 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/constants.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/constants.py @@ -26,13 +26,7 @@ NONE_KEYWORDS_LIST = ["none", "null"] DBT_CATALOG_FILE_NAME = "catalog.json" DBT_MANIFEST_FILE_NAME = "manifest.json" -DBT_RUN_RESULTS_FILE_NAME = "run_results.json" - -DBT_FILE_NAMES_LIST = [ - DBT_CATALOG_FILE_NAME, - DBT_MANIFEST_FILE_NAME, - DBT_RUN_RESULTS_FILE_NAME, -] +DBT_RUN_RESULTS_FILE_NAME = "run_results" class SkipResourceTypeEnum(Enum): diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_config.py b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_config.py index 08afbafba0a..6988448a26f 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_config.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_config.py @@ -41,7 +41,6 @@ from metadata.generated.schema.metadataIngestion.dbtconfig.dbtS3Config import ( ) from metadata.ingestion.source.database.dbt.constants import ( DBT_CATALOG_FILE_NAME, - DBT_FILE_NAMES_LIST, DBT_MANIFEST_FILE_NAME, DBT_RUN_RESULTS_FILE_NAME, ) @@ -129,7 +128,7 @@ def _(config: DbtHttpConfig): yield DbtFiles( dbt_catalog=dbt_catalog.json() if dbt_catalog else None, dbt_manifest=dbt_manifest.json(), - dbt_run_results=dbt_run_results.json() if dbt_run_results else None, + dbt_run_results=[dbt_run_results.json()] if dbt_run_results else None, ) except DBTConfigException as exc: raise exc @@ -216,7 +215,7 @@ def _(config: DbtCloudConfig): # pylint: disable=too-many-locals yield DbtFiles( dbt_catalog=dbt_catalog, dbt_manifest=dbt_manifest, - dbt_run_results=dbt_run_results, + dbt_run_results=[dbt_run_results] if dbt_run_results else None, ) except DBTConfigException as exc: raise exc @@ -233,13 +232,12 @@ def 
get_blobs_grouped_by_dir(blobs: List[str]) -> Dict[str, List[str]]: for blob in blobs: subdirectory = blob.rsplit("/", 1)[0] if "/" in blob else "" blob_file_name = blob.rsplit("/", 1)[1] if "/" in blob else blob - if next( - ( - file_name - for file_name in DBT_FILE_NAMES_LIST - if file_name.lower() == blob_file_name.lower() - ), - None, + # We'll be processing multiple run_result files from a single dir + # Grouping them together to process them in a single go + if ( + DBT_MANIFEST_FILE_NAME == blob_file_name.lower() + or DBT_CATALOG_FILE_NAME == blob_file_name.lower() + or DBT_RUN_RESULTS_FILE_NAME in blob_file_name.lower() ): blob_grouped_by_directory[subdirectory].append(blob) return blob_grouped_by_directory @@ -257,7 +255,7 @@ def download_dbt_files( ) in blob_grouped_by_directory.items(): dbt_catalog = None dbt_manifest = None - dbt_run_results = None + dbt_run_results = [] kwargs = {} if bucket_name: kwargs = {"bucket_name": bucket_name} @@ -278,8 +276,10 @@ def download_dbt_files( ) if DBT_RUN_RESULTS_FILE_NAME in blob: try: - logger.debug(f"{DBT_RUN_RESULTS_FILE_NAME} found in {key}") - dbt_run_results = reader.read(path=blob, **kwargs) + logger.debug(f"{blob} found in {key}") + dbt_run_result = reader.read(path=blob, **kwargs) + if dbt_run_result: + dbt_run_results.append(json.loads(dbt_run_result)) except Exception as exc: logger.warning( f"{DBT_RUN_RESULTS_FILE_NAME} not found in {key}: {exc}" @@ -289,9 +289,7 @@ def download_dbt_files( yield DbtFiles( dbt_catalog=json.loads(dbt_catalog) if dbt_catalog else None, dbt_manifest=json.loads(dbt_manifest), - dbt_run_results=json.loads(dbt_run_results) - if dbt_run_results - else None, + dbt_run_results=dbt_run_results if dbt_run_results else None, ) except DBTConfigException as exc: logger.warning(exc) diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py index 759fbd6808a..409c3e4194a 100644 --- 
a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py @@ -184,9 +184,10 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC): if self.context.get().dbt_file.dbt_catalog else None, dbt_manifest=parse_manifest(self.context.get().dbt_file.dbt_manifest), - dbt_run_results=parse_run_results( - self.context.get().dbt_file.dbt_run_results - ) + dbt_run_results=[ + parse_run_results(run_result_file) + for run_result_file in self.context.get().dbt_file.dbt_run_results + ] if self.context.get().dbt_file.dbt_run_results else None, ) diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py index 8178c92a968..aba18bc118f 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py @@ -1,3 +1,4 @@ +# pylint: disable=too-many-lines # Copyright 2021 Collate # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -319,7 +320,8 @@ class DbtSource(DbtServiceSource): self.context.get().dbt_tests[key][DbtCommonEnum.RESULTS.value] = next( ( item - for item in dbt_objects.dbt_run_results.results + for run_result in dbt_objects.dbt_run_results + for item in run_result.results if item.unique_id == key ), None, @@ -347,12 +349,14 @@ class DbtSource(DbtServiceSource): self.context.get().data_model_links = [] self.context.get().dbt_tests = {} self.context.get().run_results_generate_time = None + # Since we'll be processing multiple run_results for a single project + # we'll only consider the first run_results generated_at time if ( dbt_objects.dbt_run_results - and dbt_objects.dbt_run_results.metadata.generated_at + and dbt_objects.dbt_run_results[0].metadata.generated_at ): self.context.get().run_results_generate_time = ( - dbt_objects.dbt_run_results.metadata.generated_at + dbt_objects.dbt_run_results[0].metadata.generated_at ) for key, manifest_node in manifest_entities.items(): try: diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/models.py b/ingestion/src/metadata/ingestion/source/database/dbt/models.py index 8054345c8bf..88671141d43 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/models.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/models.py @@ -20,13 +20,13 @@ from pydantic import BaseModel class DbtFiles(BaseModel): dbt_catalog: Optional[dict] = None dbt_manifest: dict - dbt_run_results: Optional[dict] = None + dbt_run_results: Optional[List[dict]] = None class DbtObjects(BaseModel): dbt_catalog: Optional[Any] = None dbt_manifest: Any - dbt_run_results: Optional[Any] = None + dbt_run_results: Optional[List[Any]] = None class DbtFilteredModel(BaseModel): diff --git a/ingestion/tests/unit/test_dbt.py b/ingestion/tests/unit/test_dbt.py index 657bcf1408f..9207b6437f2 100644 --- a/ingestion/tests/unit/test_dbt.py +++ b/ingestion/tests/unit/test_dbt.py @@ -508,7 +508,7 @@ class DbtUnitTest(TestCase): if dbt_files.dbt_catalog else 
None, dbt_manifest=parse_manifest(dbt_files.dbt_manifest), - dbt_run_results=parse_run_results(dbt_files.dbt_run_results) + dbt_run_results=[parse_run_results(dbt_files.dbt_run_results)] if dbt_files.dbt_run_results else None, ) diff --git a/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/ingestion/workflows/dbt/setup-multiple-dbt-projects.md b/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/ingestion/workflows/dbt/setup-multiple-dbt-projects.md index ff4c8fc6cfd..4f561d9c3bf 100644 --- a/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/ingestion/workflows/dbt/setup-multiple-dbt-projects.md +++ b/openmetadata-docs/content/v1.5.x-SNAPSHOT/connectors/ingestion/workflows/dbt/setup-multiple-dbt-projects.md @@ -13,7 +13,11 @@ This functionality is supported for s3, GCS, and Azure configurations only. {% /note %} -To ensure the workflow operates smoothly, organize the dbt files for each project into separate directories and name the files `manifest.json`, `catalog.json`, and `run_results.json`. The workflow will scan through the specified prefix path in the designated bucket, traversing each folder to locate these dbt files. +To ensure the workflow operates smoothly, organize the dbt files for each project into separate directories and name the files `manifest.json`, `catalog.json`, and `run_results.json`. + +If your dbt tests are split across multiple `run_results.json` files, place these files in the same directory as their corresponding `manifest.json` file. Ensure that each file retains `run_results` in its name, and append a unique suffix as needed. For example: `run_results_one.json`, `run_results_two.json`, and `run_results_three.json`. + +The workflow will scan through the specified prefix path in the designated bucket, traversing each folder to locate these dbt files. The dbt workflow will scan through the provided prefix path in the specified bucket and go through each folder to find the dbt files. 
@@ -38,7 +42,9 @@ bucket_home/ ├── dbt_project_two/ │ ├── manifest.json │ ├── catalog.json - │ └── run_results.json + │ ├── run_results_one.json + │ ├── run_results_two.json + │ └── run_results_three.json └── dbt_project_three/ ├── manifest.json ├── catalog.json