MINOR: Added support to process multiple dbt run_results.json for a single dbt project (#17412)

* Added dbt multiple run_results

* correct to suffix

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>

---------

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
Onkar Ravgan 2024-08-13 13:19:56 +05:30 committed by GitHub
parent d808f3f385
commit 1bc0ca7155
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 37 additions and 34 deletions

View File

@ -26,13 +26,7 @@ NONE_KEYWORDS_LIST = ["none", "null"]
DBT_CATALOG_FILE_NAME = "catalog.json"
DBT_MANIFEST_FILE_NAME = "manifest.json"
DBT_RUN_RESULTS_FILE_NAME = "run_results.json"
DBT_FILE_NAMES_LIST = [
DBT_CATALOG_FILE_NAME,
DBT_MANIFEST_FILE_NAME,
DBT_RUN_RESULTS_FILE_NAME,
]
DBT_RUN_RESULTS_FILE_NAME = "run_results"
class SkipResourceTypeEnum(Enum):

View File

@ -41,7 +41,6 @@ from metadata.generated.schema.metadataIngestion.dbtconfig.dbtS3Config import (
)
from metadata.ingestion.source.database.dbt.constants import (
DBT_CATALOG_FILE_NAME,
DBT_FILE_NAMES_LIST,
DBT_MANIFEST_FILE_NAME,
DBT_RUN_RESULTS_FILE_NAME,
)
@ -129,7 +128,7 @@ def _(config: DbtHttpConfig):
yield DbtFiles(
dbt_catalog=dbt_catalog.json() if dbt_catalog else None,
dbt_manifest=dbt_manifest.json(),
dbt_run_results=dbt_run_results.json() if dbt_run_results else None,
dbt_run_results=[dbt_run_results.json()] if dbt_run_results else None,
)
except DBTConfigException as exc:
raise exc
@ -216,7 +215,7 @@ def _(config: DbtCloudConfig): # pylint: disable=too-many-locals
yield DbtFiles(
dbt_catalog=dbt_catalog,
dbt_manifest=dbt_manifest,
dbt_run_results=dbt_run_results,
dbt_run_results=[dbt_run_results] if dbt_run_results else None,
)
except DBTConfigException as exc:
raise exc
@ -233,13 +232,12 @@ def get_blobs_grouped_by_dir(blobs: List[str]) -> Dict[str, List[str]]:
for blob in blobs:
subdirectory = blob.rsplit("/", 1)[0] if "/" in blob else ""
blob_file_name = blob.rsplit("/", 1)[1] if "/" in blob else blob
if next(
(
file_name
for file_name in DBT_FILE_NAMES_LIST
if file_name.lower() == blob_file_name.lower()
),
None,
# We'll be processing multiple run_result files from a single dir
# Grouping them together to process them in a single go
if (
DBT_MANIFEST_FILE_NAME == blob_file_name.lower()
or DBT_CATALOG_FILE_NAME == blob_file_name.lower()
or DBT_RUN_RESULTS_FILE_NAME in blob_file_name.lower()
):
blob_grouped_by_directory[subdirectory].append(blob)
return blob_grouped_by_directory
@ -257,7 +255,7 @@ def download_dbt_files(
) in blob_grouped_by_directory.items():
dbt_catalog = None
dbt_manifest = None
dbt_run_results = None
dbt_run_results = []
kwargs = {}
if bucket_name:
kwargs = {"bucket_name": bucket_name}
@ -278,8 +276,10 @@ def download_dbt_files(
)
if DBT_RUN_RESULTS_FILE_NAME in blob:
try:
logger.debug(f"{DBT_RUN_RESULTS_FILE_NAME} found in {key}")
dbt_run_results = reader.read(path=blob, **kwargs)
logger.debug(f"{blob} found in {key}")
dbt_run_result = reader.read(path=blob, **kwargs)
if dbt_run_result:
dbt_run_results.append(json.loads(dbt_run_result))
except Exception as exc:
logger.warning(
f"{DBT_RUN_RESULTS_FILE_NAME} not found in {key}: {exc}"
@ -289,9 +289,7 @@ def download_dbt_files(
yield DbtFiles(
dbt_catalog=json.loads(dbt_catalog) if dbt_catalog else None,
dbt_manifest=json.loads(dbt_manifest),
dbt_run_results=json.loads(dbt_run_results)
if dbt_run_results
else None,
dbt_run_results=dbt_run_results if dbt_run_results else None,
)
except DBTConfigException as exc:
logger.warning(exc)

View File

@ -184,9 +184,10 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC):
if self.context.get().dbt_file.dbt_catalog
else None,
dbt_manifest=parse_manifest(self.context.get().dbt_file.dbt_manifest),
dbt_run_results=parse_run_results(
self.context.get().dbt_file.dbt_run_results
)
dbt_run_results=[
parse_run_results(run_result_file)
for run_result_file in self.context.get().dbt_file.dbt_run_results
]
if self.context.get().dbt_file.dbt_run_results
else None,
)

View File

@ -1,3 +1,4 @@
# pylint: disable=too-many-lines
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -319,7 +320,8 @@ class DbtSource(DbtServiceSource):
self.context.get().dbt_tests[key][DbtCommonEnum.RESULTS.value] = next(
(
item
for item in dbt_objects.dbt_run_results.results
for run_result in dbt_objects.dbt_run_results
for item in run_result.results
if item.unique_id == key
),
None,
@ -347,12 +349,14 @@ class DbtSource(DbtServiceSource):
self.context.get().data_model_links = []
self.context.get().dbt_tests = {}
self.context.get().run_results_generate_time = None
# Since we'll be processing multiple run_results for a single project
# we'll only consider the first run_results generated_at time
if (
dbt_objects.dbt_run_results
and dbt_objects.dbt_run_results.metadata.generated_at
and dbt_objects.dbt_run_results[0].metadata.generated_at
):
self.context.get().run_results_generate_time = (
dbt_objects.dbt_run_results.metadata.generated_at
dbt_objects.dbt_run_results[0].metadata.generated_at
)
for key, manifest_node in manifest_entities.items():
try:

View File

@ -20,13 +20,13 @@ from pydantic import BaseModel
class DbtFiles(BaseModel):
dbt_catalog: Optional[dict] = None
dbt_manifest: dict
dbt_run_results: Optional[dict] = None
dbt_run_results: Optional[List[dict]] = None
class DbtObjects(BaseModel):
dbt_catalog: Optional[Any] = None
dbt_manifest: Any
dbt_run_results: Optional[Any] = None
dbt_run_results: Optional[List[Any]] = None
class DbtFilteredModel(BaseModel):

View File

@ -508,7 +508,7 @@ class DbtUnitTest(TestCase):
if dbt_files.dbt_catalog
else None,
dbt_manifest=parse_manifest(dbt_files.dbt_manifest),
dbt_run_results=parse_run_results(dbt_files.dbt_run_results)
dbt_run_results=[parse_run_results(dbt_files.dbt_run_results)]
if dbt_files.dbt_run_results
else None,
)

View File

@ -13,7 +13,11 @@ This functionality is supported for s3, GCS, and Azure configurations only.
{% /note %}
To ensure the workflow operates smoothly, organize the dbt files for each project into separate directories and name the files `manifest.json`, `catalog.json`, and `run_results.json`. The workflow will scan through the specified prefix path in the designated bucket, traversing each folder to locate these dbt files.
To ensure the workflow operates smoothly, organize the dbt files for each project into separate directories and name the files `manifest.json`, `catalog.json`, and `run_results.json`.
If your dbt tests are split across multiple run_results.json files, place these files in the same directory as their corresponding manifest.json file. Ensure that each file retains `run_results` in its name, appending a unique suffix as needed. For example: `run_results_one.json`, `run_results_two.json`, `run_results_three.json`.
The workflow will scan through the specified prefix path in the designated bucket, traversing each folder to locate these dbt files.
The dbt workflow will scan through the provided prefix path in the specified bucket and go through each folder to find the dbt files.
@ -38,7 +42,9 @@ bucket_home/
├── dbt_project_two/
│ ├── manifest.json
│ ├── catalog.json
│ └── run_results.json
│   ├── run_results_one.json
│   ├── run_results_two.json
│   └── run_results_three.json
└── dbt_project_three/
├── manifest.json
├── catalog.json