feat: add dbt freshness check test (#18730)

* add dbt freshness check

* docs

* run linting

* add test case param definition

* fix test case param definition

* add config for dbt http, fix linting

* refactor (only create freshness test definition when user executed one)

* fix dbt files class

* fix dbt files class 2

* fix dbt objects class

* fix linting

* fix pylint

* fix linting once and for all

---------

Co-authored-by: Teddy <teddy.crepineau@gmail.com>
This commit is contained in:
mgorsk1 2024-11-28 18:30:11 +01:00 committed by GitHub
parent 6410583018
commit da176767a8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 143 additions and 3 deletions

View File

@ -82,6 +82,7 @@ NONE_KEYWORDS_LIST = ["none", "null"]
DBT_CATALOG_FILE_NAME = "catalog.json" DBT_CATALOG_FILE_NAME = "catalog.json"
DBT_MANIFEST_FILE_NAME = "manifest.json" DBT_MANIFEST_FILE_NAME = "manifest.json"
DBT_RUN_RESULTS_FILE_NAME = "run_results" DBT_RUN_RESULTS_FILE_NAME = "run_results"
DBT_SOURCES_FILE_NAME = "sources.json"
class SkipResourceTypeEnum(Enum): class SkipResourceTypeEnum(Enum):
@ -91,6 +92,7 @@ class SkipResourceTypeEnum(Enum):
ANALYSIS = "analysis" ANALYSIS = "analysis"
TEST = "test" TEST = "test"
SOURCE = "source"
class CompiledQueriesEnum(Enum): class CompiledQueriesEnum(Enum):
@ -127,6 +129,7 @@ class DbtTestFailureEnum(Enum):
FAILURE = "failure" FAILURE = "failure"
FAIL = "fail" FAIL = "fail"
ERROR = "error"
class DbtCommonEnum(Enum): class DbtCommonEnum(Enum):
@ -137,6 +140,7 @@ class DbtCommonEnum(Enum):
OWNER = "owner" OWNER = "owner"
NODES = "nodes" NODES = "nodes"
SOURCES = "sources" SOURCES = "sources"
SOURCES_FILE = "sources_file"
SOURCE = "source" SOURCE = "source"
RESOURCETYPE = "resource_type" RESOURCETYPE = "resource_type"
MANIFEST_NODE = "manifest_node" MANIFEST_NODE = "manifest_node"

View File

@ -43,6 +43,7 @@ from metadata.ingestion.source.database.dbt.constants import (
DBT_CATALOG_FILE_NAME, DBT_CATALOG_FILE_NAME,
DBT_MANIFEST_FILE_NAME, DBT_MANIFEST_FILE_NAME,
DBT_RUN_RESULTS_FILE_NAME, DBT_RUN_RESULTS_FILE_NAME,
DBT_SOURCES_FILE_NAME,
) )
from metadata.ingestion.source.database.dbt.models import DbtFiles from metadata.ingestion.source.database.dbt.models import DbtFiles
from metadata.readers.file.config_source_factory import get_reader from metadata.readers.file.config_source_factory import get_reader
@ -85,6 +86,7 @@ def _(config: DbtLocalConfig):
config.dbtManifestFilePath, config.dbtManifestFilePath,
config.dbtCatalogFilePath, config.dbtCatalogFilePath,
config.dbtRunResultsFilePath, config.dbtRunResultsFilePath,
config.dbtSourcesFilePath,
] ]
yield from download_dbt_files( yield from download_dbt_files(
blob_grouped_by_directory=blob_grouped_by_directory, blob_grouped_by_directory=blob_grouped_by_directory,
@ -123,12 +125,22 @@ def _(config: DbtHttpConfig):
dbt_catalog = requests.get( # pylint: disable=missing-timeout dbt_catalog = requests.get( # pylint: disable=missing-timeout
config.dbtCatalogHttpPath config.dbtCatalogHttpPath
) )
dbt_sources = None
if config.dbtSourcesHttpPath:
logger.debug(
f"Requesting [dbtSourcesHttpPath] to: {config.dbtSourcesHttpPath}"
)
dbt_sources = requests.get( # pylint: disable=missing-timeout
config.dbtSourcesHttpPath
)
if not dbt_manifest: if not dbt_manifest:
raise DBTConfigException("Manifest file not found in file server") raise DBTConfigException("Manifest file not found in file server")
yield DbtFiles( yield DbtFiles(
dbt_catalog=dbt_catalog.json() if dbt_catalog else None, dbt_catalog=dbt_catalog.json() if dbt_catalog else None,
dbt_manifest=dbt_manifest.json(), dbt_manifest=dbt_manifest.json(),
dbt_run_results=[dbt_run_results.json()] if dbt_run_results else None, dbt_run_results=[dbt_run_results.json()] if dbt_run_results else None,
dbt_sources=dbt_sources.json() if dbt_sources else None,
) )
except DBTConfigException as exc: except DBTConfigException as exc:
raise exc raise exc
@ -243,6 +255,7 @@ def get_blobs_grouped_by_dir(blobs: List[str]) -> Dict[str, List[str]]:
return blob_grouped_by_directory return blob_grouped_by_directory
# pylint: disable=too-many-locals, too-many-branches
def download_dbt_files( def download_dbt_files(
blob_grouped_by_directory: Dict, config, client, bucket_name: Optional[str] blob_grouped_by_directory: Dict, config, client, bucket_name: Optional[str]
) -> Iterable[DbtFiles]: ) -> Iterable[DbtFiles]:
@ -255,6 +268,7 @@ def download_dbt_files(
) in blob_grouped_by_directory.items(): ) in blob_grouped_by_directory.items():
dbt_catalog = None dbt_catalog = None
dbt_manifest = None dbt_manifest = None
dbt_sources = None
dbt_run_results = [] dbt_run_results = []
kwargs = {} kwargs = {}
if bucket_name: if bucket_name:
@ -285,12 +299,16 @@ def download_dbt_files(
logger.warning( logger.warning(
f"{DBT_RUN_RESULTS_FILE_NAME} not found in {key}: {exc}" f"{DBT_RUN_RESULTS_FILE_NAME} not found in {key}: {exc}"
) )
if DBT_SOURCES_FILE_NAME == blob_file_name.lower():
logger.debug(f"{DBT_SOURCES_FILE_NAME} found in {key}")
dbt_sources = reader.read(path=blob, **kwargs)
if not dbt_manifest: if not dbt_manifest:
raise DBTConfigException(f"Manifest file not found at: {key}") raise DBTConfigException(f"Manifest file not found at: {key}")
yield DbtFiles( yield DbtFiles(
dbt_catalog=json.loads(dbt_catalog) if dbt_catalog else None, dbt_catalog=json.loads(dbt_catalog) if dbt_catalog else None,
dbt_manifest=json.loads(dbt_manifest), dbt_manifest=json.loads(dbt_manifest),
dbt_run_results=dbt_run_results if dbt_run_results else None, dbt_run_results=dbt_run_results if dbt_run_results else None,
dbt_sources=json.loads(dbt_sources) if dbt_sources else None,
) )
except DBTConfigException as exc: except DBTConfigException as exc:
logger.warning(exc) logger.warning(exc)

View File

@ -15,7 +15,12 @@ DBT service Topology.
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Iterable, List from typing import Iterable, List
from dbt_artifacts_parser.parser import parse_catalog, parse_manifest, parse_run_results from dbt_artifacts_parser.parser import (
parse_catalog,
parse_manifest,
parse_run_results,
parse_sources,
)
from pydantic import Field from pydantic import Field
from typing_extensions import Annotated from typing_extensions import Annotated
@ -209,11 +214,13 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC):
self.remove_run_result_non_required_keys( self.remove_run_result_non_required_keys(
run_results=self.context.get().dbt_file.dbt_run_results run_results=self.context.get().dbt_file.dbt_run_results
) )
dbt_objects = DbtObjects( dbt_objects = DbtObjects(
dbt_catalog=parse_catalog(self.context.get().dbt_file.dbt_catalog) dbt_catalog=parse_catalog(self.context.get().dbt_file.dbt_catalog)
if self.context.get().dbt_file.dbt_catalog if self.context.get().dbt_file.dbt_catalog
else None, else None,
dbt_manifest=parse_manifest(self.context.get().dbt_file.dbt_manifest), dbt_manifest=parse_manifest(self.context.get().dbt_file.dbt_manifest),
dbt_sources=parse_sources(self.context.get().dbt_file.dbt_sources),
dbt_run_results=[ dbt_run_results=[
parse_run_results(run_result_file) parse_run_results(run_result_file)
for run_result_file in self.context.get().dbt_file.dbt_run_results for run_result_file in self.context.get().dbt_file.dbt_run_results

View File

@ -44,6 +44,20 @@ def create_test_case_parameter_definitions(dbt_test):
} }
] ]
return test_case_param_definition return test_case_param_definition
if hasattr(dbt_test, "freshness"):
test_case_param_definition = [
{
"name": "warn_after",
"displayName": "warn_after",
"required": False,
},
{
"name": "error_after",
"displayName": "error_after",
"required": False,
},
]
return test_case_param_definition
except Exception as err: # pylint: disable=broad-except except Exception as err: # pylint: disable=broad-except
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
logger.error( logger.error(
@ -67,6 +81,21 @@ def create_test_case_parameter_values(dbt_test):
{"name": manifest_node.test_metadata.name, "value": dbt_test_values} {"name": manifest_node.test_metadata.name, "value": dbt_test_values}
] ]
return test_case_param_values return test_case_param_values
if hasattr(manifest_node, "freshness"):
warn_after = manifest_node.freshness.warn_after
error_after = manifest_node.freshness.error_after
test_case_param_values = [
{
"name": "error_after",
"value": f"{error_after.count} {error_after.period.value}",
},
{
"name": "warn_after",
"value": f"{warn_after.count} {warn_after.period.value}",
},
]
return test_case_param_values
except Exception as err: # pylint: disable=broad-except except Exception as err: # pylint: disable=broad-except
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
logger.error( logger.error(

View File

@ -13,6 +13,7 @@
DBT source methods. DBT source methods.
""" """
import traceback import traceback
from copy import deepcopy
from datetime import datetime from datetime import datetime
from typing import Any, Iterable, List, Optional, Union from typing import Any, Iterable, List, Optional, Union
@ -324,7 +325,41 @@ class DbtSource(DbtServiceSource):
None, None,
) )
# pylint: disable=too-many-locals, too-many-branches def _add_dbt_freshness_test_from_sources(
self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects
):
# in dbt manifest sources node name is table/view name (not test name like with test nodes)
# so in order for the test creation to be named precisely I am amending manifest node name within it's deepcopy
manifest_node_new = deepcopy(manifest_node)
manifest_node_new.name = manifest_node_new.name + "_freshness"
freshness_test_result = next(
(item for item in dbt_objects.dbt_sources.results if item.unique_id == key),
None,
)
if freshness_test_result:
self.context.get().dbt_tests[key + "_freshness"] = {
DbtCommonEnum.MANIFEST_NODE.value: manifest_node_new
}
self.context.get().dbt_tests[key + "_freshness"][
DbtCommonEnum.UPSTREAM.value
] = self.parse_upstream_nodes(manifest_entities, manifest_node)
self.context.get().dbt_tests[key + "_freshness"][
DbtCommonEnum.RESULTS.value
] = freshness_test_result
def add_dbt_sources(
self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects
) -> None:
"""
Method to append dbt test cases based on sources file for later processing
"""
self._add_dbt_freshness_test_from_sources(
key, manifest_node, manifest_entities, dbt_objects
)
# pylint: disable=too-many-locals, too-many-branches, too-many-statements
def yield_data_models( def yield_data_models(
self, dbt_objects: DbtObjects self, dbt_objects: DbtObjects
) -> Iterable[Either[DataModelLink]]: ) -> Iterable[Either[DataModelLink]]:
@ -376,6 +411,17 @@ class DbtSource(DbtServiceSource):
) )
continue continue
if (
dbt_objects.dbt_sources
and resource_type == SkipResourceTypeEnum.SOURCE.value
):
self.add_dbt_sources(
key,
manifest_node=manifest_node,
manifest_entities=manifest_entities,
dbt_objects=dbt_objects,
)
# Skip the ephemeral nodes since it is not materialized # Skip the ephemeral nodes since it is not materialized
if check_ephemeral_node(manifest_node): if check_ephemeral_node(manifest_node):
logger.debug(f"Skipping ephemeral DBT node: {key}.") logger.debug(f"Skipping ephemeral DBT node: {key}.")
@ -549,6 +595,29 @@ class DbtSource(DbtServiceSource):
f"Failed to parse the DBT node {node} to get upstream nodes: {exc}" f"Failed to parse the DBT node {node} to get upstream nodes: {exc}"
) )
continue continue
if dbt_node.resource_type == SkipResourceTypeEnum.SOURCE.value:
parent_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name="*",
database_name=get_corrected_name(dbt_node.database),
schema_name=get_corrected_name(dbt_node.schema_),
table_name=dbt_node.name,
)
# check if the parent table exists in OM before adding it to the upstream list
parent_table_entity: Optional[
Union[Table, List[Table]]
] = get_entity_from_es_result(
entity_list=self.metadata.es_search_from_fqn(
entity_type=Table, fqn_search_string=parent_fqn
),
fetch_multiple_entities=False,
)
if parent_table_entity:
upstream_nodes.append(parent_fqn)
return upstream_nodes return upstream_nodes
def parse_data_model_columns( def parse_data_model_columns(

View File

@ -20,12 +20,14 @@ from pydantic import BaseModel
class DbtFiles(BaseModel): class DbtFiles(BaseModel):
dbt_catalog: Optional[dict] = None dbt_catalog: Optional[dict] = None
dbt_manifest: dict dbt_manifest: dict
dbt_sources: Optional[dict] = None
dbt_run_results: Optional[List[dict]] = None dbt_run_results: Optional[List[dict]] = None
class DbtObjects(BaseModel): class DbtObjects(BaseModel):
dbt_catalog: Optional[Any] = None dbt_catalog: Optional[Any] = None
dbt_manifest: Any dbt_manifest: Any
dbt_sources: Optional[Any] = None
dbt_run_results: Optional[List[Any]] = None dbt_run_results: Optional[List[Any]] = None

View File

@ -51,6 +51,7 @@ mock_dbt_config = {
"dbtCatalogFilePath": "sample/dbt_files/catalog.json", "dbtCatalogFilePath": "sample/dbt_files/catalog.json",
"dbtManifestFilePath": "sample/dbt_files/manifest.json", "dbtManifestFilePath": "sample/dbt_files/manifest.json",
"dbtRunResultsFilePath": "sample/dbt_files/run_results.json", "dbtRunResultsFilePath": "sample/dbt_files/run_results.json",
"dbtSourcesFilePath": "sample/dbt_files/sources.json",
}, },
} }
}, },
@ -682,7 +683,7 @@ class DbtUnitTest(TestCase):
self.assertEqual(expected, original) self.assertEqual(expected, original)
@patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn") @patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")
def test_updtream_nodes_for_lineage(self, es_search_from_fqn): def test_upstream_nodes_for_lineage(self, es_search_from_fqn):
expected_upstream_nodes = [ expected_upstream_nodes = [
"model.jaffle_shop.stg_customers", "model.jaffle_shop.stg_customers",
"model.jaffle_shop.stg_orders", "model.jaffle_shop.stg_orders",

View File

@ -26,6 +26,11 @@
"title": "DBT Run Results HTTP File Path", "title": "DBT Run Results HTTP File Path",
"description": "DBT run results http file path to extract the test results information.", "description": "DBT run results http file path to extract the test results information.",
"type": "string" "type": "string"
},
"dbtSourcesHttpPath": {
"title": "DBT Sources HTTP File Path",
"description": "DBT sources http file path to extract freshness test results information.",
"type": "string"
} }
}, },
"additionalProperties": false, "additionalProperties": false,

View File

@ -26,6 +26,11 @@
"title": "DBT Run Results File Path", "title": "DBT Run Results File Path",
"description": "DBT run results file path to extract the test results information.", "description": "DBT run results file path to extract the test results information.",
"type": "string" "type": "string"
},
"dbtSourcesFilePath": {
"title": "DBT Sources File Path",
"description": "DBT sources file path to extract the freshness test result.",
"type": "string"
} }
}, },
"additionalProperties": false, "additionalProperties": false,