feat(ingest): dbt - control over emitting test_results, test_definitions, etc. (#5328)

Co-authored-by: Piotr Sierkin <piotr.sierkin@getindata.com>
Co-authored-by: Shirshanka Das <shirshanka@apache.org>
Piotr Sierkin authored on 2022-08-07 06:42:53 +02:00, committed by GitHub
parent 69124a0a99
commit 828a711684
6 changed files with 461 additions and 22 deletions

View File

@@ -179,5 +179,34 @@ The connector will produce the following things:
#### Viewing timeline for a failed dbt test
![test view](https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/dbt-tests-failure-view.png)
#### Separating test result emission from other metadata emission
You can control the emission of test results separately from other dbt metadata using the `entities_enabled` config option.
The following recipe shows you how to emit only test results.
```yaml
source:
  type: dbt
  config:
    manifest_path: _path_to_manifest_json
    catalog_path: _path_to_catalog_json
    test_results_path: _path_to_run_results_json
    target_platform: postgres
    entities_enabled:
      test_results: Only
```
Similarly, the following recipe shows how to emit everything (i.e., models, sources, seeds, and test definitions) except test results:
```yaml
source:
  type: dbt
  config:
    manifest_path: _path_to_manifest_json
    catalog_path: _path_to_catalog_json
    test_results_path: _path_to_run_results_json
    target_platform: postgres
    entities_enabled:
      test_results: "No"
```
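Directives can also be mixed per entity type, with at most one entity type set to `Only`. The recipe below is an illustrative sketch (the field names come from this commit's `DBTEntitiesEnabled` model; `Yes`/`No` are quoted so YAML does not coerce them into booleans before validation):
```yaml
source:
  type: dbt
  config:
    manifest_path: _path_to_manifest_json
    catalog_path: _path_to_catalog_json
    target_platform: postgres
    entities_enabled:
      models: "Yes"
      sources: "No"
      seeds: "No"
      test_definitions: "Yes"
```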

View File

@@ -114,8 +114,7 @@ def get_enum_description(
description = (
description + "."
if description
else "" + " Allowed symbols are " + ",".join(enum_symbols)
)
else "") + " Allowed symbols are " + ", ".join(enum_symbols)
return description
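This hunk fixes an operator-precedence bug: the closing parenthesis previously sat inside the `else` branch, so the `" Allowed symbols are ..."` suffix was concatenated onto the empty string only when `description` was falsy, and was silently dropped whenever a description was present. A minimal sketch of the corrected logic (the parameter names and signature are assumed, since the diff does not show them):
```python
from typing import List, Optional

def get_enum_description(description: Optional[str], enum_symbols: List[str]) -> str:
    # Close the conditional first, then append the suffix unconditionally.
    return (
        description + "." if description else ""
    ) + " Allowed symbols are " + ", ".join(enum_symbols)
```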
@@ -136,9 +135,6 @@ def gen_md_table(
default=str(field_dict.get("default", "None")),
)
)
- # md_str.append(
- # f"| {get_prefixed_name(field_prefix, None)} | Enum | {field_dict['type']} | one of {','.join(field_dict['enum'])} |\n"
- # )
elif "properties" in field_dict:
for field_name, value in field_dict["properties"].items():
@@ -207,7 +203,6 @@ def gen_md_table(
"additionalProperties" in value
and "$ref" in value["additionalProperties"]
):
- # breakpoint()
value_ref = value["additionalProperties"]["$ref"]
def_dict = get_definition_dict_from_definition(
definitions_dict, value_ref
@@ -462,7 +457,6 @@ def generate(
if extra_docs:
for path in glob.glob(f"{extra_docs}/**/*[.md|.yaml|.yml]", recursive=True):
- # breakpoint()
m = re.search("/docs/sources/(.*)/(.*).md", path)
if m:
@@ -555,7 +549,6 @@ def generate(
source_documentation[platform_id] = (
source_documentation.get(platform_id) or {}
)
- # breakpoint()
create_or_update(
source_documentation,

View File

@@ -3,6 +3,8 @@ import logging
import re
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from functools import cached_property
from typing import (
Any,
Callable,
@@ -19,7 +21,7 @@ from urllib.parse import urlparse
import dateutil.parser
import requests
- from pydantic import BaseModel, validator
+ from pydantic import BaseModel, root_validator, validator
from pydantic.fields import Field
from datahub.configuration.common import AllowDenyPattern, ConfigurationError
@@ -134,6 +136,89 @@ class DBTSourceReport(StatefulIngestionReport):
self.soft_deleted_stale_entities.append(urn)
class EmitDirective(Enum):
"""A holder for directives for emission for specific types of entities"""
YES = "YES" # Okay to emit for this type
NO = "NO" # Do not emit for this type
ONLY = "ONLY" # Only emit metadata for this type and no others
class DBTEntitiesEnabled(BaseModel):
"""Controls which dbt entities are going to be emitted by this source"""
class Config:
arbitrary_types_allowed = True # needed to allow cached_property to work
keep_untouched = (
cached_property,
) # needed to allow cached_property to work. See https://github.com/samuelcolvin/pydantic/issues/1241 for more info.
models: EmitDirective = Field(
"Yes", description="Emit metadata for dbt models when set to Yes or Only"
)
sources: EmitDirective = Field(
"Yes", description="Emit metadata for dbt sources when set to Yes or Only"
)
seeds: EmitDirective = Field(
"Yes", description="Emit metadata for dbt seeds when set to Yes or Only"
)
test_definitions: EmitDirective = Field(
"Yes",
description="Emit metadata for test definitions when enabled when set to Yes or Only",
)
test_results: EmitDirective = Field(
"Yes", description="Emit metadata for test results when set to Yes or Only"
)
@validator("*", pre=True, always=True)
def to_upper(cls, v):
return v.upper() if isinstance(v, str) else v
@root_validator
def only_one_can_be_set_to_only(cls, values):
only_values = [k for k in values if values.get(k) == EmitDirective.ONLY]
if len(only_values) > 1:
raise ValueError(
f"Cannot have more than 1 type of entity emission set to ONLY. Found {only_values}"
)
return values
def _any_other_only_set(self, attribute: str) -> bool:
"""Return true if any attribute other than the one passed in is set to ONLY"""
other_onlies = [
k
for k, v in self.__dict__.items()
if k != attribute and v == EmitDirective.ONLY
]
return len(other_onlies) != 0
@cached_property
def node_type_emit_decision_cache(self) -> Dict[str, bool]:
node_type_for_field_map = {
"models": "model",
"sources": "source",
"seeds": "seed",
"test_definitions": "test",
}
return {
node_type_for_field_map[k]: False
if self._any_other_only_set(k)
or self.__getattribute__(k) == EmitDirective.NO
else True
for k in ["models", "sources", "seeds", "test_definitions"]
}
def can_emit_node_type(self, node_type: str) -> bool:
return self.node_type_emit_decision_cache.get(node_type, False)
@property
def can_emit_test_results(self) -> bool:
return (
not self._any_other_only_set("test_results")
and self.test_results != EmitDirective.NO
)
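Taken together, the rules are: an entity type is emitted unless its directive is `NO` or any other entity type is set to `ONLY`, and at most one type may be `ONLY`. A small sketch of the resulting behavior (mirroring the unit tests added at the end of this commit; the import path is taken from the test modules below):
```python
from datahub.ingestion.source.dbt import DBTEntitiesEnabled, EmitDirective

# Default: every entity type is emitted.
default = DBTEntitiesEnabled()
assert default.can_emit_node_type("model") and default.can_emit_test_results

# ONLY on test_results suppresses all node types but keeps test results.
only_tests = DBTEntitiesEnabled(test_results=EmitDirective.ONLY)
assert only_tests.can_emit_test_results
assert not only_tests.can_emit_node_type("model")

# String values are upper-cased by the validator, so "no" works as well.
no_seeds = DBTEntitiesEnabled(seeds="no")
assert not no_seeds.can_emit_node_type("seed")
assert no_seeds.can_emit_node_type("model")
```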
class DBTConfig(StatefulIngestionConfigBase):
manifest_path: str = Field(
description="Path to dbt manifest JSON. See https://docs.getdbt.com/reference/artifacts/manifest-json Note this can be a local file or a URI."
@@ -170,7 +255,11 @@ class DBTConfig(StatefulIngestionConfigBase):
)
node_type_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="regex patterns for dbt nodes to filter in ingestion.",
description="Deprecated: use entities_enabled instead. Regex patterns for dbt nodes to filter in ingestion.",
)
entities_enabled: DBTEntitiesEnabled = Field(
DBTEntitiesEnabled(),
description="Controls for enabling / disabling metadata emission for different dbt entities (models, test definitions, test results, etc.)",
)
tag_prefix: str = Field(
default=f"{DBT_PLATFORM}:", description="Prefix added to tags during ingestion."
@@ -386,6 +475,7 @@ def extract_dbt_entities(
node_type_pattern: AllowDenyPattern,
report: DBTSourceReport,
node_name_pattern: AllowDenyPattern,
entities_enabled: DBTEntitiesEnabled,
) -> List[DBTNode]:
sources_by_id = {x["unique_id"]: x for x in sources_results}
@@ -393,6 +483,9 @@
for key, manifest_node in all_manifest_entities.items():
# check if node pattern allowed based on config file
if not node_type_pattern.allowed(manifest_node["resource_type"]):
logger.debug(
f"Not extracting dbt entity {key} since node type {manifest_node['resource_type']} is disabled"
)
continue
name = manifest_node["name"]
@@ -868,6 +961,10 @@ class DBTTest:
test_nodes: List[DBTNode],
manifest_nodes: Dict[str, Any],
) -> Iterable[MetadataWorkUnit]:
if not config.entities_enabled.can_emit_test_results:
logger.debug("Skipping test result emission since it is turned off.")
return []
args = test_results_json.get("args", {})
dbt_metadata = DBTRunMetadata.parse_obj(test_results_json.get("metadata", {}))
test_nodes_map: Dict[str, DBTNode] = {x.dbt_name: x for x in test_nodes}
@@ -879,7 +976,7 @@ class DBTTest:
test_result = DBTTestResult.parse_obj(result)
id = test_result.unique_id
test_node = test_nodes_map.get(id)
- assert test_node
+ assert test_node, f"Failed to find test_node {id} in the catalog"
upstream_urns = get_upstreams(
test_node.upstream_nodes,
manifest_nodes,
@@ -1167,6 +1264,7 @@ class DBTSource(StatefulIngestionSourceBase):
node_type_pattern,
report,
node_name_pattern,
self.config.entities_enabled,
)
return (
@@ -1188,6 +1286,9 @@ class DBTSource(StatefulIngestionSourceBase):
def string_map(input_map: Dict[str, Any]) -> Dict[str, str]:
return {k: str(v) for k, v in input_map.items()}
if not self.config.entities_enabled.can_emit_node_type("test"):
return []
for node in test_nodes:
node_datahub_urn = mce_builder.make_assertion_urn(
mce_builder.datahub_guid(
@@ -1359,14 +1460,6 @@ class DBTSource(StatefulIngestionSourceBase):
self.report.report_workunit(soft_delete_wu)
yield soft_delete_wu
- if self.config.test_results_path:
- yield from DBTTest.load_test_results(
- self.config,
- self.load_file_as_json(self.config.test_results_path),
- test_nodes,
- manifest_nodes,
- )
# create workunits from dbt nodes
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
if self.config.write_semantics == "PATCH" and not self.ctx.graph:
@@ -1437,6 +1530,14 @@ class DBTSource(StatefulIngestionSourceBase):
manifest_nodes_raw,
)
+ if self.config.test_results_path:
+ yield from DBTTest.load_test_results(
+ self.config,
+ self.load_file_as_json(self.config.test_results_path),
+ test_nodes,
+ manifest_nodes_raw,
+ )
if self.is_stateful_ingestion_configured():
# Clean up stale entities.
yield from self.gen_removed_entity_workunits()
@@ -1495,6 +1596,7 @@ class DBTSource(StatefulIngestionSourceBase):
self.config.strip_user_ids_from_email,
)
for node in dbt_nodes:
node_datahub_urn = get_urn_from_dbtNode(
node.database,
node.schema,
@@ -1503,6 +1605,11 @@
self.config.env,
mce_platform_instance,
)
if not self.config.entities_enabled.can_emit_node_type(node.node_type):
logger.debug(
f"Skipping emission of node {node_datahub_urn} because node_type {node.node_type} is disabled"
)
continue
self.save_checkpoint(node_datahub_urn, "dataset")
meta_aspects: Dict[str, Any] = {}

View File

@@ -9,7 +9,12 @@ from freezegun import freeze_time
from datahub.configuration.common import DynamicTypedConfig
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId
from datahub.ingestion.run.pipeline import Pipeline, PipelineConfig, SourceConfig
- from datahub.ingestion.source.dbt import DBTConfig, DBTSource
+ from datahub.ingestion.source.dbt import (
+ DBTConfig,
+ DBTEntitiesEnabled,
+ DBTSource,
+ EmitDirective,
+ )
from datahub.ingestion.source.sql.sql_types import (
TRINO_SQL_TYPES_MAP,
resolve_trino_modified_type,
@@ -670,3 +675,158 @@ def test_resolve_trino_modified_type(data_type, expected_data_type):
resolve_trino_modified_type(data_type)
== TRINO_SQL_TYPES_MAP[expected_data_type]
)
@pytest.mark.integration
@freeze_time(FROZEN_TIME)
def test_dbt_tests_only_assertions(pytestconfig, tmp_path, mock_time, **kwargs):
test_resources_dir = pytestconfig.rootpath / "tests/integration/dbt"
# Run the metadata ingestion pipeline.
output_file = tmp_path / "test_only_assertions.json"
pipeline = Pipeline(
config=PipelineConfig(
source=SourceConfig(
type="dbt",
config=DBTConfig(
manifest_path=str(
(test_resources_dir / "jaffle_shop_manifest.json").resolve()
),
catalog_path=str(
(test_resources_dir / "jaffle_shop_catalog.json").resolve()
),
target_platform="postgres",
delete_tests_as_datasets=True,
test_results_path=str(
(test_resources_dir / "jaffle_shop_test_results.json").resolve()
),
# this is just here to avoid needing to access datahub server
write_semantics="OVERRIDE",
entities_enabled=DBTEntitiesEnabled(
test_results=EmitDirective.ONLY
),
),
),
sink=DynamicTypedConfig(type="file", config={"filename": str(output_file)}),
)
)
pipeline.run()
pipeline.raise_from_status()
# Verify the output.
# No datasets were emitted, and more than 20 events were emitted
assert (
mce_helpers.assert_entity_urn_not_like(
entity_type="dataset",
regex_pattern="urn:li:dataset:\\(urn:li:dataPlatform:dbt",
file=output_file,
)
> 20
)
number_of_valid_assertions_in_test_results = 23
assert (
mce_helpers.assert_entity_urn_like(
entity_type="assertion", regex_pattern="urn:li:assertion:", file=output_file
)
== number_of_valid_assertions_in_test_results
)
# no assertionInfo should be emitted
try:
mce_helpers.assert_for_each_entity(
entity_type="assertion",
aspect_name="assertionInfo",
aspect_field_matcher={},
file=output_file,
)
except AssertionError:
pass
# all assertions must have an assertionRunEvent emitted (except for one assertion)
assert (
mce_helpers.assert_for_each_entity(
entity_type="assertion",
aspect_name="assertionRunEvent",
aspect_field_matcher={},
file=output_file,
exception_urns=["urn:li:assertion:2ff754df689ea951ed2e12cbe356708f"],
)
== number_of_valid_assertions_in_test_results
)
@pytest.mark.integration
@freeze_time(FROZEN_TIME)
def test_dbt_only_test_definitions_and_results(
pytestconfig, tmp_path, mock_time, **kwargs
):
test_resources_dir = pytestconfig.rootpath / "tests/integration/dbt"
# Run the metadata ingestion pipeline.
output_file = tmp_path / "test_only_definitions_and_assertions.json"
pipeline = Pipeline(
config=PipelineConfig(
source=SourceConfig(
type="dbt",
config=DBTConfig(
manifest_path=str(
(test_resources_dir / "jaffle_shop_manifest.json").resolve()
),
catalog_path=str(
(test_resources_dir / "jaffle_shop_catalog.json").resolve()
),
target_platform="postgres",
test_results_path=str(
(test_resources_dir / "jaffle_shop_test_results.json").resolve()
),
# this is just here to avoid needing to access datahub server
write_semantics="OVERRIDE",
entities_enabled=DBTEntitiesEnabled(
sources=EmitDirective.NO,
seeds=EmitDirective.NO,
models=EmitDirective.NO,
),
),
),
sink=DynamicTypedConfig(type="file", config={"filename": str(output_file)}),
)
)
pipeline.run()
pipeline.raise_from_status()
# Verify the output. No datasets were emitted
assert (
mce_helpers.assert_entity_urn_not_like(
entity_type="dataset",
regex_pattern="urn:li:dataset:\\(urn:li:dataPlatform:dbt",
file=output_file,
)
> 20
)
number_of_assertions = 24
assert (
mce_helpers.assert_entity_urn_like(
entity_type="assertion", regex_pattern="urn:li:assertion:", file=output_file
)
== number_of_assertions
)
# all assertions must have an assertionInfo emitted
assert (
mce_helpers.assert_for_each_entity(
entity_type="assertion",
aspect_name="assertionInfo",
aspect_field_matcher={},
file=output_file,
)
== number_of_assertions
)
# all assertions must have an assertionRunEvent emitted (except for one assertion)
assert (
mce_helpers.assert_for_each_entity(
entity_type="assertion",
aspect_name="assertionRunEvent",
aspect_field_matcher={},
file=output_file,
exception_urns=["urn:li:assertion:2ff754df689ea951ed2e12cbe356708f"],
)
== number_of_assertions - 1
)

View File

@@ -2,6 +2,7 @@ import json
import logging
import os
import pprint
import re
import shutil
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type, Union
@@ -273,6 +274,7 @@ def _get_mcp_urn_path_spec() -> List[str]:
def assert_mce_entity_urn(
filter: str, entity_type: str, regex_pattern: str, file: str
) -> int:
"""Assert that all mce entity urns must match the regex pattern passed in. Return the number of events matched"""
test_output = load_json_file(file)
if isinstance(test_output, list):
@@ -296,7 +298,11 @@ def assert_mce_entity_urn(
def assert_for_each_entity(
- entity_type: str, aspect_name: str, aspect_field_matcher: Dict[str, Any], file: str
+ entity_type: str,
+ aspect_name: str,
+ aspect_field_matcher: Dict[str, Any],
+ file: str,
+ exception_urns: List[str] = [],
) -> int:
"""Assert that an aspect name with the desired fields exists for each entity urn"""
test_output = load_json_file(file)
@@ -341,7 +347,7 @@ def assert_for_each_entity(
aspect_val, [f]
), f"urn: {urn} -> Field {f} must match value {aspect_field_matcher[f]}, found {_get_element(aspect_val, [f])}"
success.append(urn)
- else:
+ elif urn not in exception_urns:
print(f"Adding {urn} to failures")
failures.append(urn)
@@ -401,3 +407,62 @@ def assert_entity_mcp_aspect(
), f"urn: {mcp.entityUrn} -> Field {f} must match value {aspect_field_matcher[f]}, found {_get_element(aspect_val, [f])}"
matches = matches + 1
return matches
def assert_entity_urn_not_like(entity_type: str, regex_pattern: str, file: str) -> int:
"""Assert that there are no entity urns that match the regex pattern passed in. Returns the total number of events in the file"""
test_output = load_json_file(file)
assert isinstance(test_output, list)
# mce urns
mce_urns = set(
[
_get_element(x, _get_mce_urn_path_spec(entity_type))
for x in test_output
if _get_filter(mce=True, entity_type=entity_type)(x)
]
)
mcp_urns = set(
[
_get_element(x, _get_mcp_urn_path_spec())
for x in test_output
if _get_filter(mcp=True, entity_type=entity_type)(x)
]
)
all_urns = mce_urns.union(mcp_urns)
print(all_urns)
matched_urns = [u for u in all_urns if re.match(regex_pattern, u)]
if matched_urns:
raise AssertionError(f"urns found that match the deny list {matched_urns}")
return len(test_output)
def assert_entity_urn_like(entity_type: str, regex_pattern: str, file: str) -> int:
"""Assert that there exist entity urns that match the regex pattern passed in. Returns the total number of events in the file"""
test_output = load_json_file(file)
assert isinstance(test_output, list)
# mce urns
mce_urns = set(
[
_get_element(x, _get_mce_urn_path_spec(entity_type))
for x in test_output
if _get_filter(mce=True, entity_type=entity_type)(x)
]
)
mcp_urns = set(
[
_get_element(x, _get_mcp_urn_path_spec())
for x in test_output
if _get_filter(mcp=True, entity_type=entity_type)(x)
]
)
all_urns = mce_urns.union(mcp_urns)
print(all_urns)
matched_urns = [u for u in all_urns if re.match(regex_pattern, u)]
if matched_urns:
return len(matched_urns)
else:
raise AssertionError(
f"No urns found that match the pattern {regex_pattern}. Full list is {all_urns}"
)
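A hedged usage sketch of the new helpers; the module path and the output file name are assumptions for illustration, while the signatures, return values, and urn patterns follow the code above and the integration tests:
```python
from tests.test_helpers import mce_helpers  # assumed module path

output_file = "dbt_events.json"  # hypothetical pipeline output

# Fails if any dbt dataset urn appears; returns the total event count.
total_events = mce_helpers.assert_entity_urn_not_like(
    entity_type="dataset",
    regex_pattern="urn:li:dataset:\\(urn:li:dataPlatform:dbt",
    file=output_file,
)

# Requires at least one matching urn; returns the number of matches.
num_assertions = mce_helpers.assert_entity_urn_like(
    entity_type="assertion",
    regex_pattern="urn:li:assertion:",
    file=output_file,
)

# Checks an aspect exists for every assertion, skipping a known exception.
mce_helpers.assert_for_each_entity(
    entity_type="assertion",
    aspect_name="assertionRunEvent",
    aspect_field_matcher={},
    file=output_file,
    exception_urns=["urn:li:assertion:2ff754df689ea951ed2e12cbe356708f"],
)
```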

View File

@@ -1,6 +1,8 @@
from typing import Dict, List, Union
from unittest import mock
from pydantic import ValidationError
from datahub.emitter import mce_builder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.dbt import DBTConfig, DBTSource
@@ -168,3 +170,86 @@ def test_dbt_source_patching_terms():
assert len(transformed_terms) == 3
for transformed_term in transformed_terms:
assert transformed_term.urn in expected_terms
def test_dbt_entity_emission_configuration():
config_dict = {
"manifest_path": "dummy_path",
"catalog_path": "dummy_path",
"target_platform": "dummy_platform",
"entities_enabled": {"models": "Only", "seeds": "Only"},
}
try:
DBTConfig.parse_obj(config_dict)
except ValidationError as ve:
assert len(ve.errors()) == 1
assert (
"Cannot have more than 1 type of entity emission set to ONLY"
in ve.errors()[0]["msg"]
)
# valid config
config_dict = {
"manifest_path": "dummy_path",
"catalog_path": "dummy_path",
"target_platform": "dummy_platform",
"entities_enabled": {"models": "Yes", "seeds": "Only"},
}
DBTConfig.parse_obj(config_dict)
def test_dbt_entity_emission_configuration_helpers():
config_dict = {
"manifest_path": "dummy_path",
"catalog_path": "dummy_path",
"target_platform": "dummy_platform",
"entities_enabled": {
"models": "Only",
},
}
config = DBTConfig.parse_obj(config_dict)
assert config.entities_enabled.can_emit_node_type("model")
assert not config.entities_enabled.can_emit_node_type("source")
assert not config.entities_enabled.can_emit_node_type("test")
assert not config.entities_enabled.can_emit_test_results
config_dict = {
"manifest_path": "dummy_path",
"catalog_path": "dummy_path",
"target_platform": "dummy_platform",
}
config = DBTConfig.parse_obj(config_dict)
assert config.entities_enabled.can_emit_node_type("model")
assert config.entities_enabled.can_emit_node_type("source")
assert config.entities_enabled.can_emit_node_type("test")
assert config.entities_enabled.can_emit_test_results
config_dict = {
"manifest_path": "dummy_path",
"catalog_path": "dummy_path",
"target_platform": "dummy_platform",
"entities_enabled": {
"test_results": "Only",
},
}
config = DBTConfig.parse_obj(config_dict)
assert not config.entities_enabled.can_emit_node_type("model")
assert not config.entities_enabled.can_emit_node_type("source")
assert not config.entities_enabled.can_emit_node_type("test")
assert config.entities_enabled.can_emit_test_results
config_dict = {
"manifest_path": "dummy_path",
"catalog_path": "dummy_path",
"target_platform": "dummy_platform",
"entities_enabled": {
"test_results": "Yes",
"test_definitions": "Yes",
"models": "No",
"sources": "No",
},
}
config = DBTConfig.parse_obj(config_dict)
assert not config.entities_enabled.can_emit_node_type("model")
assert not config.entities_enabled.can_emit_node_type("source")
assert config.entities_enabled.can_emit_node_type("test")
assert config.entities_enabled.can_emit_test_results