From da176767a88eb8edc2fe94e26a1c3dd275ab764a Mon Sep 17 00:00:00 2001
From: mgorsk1
Date: Thu, 28 Nov 2024 18:30:11 +0100
Subject: [PATCH] feat: add dbt freshness check test (#18730)

* add dbt freshness check

* docs

* run linting

* add test case param definition

* fix test case param definition

* add config for dbt http, fix linting

* refactor (only create freshness test definition when user executed one)

* fix dbt files class

* fix dbt files class 2

* fix dbt objects class

* fix linting

* fix pylint

* fix linting once and for all

---------

Co-authored-by: Teddy
---
Review notes (this section is ignored by `git am`):
- dbt_service.py: sources.json is parsed only when it was provided, so runs
  without a sources file no longer call parse_sources(None).
- dbt_utils.py: freshness parameter values are built only for the thresholds
  (warn_after / error_after) that have both a count and a period.
- metadata.py: the freshness result is looked up before deep-copying the
  source node, and the dbt_tests entry is built in a single step.
- Hunk headers and the diffstat were recomputed for these edits; the `index`
  blob hashes are the ones from the original commit.

 .../source/database/dbt/constants.py          |  4 +
 .../source/database/dbt/dbt_config.py         | 18 +++++
 .../source/database/dbt/dbt_service.py        | 11 ++-
 .../source/database/dbt/dbt_utils.py          | 33 ++++++++
 .../ingestion/source/database/dbt/metadata.py | 76 ++++++++++++++++++-
 .../ingestion/source/database/dbt/models.py   |  2 +
 ingestion/tests/unit/test_dbt.py              |  3 +-
 .../dbtconfig/dbtHttpConfig.json              |  5 ++
 .../dbtconfig/dbtLocalConfig.json             |  5 ++
 9 files changed, 154 insertions(+), 3 deletions(-)

diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/constants.py b/ingestion/src/metadata/ingestion/source/database/dbt/constants.py
index 83c49c0724a..834e248d2fa 100644
--- a/ingestion/src/metadata/ingestion/source/database/dbt/constants.py
+++ b/ingestion/src/metadata/ingestion/source/database/dbt/constants.py
@@ -82,6 +82,7 @@ NONE_KEYWORDS_LIST = ["none", "null"]
 DBT_CATALOG_FILE_NAME = "catalog.json"
 DBT_MANIFEST_FILE_NAME = "manifest.json"
 DBT_RUN_RESULTS_FILE_NAME = "run_results"
+DBT_SOURCES_FILE_NAME = "sources.json"
 
 
 class SkipResourceTypeEnum(Enum):
@@ -91,6 +92,7 @@ class SkipResourceTypeEnum(Enum):
 
     ANALYSIS = "analysis"
     TEST = "test"
+    SOURCE = "source"
 
 
 class CompiledQueriesEnum(Enum):
@@ -127,6 +129,7 @@ class DbtTestFailureEnum(Enum):
 
     FAILURE = "failure"
     FAIL = "fail"
+    ERROR = "error"
 
 
 class DbtCommonEnum(Enum):
@@ -137,6 +140,7 @@ class DbtCommonEnum(Enum):
     OWNER = "owner"
     NODES = "nodes"
     SOURCES = "sources"
+    SOURCES_FILE = "sources_file"
     SOURCE = "source"
     RESOURCETYPE = "resource_type"
     MANIFEST_NODE = "manifest_node"
diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_config.py b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_config.py
index 216d7c6e9f8..66e25332e1e 100644
--- a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_config.py
+++ b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_config.py
@@ -43,6 +43,7 @@ from metadata.ingestion.source.database.dbt.constants import (
     DBT_CATALOG_FILE_NAME,
     DBT_MANIFEST_FILE_NAME,
     DBT_RUN_RESULTS_FILE_NAME,
+    DBT_SOURCES_FILE_NAME,
 )
 from metadata.ingestion.source.database.dbt.models import DbtFiles
 from metadata.readers.file.config_source_factory import get_reader
@@ -85,6 +86,7 @@ def _(config: DbtLocalConfig):
             config.dbtManifestFilePath,
             config.dbtCatalogFilePath,
             config.dbtRunResultsFilePath,
+            config.dbtSourcesFilePath,
         ]
         yield from download_dbt_files(
             blob_grouped_by_directory=blob_grouped_by_directory,
@@ -123,12 +125,22 @@ def _(config: DbtHttpConfig):
         dbt_catalog = requests.get(  # pylint: disable=missing-timeout
             config.dbtCatalogHttpPath
         )
+
+        dbt_sources = None
+        if config.dbtSourcesHttpPath:
+            logger.debug(
+                f"Requesting [dbtSourcesHttpPath] to: {config.dbtSourcesHttpPath}"
+            )
+            dbt_sources = requests.get(  # pylint: disable=missing-timeout
+                config.dbtSourcesHttpPath
+            )
         if not dbt_manifest:
             raise DBTConfigException("Manifest file not found in file server")
         yield DbtFiles(
             dbt_catalog=dbt_catalog.json() if dbt_catalog else None,
             dbt_manifest=dbt_manifest.json(),
             dbt_run_results=[dbt_run_results.json()] if dbt_run_results else None,
+            dbt_sources=dbt_sources.json() if dbt_sources else None,
         )
     except DBTConfigException as exc:
         raise exc
@@ -243,6 +255,7 @@ def get_blobs_grouped_by_dir(blobs: List[str]) -> Dict[str, List[str]]:
     return blob_grouped_by_directory
 
 
+# pylint: disable=too-many-locals, too-many-branches
 def download_dbt_files(
     blob_grouped_by_directory: Dict, config, client, bucket_name: Optional[str]
 ) -> Iterable[DbtFiles]:
@@ -255,6 +268,7 @@ def download_dbt_files(
     ) in blob_grouped_by_directory.items():
         dbt_catalog = None
         dbt_manifest = None
+        dbt_sources = None
         dbt_run_results = []
         kwargs = {}
         if bucket_name:
@@ -285,12 +299,16 @@ def download_dbt_files(
                             logger.warning(
                                 f"{DBT_RUN_RESULTS_FILE_NAME} not found in {key}: {exc}"
                             )
+                    if DBT_SOURCES_FILE_NAME == blob_file_name.lower():
+                        logger.debug(f"{DBT_SOURCES_FILE_NAME} found in {key}")
+                        dbt_sources = reader.read(path=blob, **kwargs)
             if not dbt_manifest:
                 raise DBTConfigException(f"Manifest file not found at: {key}")
             yield DbtFiles(
                 dbt_catalog=json.loads(dbt_catalog) if dbt_catalog else None,
                 dbt_manifest=json.loads(dbt_manifest),
                 dbt_run_results=dbt_run_results if dbt_run_results else None,
+                dbt_sources=json.loads(dbt_sources) if dbt_sources else None,
             )
         except DBTConfigException as exc:
             logger.warning(exc)
diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py
index aa2d65f4e2c..50a160164a0 100644
--- a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py
+++ b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py
@@ -15,7 +15,12 @@ DBT service Topology.
 from abc import ABC, abstractmethod
 from typing import Iterable, List
 
-from dbt_artifacts_parser.parser import parse_catalog, parse_manifest, parse_run_results
+from dbt_artifacts_parser.parser import (
+    parse_catalog,
+    parse_manifest,
+    parse_run_results,
+    parse_sources,
+)
 from pydantic import Field
 from typing_extensions import Annotated
 
@@ -209,11 +214,15 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC):
             self.remove_run_result_non_required_keys(
                 run_results=self.context.get().dbt_file.dbt_run_results
             )
+
         dbt_objects = DbtObjects(
             dbt_catalog=parse_catalog(self.context.get().dbt_file.dbt_catalog)
             if self.context.get().dbt_file.dbt_catalog
             else None,
             dbt_manifest=parse_manifest(self.context.get().dbt_file.dbt_manifest),
+            dbt_sources=parse_sources(self.context.get().dbt_file.dbt_sources)
+            if self.context.get().dbt_file.dbt_sources
+            else None,
             dbt_run_results=[
                 parse_run_results(run_result_file)
                 for run_result_file in self.context.get().dbt_file.dbt_run_results
diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_utils.py b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_utils.py
index 70bfcabe1b1..1897f4a7f5d 100644
--- a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_utils.py
+++ b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_utils.py
@@ -44,6 +44,20 @@ def create_test_case_parameter_definitions(dbt_test):
                 }
             ]
             return test_case_param_definition
+        if hasattr(dbt_test, "freshness"):
+            test_case_param_definition = [
+                {
+                    "name": "warn_after",
+                    "displayName": "warn_after",
+                    "required": False,
+                },
+                {
+                    "name": "error_after",
+                    "displayName": "error_after",
+                    "required": False,
+                },
+            ]
+            return test_case_param_definition
     except Exception as err:  # pylint: disable=broad-except
         logger.debug(traceback.format_exc())
         logger.error(
@@ -67,6 +81,25 @@ def create_test_case_parameter_values(dbt_test):
                 {"name": manifest_node.test_metadata.name, "value": dbt_test_values}
             ]
             return test_case_param_values
+        freshness = getattr(manifest_node, "freshness", None)
+        if freshness is not None:
+            # dbt allows setting only one of warn_after / error_after, so a value
+            # is emitted only for thresholds that have both a count and a period
+            thresholds = {
+                "error_after": freshness.error_after,
+                "warn_after": freshness.warn_after,
+            }
+            test_case_param_values = [
+                {
+                    "name": name,
+                    "value": f"{threshold.count} {threshold.period.value}",
+                }
+                for name, threshold in thresholds.items()
+                if threshold is not None
+                and threshold.count is not None
+                and threshold.period is not None
+            ]
+            return test_case_param_values
     except Exception as err:  # pylint: disable=broad-except
         logger.debug(traceback.format_exc())
         logger.error(
diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py
index 82bdf2e3085..5c1b2cf81e3 100644
--- a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py
@@ -13,6 +13,7 @@
 DBT source methods.
 """
 import traceback
+from copy import deepcopy
 from datetime import datetime
 from typing import Any, Iterable, List, Optional, Union
 
@@ -324,7 +325,46 @@ class DbtSource(DbtServiceSource):
             None,
         )
 
-    # pylint: disable=too-many-locals, too-many-branches
+    def _add_dbt_freshness_test_from_sources(
+        self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects
+    ):
+        """
+        Method to register a freshness test case for a dbt source node when
+        the sources file holds a freshness result for it
+        """
+        freshness_test_result = next(
+            (item for item in dbt_objects.dbt_sources.results if item.unique_id == key),
+            None,
+        )
+        # Only create the freshness test when the user actually executed one
+        if not freshness_test_result:
+            return
+
+        # In the dbt manifest a source node name is the table/view name (not a
+        # test name as with test nodes), so to name the test case precisely the
+        # node name is amended on a deepcopy, leaving the original node untouched
+        manifest_node_new = deepcopy(manifest_node)
+        manifest_node_new.name = manifest_node_new.name + "_freshness"
+
+        self.context.get().dbt_tests[key + "_freshness"] = {
+            DbtCommonEnum.MANIFEST_NODE.value: manifest_node_new,
+            DbtCommonEnum.UPSTREAM.value: self.parse_upstream_nodes(
+                manifest_entities, manifest_node
+            ),
+            DbtCommonEnum.RESULTS.value: freshness_test_result,
+        }
+
+    def add_dbt_sources(
+        self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects
+    ) -> None:
+        """
+        Method to append dbt test cases based on sources file for later processing
+        """
+        self._add_dbt_freshness_test_from_sources(
+            key, manifest_node, manifest_entities, dbt_objects
+        )
+
+    # pylint: disable=too-many-locals, too-many-branches, too-many-statements
     def yield_data_models(
         self, dbt_objects: DbtObjects
     ) -> Iterable[Either[DataModelLink]]:
@@ -376,6 +416,17 @@ class DbtSource(DbtServiceSource):
                         )
                         continue
 
+                    if (
+                        dbt_objects.dbt_sources
+                        and resource_type == SkipResourceTypeEnum.SOURCE.value
+                    ):
+                        self.add_dbt_sources(
+                            key,
+                            manifest_node=manifest_node,
+                            manifest_entities=manifest_entities,
+                            dbt_objects=dbt_objects,
+                        )
+
                     # Skip the ephemeral nodes since it is not materialized
                     if check_ephemeral_node(manifest_node):
                         logger.debug(f"Skipping ephemeral DBT node: {key}.")
@@ -549,6 +600,29 @@ class DbtSource(DbtServiceSource):
                         f"Failed to parse the DBT node {node} to get upstream nodes: {exc}"
                     )
                     continue
+
+        if dbt_node.resource_type == SkipResourceTypeEnum.SOURCE.value:
+            parent_fqn = fqn.build(
+                self.metadata,
+                entity_type=Table,
+                service_name="*",
+                database_name=get_corrected_name(dbt_node.database),
+                schema_name=get_corrected_name(dbt_node.schema_),
+                table_name=dbt_node.name,
+            )
+
+            # check if the parent table exists in OM before adding it to the upstream list
+            parent_table_entity: Optional[
+                Union[Table, List[Table]]
+            ] = get_entity_from_es_result(
+                entity_list=self.metadata.es_search_from_fqn(
+                    entity_type=Table, fqn_search_string=parent_fqn
+                ),
+                fetch_multiple_entities=False,
+            )
+            if parent_table_entity:
+                upstream_nodes.append(parent_fqn)
+
         return upstream_nodes
 
     def parse_data_model_columns(
diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/models.py b/ingestion/src/metadata/ingestion/source/database/dbt/models.py
index 88671141d43..e505368994a 100644
--- a/ingestion/src/metadata/ingestion/source/database/dbt/models.py
+++ b/ingestion/src/metadata/ingestion/source/database/dbt/models.py
@@ -20,12 +20,14 @@ from pydantic import BaseModel
 class DbtFiles(BaseModel):
     dbt_catalog: Optional[dict] = None
     dbt_manifest: dict
+    dbt_sources: Optional[dict] = None
     dbt_run_results: Optional[List[dict]] = None
 
 
 class DbtObjects(BaseModel):
     dbt_catalog: Optional[Any] = None
     dbt_manifest: Any
+    dbt_sources: Optional[Any] = None
     dbt_run_results: Optional[List[Any]] = None
 
 
diff --git a/ingestion/tests/unit/test_dbt.py b/ingestion/tests/unit/test_dbt.py
index b04d69c5fc5..1c2db6edc0a 100644
--- a/ingestion/tests/unit/test_dbt.py
+++ b/ingestion/tests/unit/test_dbt.py
@@ -51,6 +51,7 @@ mock_dbt_config = {
                     "dbtCatalogFilePath": "sample/dbt_files/catalog.json",
                     "dbtManifestFilePath": "sample/dbt_files/manifest.json",
                     "dbtRunResultsFilePath": "sample/dbt_files/run_results.json",
+                    "dbtSourcesFilePath": "sample/dbt_files/sources.json",
                 },
             }
         },
@@ -682,7 +683,7 @@ class DbtUnitTest(TestCase):
         self.assertEqual(expected, original)
 
     @patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")
-    def test_updtream_nodes_for_lineage(self, es_search_from_fqn):
+    def test_upstream_nodes_for_lineage(self, es_search_from_fqn):
         expected_upstream_nodes = [
             "model.jaffle_shop.stg_customers",
             "model.jaffle_shop.stg_orders",
diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/dbtconfig/dbtHttpConfig.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/dbtconfig/dbtHttpConfig.json
index 179573b67ec..7da25a51536 100644
--- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/dbtconfig/dbtHttpConfig.json
+++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/dbtconfig/dbtHttpConfig.json
@@ -26,6 +26,11 @@
       "title": "DBT Run Results HTTP File Path",
       "description": "DBT run results http file path to extract the test results information.",
       "type": "string"
+    },
+    "dbtSourcesHttpPath": {
+      "title": "DBT Sources HTTP File Path",
+      "description": "DBT sources http file path to extract freshness test results information.",
+      "type": "string"
     }
   },
   "additionalProperties": false,
diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/dbtconfig/dbtLocalConfig.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/dbtconfig/dbtLocalConfig.json
index 171b2a675f8..94ffc2cf177 100644
--- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/dbtconfig/dbtLocalConfig.json
+++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/dbtconfig/dbtLocalConfig.json
@@ -26,6 +26,11 @@
       "title": "DBT Run Results File Path",
       "description": "DBT run results file path to extract the test results information.",
       "type": "string"
+    },
+    "dbtSourcesFilePath": {
+      "title": "DBT Sources File Path",
+      "description": "DBT sources file path to extract the freshness test result.",
+      "type": "string"
     }
   },
   "additionalProperties": false,