mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-20 06:58:18 +00:00
Fix 6874: Added Support for lineage from dbt ephemeral nodes (#12101)
* fixed dbt ephemeral nodes lin * fixed dbt tests
This commit is contained in:
parent
258a2e33e2
commit
d9d3f6895b
@ -0,0 +1,88 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Constants required for dbt
|
||||
"""
|
||||
|
||||
from enum import Enum
|
||||
|
||||
# Based on https://schemas.getdbt.com/dbt/manifest/v7/index.html
|
||||
REQUIRED_MANIFEST_KEYS = ["name", "schema", "resource_type"]
|
||||
|
||||
# Based on https://schemas.getdbt.com/dbt/catalog/v1.json
|
||||
REQUIRED_CATALOG_KEYS = ["name", "type", "index"]
|
||||
|
||||
NONE_KEYWORDS_LIST = ["none", "null"]
|
||||
|
||||
DBT_CATALOG_FILE_NAME = "catalog.json"
|
||||
DBT_MANIFEST_FILE_NAME = "manifest.json"
|
||||
DBT_RUN_RESULTS_FILE_NAME = "run_results.json"
|
||||
|
||||
|
||||
class SkipResourceTypeEnum(Enum):
|
||||
"""
|
||||
Enum for nodes to be skipped
|
||||
"""
|
||||
|
||||
ANALYSIS = "analysis"
|
||||
TEST = "test"
|
||||
|
||||
|
||||
class CompiledQueriesEnum(Enum):
|
||||
"""
|
||||
Enum for Compiled Queries
|
||||
"""
|
||||
|
||||
COMPILED_CODE = "compiled_code"
|
||||
COMPILED_SQL = "compiled_sql"
|
||||
|
||||
|
||||
class RawQueriesEnum(Enum):
|
||||
"""
|
||||
Enum for Raw Queries
|
||||
"""
|
||||
|
||||
RAW_CODE = "raw_code"
|
||||
RAW_SQL = "raw_sql"
|
||||
|
||||
|
||||
class DbtTestSuccessEnum(Enum):
|
||||
"""
|
||||
Enum for success messages of dbt tests
|
||||
"""
|
||||
|
||||
SUCCESS = "success"
|
||||
PASS = "pass"
|
||||
|
||||
|
||||
class DbtTestFailureEnum(Enum):
|
||||
"""
|
||||
Enum for failure message of dbt tests
|
||||
"""
|
||||
|
||||
FAILURE = "failure"
|
||||
FAIL = "fail"
|
||||
|
||||
|
||||
class DbtCommonEnum(Enum):
|
||||
"""
|
||||
Common enum for dbt
|
||||
"""
|
||||
|
||||
OWNER = "owner"
|
||||
NODES = "nodes"
|
||||
SOURCES = "sources"
|
||||
RESOURCETYPE = "resource_type"
|
||||
MANIFEST_NODE = "manifest_node"
|
||||
UPSTREAM = "upstream"
|
||||
RESULTS = "results"
|
||||
TEST_SUITE_NAME = "test_suite_name"
|
||||
DBT_TEST_SUITE = "DBT_TEST_SUITE"
|
@ -0,0 +1,180 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
DBT utils methods.
|
||||
"""
|
||||
import traceback
|
||||
from typing import Optional, Union
|
||||
|
||||
from metadata.generated.schema.tests.testSuite import TestSuite
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.source.database.dbt.constants import (
|
||||
NONE_KEYWORDS_LIST,
|
||||
CompiledQueriesEnum,
|
||||
DbtCommonEnum,
|
||||
RawQueriesEnum,
|
||||
)
|
||||
from metadata.utils import entity_link
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
|
||||
logger = ingestion_logger()
|
||||
|
||||
|
||||
def create_test_case_parameter_definitions(dbt_test):
|
||||
"""
|
||||
Method to create test case parameter definitions
|
||||
"""
|
||||
try:
|
||||
if hasattr(dbt_test, "test_metadata"):
|
||||
test_case_param_definition = [
|
||||
{
|
||||
"name": dbt_test.test_metadata.name,
|
||||
"displayName": dbt_test.test_metadata.name,
|
||||
"required": False,
|
||||
}
|
||||
]
|
||||
return test_case_param_definition
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.error(
|
||||
f"Failed to capture tests case parameter definitions for node: {dbt_test} {err}"
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def create_test_case_parameter_values(dbt_test):
|
||||
"""
|
||||
Method to create test case parameter values
|
||||
"""
|
||||
try:
|
||||
manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value)
|
||||
if hasattr(manifest_node, "test_metadata"):
|
||||
values = manifest_node.test_metadata.kwargs.get("values")
|
||||
dbt_test_values = ""
|
||||
if values:
|
||||
dbt_test_values = ",".join(str(value) for value in values)
|
||||
test_case_param_values = [
|
||||
{"name": manifest_node.test_metadata.name, "value": dbt_test_values}
|
||||
]
|
||||
return test_case_param_values
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.error(
|
||||
f"Failed to capture tests case parameter values for node: {dbt_test} {err}"
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def generate_entity_link(dbt_test):
|
||||
"""
|
||||
Method returns entity link
|
||||
"""
|
||||
manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value)
|
||||
entity_link_list = [
|
||||
entity_link.get_entity_link(
|
||||
table_fqn=table_fqn,
|
||||
column_name=manifest_node.column_name
|
||||
if hasattr(manifest_node, "column_name")
|
||||
else None,
|
||||
)
|
||||
for table_fqn in dbt_test[DbtCommonEnum.UPSTREAM.value]
|
||||
]
|
||||
return entity_link_list
|
||||
|
||||
|
||||
def get_dbt_compiled_query(mnode) -> Optional[str]:
|
||||
"""
|
||||
Method to get dbt compiled query
|
||||
"""
|
||||
if hasattr(mnode, CompiledQueriesEnum.COMPILED_CODE.value) and mnode.compiled_code:
|
||||
return mnode.compiled_code
|
||||
if hasattr(mnode, CompiledQueriesEnum.COMPILED_SQL.value) and mnode.compiled_sql:
|
||||
return mnode.compiled_sql
|
||||
logger.debug(f"Unable to get DBT compiled query for node - {mnode.name}")
|
||||
return None
|
||||
|
||||
|
||||
def get_dbt_raw_query(mnode) -> Optional[str]:
|
||||
"""
|
||||
Method to get dbt raw query
|
||||
"""
|
||||
if hasattr(mnode, RawQueriesEnum.RAW_CODE.value) and mnode.raw_code:
|
||||
return mnode.raw_code
|
||||
if hasattr(mnode, RawQueriesEnum.RAW_SQL.value) and mnode.raw_sql:
|
||||
return mnode.raw_sql
|
||||
logger.debug(f"Unable to get DBT compiled query for node - {mnode.name}")
|
||||
return None
|
||||
|
||||
|
||||
def check_or_create_test_suite(
|
||||
metadata: OpenMetadata, test_entity_link: str
|
||||
) -> Union[TestSuite, EntityReference]:
|
||||
"""Check if test suite exists, if not create it
|
||||
|
||||
Args:
|
||||
entity_link (str): entity link
|
||||
|
||||
Returns:
|
||||
TestSuite:
|
||||
"""
|
||||
table_fqn = entity_link.get_table_fqn(test_entity_link)
|
||||
return metadata.get_or_create_executable_test_suite(table_fqn)
|
||||
|
||||
|
||||
def check_ephemeral_node(manifest_node) -> bool:
|
||||
"""
|
||||
Check if the manifest node is an ephemeral node
|
||||
"""
|
||||
if (
|
||||
hasattr(manifest_node, "config")
|
||||
and manifest_node.config
|
||||
and hasattr(manifest_node.config, "materialized")
|
||||
and manifest_node.config.materialized == "ephemeral"
|
||||
):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def get_dbt_model_name(manifest_node) -> str:
|
||||
"""
|
||||
Get the alias or name of the manifest node
|
||||
"""
|
||||
return (
|
||||
manifest_node.alias
|
||||
if hasattr(manifest_node, "alias") and manifest_node.alias
|
||||
else manifest_node.name
|
||||
)
|
||||
|
||||
|
||||
def get_corrected_name(name: Optional[str]):
|
||||
"""
|
||||
Method to fetch correct name
|
||||
"""
|
||||
correct_name = None
|
||||
if name:
|
||||
correct_name = None if name.lower() in NONE_KEYWORDS_LIST else name
|
||||
return correct_name
|
||||
|
||||
|
||||
def get_data_model_path(manifest_node):
|
||||
"""
|
||||
Method to get data model path
|
||||
"""
|
||||
datamodel_path = None
|
||||
if manifest_node.original_file_path:
|
||||
if hasattr(manifest_node, "root_path") and manifest_node.root_path:
|
||||
datamodel_path = (
|
||||
f"{manifest_node.root_path}/{manifest_node.original_file_path}"
|
||||
)
|
||||
else:
|
||||
datamodel_path = manifest_node.original_file_path
|
||||
return datamodel_path
|
@ -13,7 +13,6 @@ DBT source methods.
|
||||
"""
|
||||
import traceback
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from typing import Iterable, List, Optional, Union
|
||||
|
||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||
@ -50,7 +49,6 @@ from metadata.generated.schema.tests.testDefinition import (
|
||||
TestDefinition,
|
||||
TestPlatform,
|
||||
)
|
||||
from metadata.generated.schema.tests.testSuite import TestSuite
|
||||
from metadata.generated.schema.type.basic import FullyQualifiedEntityName, Timestamp
|
||||
from metadata.generated.schema.type.entityLineage import EntitiesEdge
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
@ -60,87 +58,38 @@ from metadata.ingestion.models.ometa_classification import OMetaTagAndClassifica
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
|
||||
from metadata.ingestion.source.database.database_service import DataModelLink
|
||||
from metadata.ingestion.source.database.dbt.constants import (
|
||||
REQUIRED_CATALOG_KEYS,
|
||||
REQUIRED_MANIFEST_KEYS,
|
||||
DbtCommonEnum,
|
||||
DbtTestFailureEnum,
|
||||
DbtTestSuccessEnum,
|
||||
SkipResourceTypeEnum,
|
||||
)
|
||||
from metadata.ingestion.source.database.dbt.dbt_service import (
|
||||
DbtFiles,
|
||||
DbtObjects,
|
||||
DbtServiceSource,
|
||||
)
|
||||
from metadata.utils import entity_link, fqn
|
||||
from metadata.ingestion.source.database.dbt.dbt_utils import (
|
||||
check_ephemeral_node,
|
||||
check_or_create_test_suite,
|
||||
create_test_case_parameter_definitions,
|
||||
create_test_case_parameter_values,
|
||||
generate_entity_link,
|
||||
get_corrected_name,
|
||||
get_data_model_path,
|
||||
get_dbt_compiled_query,
|
||||
get_dbt_model_name,
|
||||
get_dbt_raw_query,
|
||||
)
|
||||
from metadata.utils import fqn
|
||||
from metadata.utils.elasticsearch import get_entity_from_es_result
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels
|
||||
|
||||
logger = ingestion_logger()
|
||||
|
||||
# Based on https://schemas.getdbt.com/dbt/manifest/v7/index.html
|
||||
REQUIRED_MANIFEST_KEYS = ["name", "schema", "resource_type"]
|
||||
|
||||
# Based on https://schemas.getdbt.com/dbt/catalog/v1.json
|
||||
REQUIRED_CATALOG_KEYS = ["name", "type", "index"]
|
||||
|
||||
NONE_KEYWORDS_LIST = ["none", "null"]
|
||||
|
||||
|
||||
class SkipResourceTypeEnum(Enum):
|
||||
"""
|
||||
Enum for nodes to be skipped
|
||||
"""
|
||||
|
||||
ANALYSIS = "analysis"
|
||||
TEST = "test"
|
||||
|
||||
|
||||
class CompiledQueriesEnum(Enum):
|
||||
"""
|
||||
Enum for Compiled Queries
|
||||
"""
|
||||
|
||||
COMPILED_CODE = "compiled_code"
|
||||
COMPILED_SQL = "compiled_sql"
|
||||
|
||||
|
||||
class RawQueriesEnum(Enum):
|
||||
"""
|
||||
Enum for Raw Queries
|
||||
"""
|
||||
|
||||
RAW_CODE = "raw_code"
|
||||
RAW_SQL = "raw_sql"
|
||||
|
||||
|
||||
class DbtTestSuccessEnum(Enum):
|
||||
"""
|
||||
Enum for success messages of dbt tests
|
||||
"""
|
||||
|
||||
SUCCESS = "success"
|
||||
PASS = "pass"
|
||||
|
||||
|
||||
class DbtTestFailureEnum(Enum):
|
||||
"""
|
||||
Enum for failure message of dbt tests
|
||||
"""
|
||||
|
||||
FAILURE = "failure"
|
||||
FAIL = "fail"
|
||||
|
||||
|
||||
class DbtCommonEnum(Enum):
|
||||
"""
|
||||
Common enum for dbt
|
||||
"""
|
||||
|
||||
OWNER = "owner"
|
||||
NODES = "nodes"
|
||||
SOURCES = "sources"
|
||||
RESOURCETYPE = "resource_type"
|
||||
MANIFEST_NODE = "manifest_node"
|
||||
UPSTREAM = "upstream"
|
||||
RESULTS = "results"
|
||||
TEST_SUITE_NAME = "test_suite_name"
|
||||
DBT_TEST_SUITE = "DBT_TEST_SUITE"
|
||||
|
||||
|
||||
class InvalidServiceException(Exception):
|
||||
"""
|
||||
@ -148,7 +97,7 @@ class InvalidServiceException(Exception):
|
||||
"""
|
||||
|
||||
|
||||
class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
|
||||
class DbtSource(DbtServiceSource):
|
||||
"""
|
||||
Class defines method to extract metadata from DBT
|
||||
"""
|
||||
@ -397,6 +346,11 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
|
||||
)
|
||||
continue
|
||||
|
||||
# Skip the ephemeral nodes since it is not materialized
|
||||
if check_ephemeral_node(manifest_node):
|
||||
logger.debug(f"Skipping ephemeral DBT node: {key}.")
|
||||
continue
|
||||
|
||||
# Skip the analysis and test nodes
|
||||
if manifest_node.resource_type.value in [
|
||||
item.value for item in SkipResourceTypeEnum
|
||||
@ -404,11 +358,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
|
||||
logger.debug(f"Skipping DBT node: {key}.")
|
||||
continue
|
||||
|
||||
model_name = (
|
||||
manifest_node.alias
|
||||
if hasattr(manifest_node, "alias") and manifest_node.alias
|
||||
else manifest_node.name
|
||||
)
|
||||
model_name = get_dbt_model_name(manifest_node)
|
||||
logger.debug(f"Processing DBT node: {model_name}")
|
||||
|
||||
catalog_node = None
|
||||
@ -424,8 +374,8 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
|
||||
include_tags=self.source_config.includeTags,
|
||||
)
|
||||
|
||||
dbt_compiled_query = self.get_dbt_compiled_query(manifest_node)
|
||||
dbt_raw_query = self.get_dbt_raw_query(manifest_node)
|
||||
dbt_compiled_query = get_dbt_compiled_query(manifest_node)
|
||||
dbt_raw_query = get_dbt_raw_query(manifest_node)
|
||||
|
||||
# Get the table entity from ES
|
||||
# TODO: Change to get_by_name once the postgres case sensitive calls is fixed
|
||||
@ -433,8 +383,8 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
|
||||
self.metadata,
|
||||
entity_type=Table,
|
||||
service_name=self.config.serviceName,
|
||||
database_name=self.get_corrected_name(manifest_node.database),
|
||||
schema_name=self.get_corrected_name(manifest_node.schema_),
|
||||
database_name=get_corrected_name(manifest_node.database),
|
||||
schema_name=get_corrected_name(manifest_node.schema_),
|
||||
table_name=model_name,
|
||||
)
|
||||
table_entity: Optional[
|
||||
@ -454,9 +404,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
|
||||
description=manifest_node.description
|
||||
if manifest_node.description
|
||||
else None,
|
||||
path=self.get_data_model_path(
|
||||
manifest_node=manifest_node
|
||||
),
|
||||
path=get_data_model_path(manifest_node=manifest_node),
|
||||
rawSql=dbt_raw_query if dbt_raw_query else "",
|
||||
sql=dbt_compiled_query if dbt_compiled_query else "",
|
||||
columns=self.parse_data_model_columns(
|
||||
@ -486,23 +434,6 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
|
||||
f"Unexpected exception parsing DBT node:{model_name} - {exc}"
|
||||
)
|
||||
|
||||
def get_corrected_name(self, name: Optional[str]):
|
||||
correct_name = None
|
||||
if name:
|
||||
correct_name = None if name.lower() in NONE_KEYWORDS_LIST else name
|
||||
return correct_name
|
||||
|
||||
def get_data_model_path(self, manifest_node):
|
||||
datamodel_path = None
|
||||
if manifest_node.original_file_path:
|
||||
if hasattr(manifest_node, "root_path") and manifest_node.root_path:
|
||||
datamodel_path = (
|
||||
f"{manifest_node.root_path}/{manifest_node.original_file_path}"
|
||||
)
|
||||
else:
|
||||
datamodel_path = manifest_node.original_file_path
|
||||
return datamodel_path
|
||||
|
||||
def parse_upstream_nodes(self, manifest_entities, dbt_node):
|
||||
"""
|
||||
Method to fetch the upstream nodes
|
||||
@ -517,32 +448,35 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
|
||||
for node in dbt_node.depends_on.nodes:
|
||||
try:
|
||||
parent_node = manifest_entities[node]
|
||||
table_name = (
|
||||
parent_node.alias
|
||||
if hasattr(parent_node, "alias") and parent_node.alias
|
||||
else parent_node.name
|
||||
)
|
||||
parent_fqn = fqn.build(
|
||||
self.metadata,
|
||||
entity_type=Table,
|
||||
service_name=self.config.serviceName,
|
||||
database_name=self.get_corrected_name(parent_node.database),
|
||||
schema_name=self.get_corrected_name(parent_node.schema_),
|
||||
table_name=table_name,
|
||||
)
|
||||
|
||||
# check if the parent table exists in OM before adding it to the upstream list
|
||||
# TODO: Change to get_by_name once the postgres case sensitive calls is fixed
|
||||
parent_table_entity: Optional[
|
||||
Union[Table, List[Table]]
|
||||
] = get_entity_from_es_result(
|
||||
entity_list=self.metadata.es_search_from_fqn(
|
||||
entity_type=Table, fqn_search_string=parent_fqn
|
||||
),
|
||||
fetch_multiple_entities=False,
|
||||
)
|
||||
if parent_table_entity:
|
||||
upstream_nodes.append(parent_fqn)
|
||||
# check if the node is an ephemeral node
|
||||
# Recursively store the upstream of the ephemeral node in the upstream list
|
||||
if check_ephemeral_node(parent_node):
|
||||
upstream_nodes.extend(
|
||||
self.parse_upstream_nodes(manifest_entities, parent_node)
|
||||
)
|
||||
else:
|
||||
parent_fqn = fqn.build(
|
||||
self.metadata,
|
||||
entity_type=Table,
|
||||
service_name=self.config.serviceName,
|
||||
database_name=get_corrected_name(parent_node.database),
|
||||
schema_name=get_corrected_name(parent_node.schema_),
|
||||
table_name=get_dbt_model_name(parent_node),
|
||||
)
|
||||
|
||||
# check if the parent table exists in OM before adding it to the upstream list
|
||||
# TODO: Change to get_by_name once the postgres case sensitive calls is fixed
|
||||
parent_table_entity: Optional[
|
||||
Union[Table, List[Table]]
|
||||
] = get_entity_from_es_result(
|
||||
entity_list=self.metadata.es_search_from_fqn(
|
||||
entity_type=Table, fqn_search_string=parent_fqn
|
||||
),
|
||||
fetch_multiple_entities=False,
|
||||
)
|
||||
if parent_table_entity:
|
||||
upstream_nodes.append(parent_fqn)
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.warning(
|
||||
@ -763,7 +697,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
|
||||
description=manifest_node.description,
|
||||
entityType=entity_type,
|
||||
testPlatforms=[TestPlatform.DBT],
|
||||
parameterDefinition=self.create_test_case_parameter_definitions(
|
||||
parameterDefinition=create_test_case_parameter_definitions(
|
||||
manifest_node
|
||||
),
|
||||
displayName=None,
|
||||
@ -781,9 +715,11 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
|
||||
manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value)
|
||||
if manifest_node:
|
||||
logger.debug(f"Processing DBT Test Case for node: {manifest_node.name}")
|
||||
entity_link_list = self.generate_entity_link(dbt_test)
|
||||
entity_link_list = generate_entity_link(dbt_test)
|
||||
for entity_link_str in entity_link_list:
|
||||
test_suite = self._check_or_create_test_suite(entity_link_str)
|
||||
test_suite = check_or_create_test_suite(
|
||||
self.metadata, entity_link_str
|
||||
)
|
||||
yield CreateTestCaseRequest(
|
||||
name=manifest_node.name,
|
||||
description=manifest_node.description,
|
||||
@ -792,9 +728,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
|
||||
),
|
||||
entityLink=entity_link_str,
|
||||
testSuite=test_suite.fullyQualifiedName,
|
||||
parameterValues=self.create_test_case_parameter_values(
|
||||
dbt_test
|
||||
),
|
||||
parameterValues=create_test_case_parameter_values(dbt_test),
|
||||
displayName=None,
|
||||
owner=None,
|
||||
)
|
||||
@ -890,94 +824,5 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
|
||||
f"Failed to capture tests results for node: {manifest_node.name} {err}"
|
||||
)
|
||||
|
||||
def create_test_case_parameter_definitions(self, dbt_test):
|
||||
try:
|
||||
if hasattr(dbt_test, "test_metadata"):
|
||||
test_case_param_definition = [
|
||||
{
|
||||
"name": dbt_test.test_metadata.name,
|
||||
"displayName": dbt_test.test_metadata.name,
|
||||
"required": False,
|
||||
}
|
||||
]
|
||||
return test_case_param_definition
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.error(
|
||||
f"Failed to capture tests case parameter definitions for node: {dbt_test} {err}"
|
||||
)
|
||||
return None
|
||||
|
||||
def create_test_case_parameter_values(self, dbt_test):
|
||||
try:
|
||||
manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value)
|
||||
if hasattr(manifest_node, "test_metadata"):
|
||||
values = manifest_node.test_metadata.kwargs.get("values")
|
||||
dbt_test_values = ""
|
||||
if values:
|
||||
dbt_test_values = ",".join(str(value) for value in values)
|
||||
test_case_param_values = [
|
||||
{"name": manifest_node.test_metadata.name, "value": dbt_test_values}
|
||||
]
|
||||
return test_case_param_values
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.error(
|
||||
f"Failed to capture tests case parameter values for node: {dbt_test} {err}"
|
||||
)
|
||||
return None
|
||||
|
||||
def generate_entity_link(self, dbt_test):
|
||||
"""
|
||||
Method returns entity link
|
||||
"""
|
||||
manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value)
|
||||
entity_link_list = [
|
||||
entity_link.get_entity_link(
|
||||
table_fqn=table_fqn,
|
||||
column_name=manifest_node.column_name
|
||||
if hasattr(manifest_node, "column_name")
|
||||
else None,
|
||||
)
|
||||
for table_fqn in dbt_test[DbtCommonEnum.UPSTREAM.value]
|
||||
]
|
||||
return entity_link_list
|
||||
|
||||
def get_dbt_compiled_query(self, mnode) -> Optional[str]:
|
||||
if (
|
||||
hasattr(mnode, CompiledQueriesEnum.COMPILED_CODE.value)
|
||||
and mnode.compiled_code
|
||||
):
|
||||
return mnode.compiled_code
|
||||
if (
|
||||
hasattr(mnode, CompiledQueriesEnum.COMPILED_SQL.value)
|
||||
and mnode.compiled_sql
|
||||
):
|
||||
return mnode.compiled_sql
|
||||
logger.debug(f"Unable to get DBT compiled query for node - {mnode.name}")
|
||||
return None
|
||||
|
||||
def get_dbt_raw_query(self, mnode) -> Optional[str]:
|
||||
if hasattr(mnode, RawQueriesEnum.RAW_CODE.value) and mnode.raw_code:
|
||||
return mnode.raw_code
|
||||
if hasattr(mnode, RawQueriesEnum.RAW_SQL.value) and mnode.raw_sql:
|
||||
return mnode.raw_sql
|
||||
logger.debug(f"Unable to get DBT compiled query for node - {mnode.name}")
|
||||
return None
|
||||
|
||||
def _check_or_create_test_suite(
|
||||
self, test_entity_link: str
|
||||
) -> Union[TestSuite, EntityReference]:
|
||||
"""Check if test suite exists, if not create it
|
||||
|
||||
Args:
|
||||
entity_link (str): entity link
|
||||
|
||||
Returns:
|
||||
TestSuite:
|
||||
"""
|
||||
table_fqn = entity_link.get_table_fqn(test_entity_link)
|
||||
return self.metadata.get_or_create_executable_test_suite(table_fqn)
|
||||
|
||||
def close(self):
|
||||
self.metadata.close()
|
||||
|
@ -24,6 +24,13 @@ from metadata.generated.schema.type.tagLabel import (
|
||||
TagSource,
|
||||
)
|
||||
from metadata.ingestion.source.database.database_service import DataModelLink
|
||||
from metadata.ingestion.source.database.dbt.dbt_utils import (
|
||||
generate_entity_link,
|
||||
get_corrected_name,
|
||||
get_data_model_path,
|
||||
get_dbt_compiled_query,
|
||||
get_dbt_raw_query,
|
||||
)
|
||||
from metadata.ingestion.source.database.dbt.metadata import DbtSource
|
||||
from metadata.utils.dbt_config import DbtFiles, DbtObjects
|
||||
from metadata.utils.tag_utils import get_tag_labels
|
||||
@ -332,12 +339,10 @@ class DbtUnitTest(TestCase):
|
||||
)
|
||||
|
||||
def test_dbt_get_corrected_name(self):
|
||||
self.assertEqual(
|
||||
"dbt_jaffle", self.dbt_source_obj.get_corrected_name(name="dbt_jaffle")
|
||||
)
|
||||
self.assertIsNone(self.dbt_source_obj.get_corrected_name(name="None"))
|
||||
self.assertIsNone(self.dbt_source_obj.get_corrected_name(name="null"))
|
||||
self.assertIsNotNone(self.dbt_source_obj.get_corrected_name(name="dev"))
|
||||
self.assertEqual("dbt_jaffle", get_corrected_name(name="dbt_jaffle"))
|
||||
self.assertIsNone(get_corrected_name(name="None"))
|
||||
self.assertIsNone(get_corrected_name(name="null"))
|
||||
self.assertIsNotNone(get_corrected_name(name="dev"))
|
||||
|
||||
@patch("metadata.utils.tag_utils.get_tag_label")
|
||||
def test_dbt_get_dbt_tag_labels(self, get_tag_label):
|
||||
@ -378,7 +383,7 @@ class DbtUnitTest(TestCase):
|
||||
manifest_node = dbt_objects.dbt_manifest.nodes.get(
|
||||
"model.jaffle_shop.customers"
|
||||
)
|
||||
result = self.dbt_source_obj.get_data_model_path(manifest_node=manifest_node)
|
||||
result = get_data_model_path(manifest_node=manifest_node)
|
||||
self.assertEqual("sample/customers/root/path/models/customers.sql", result)
|
||||
|
||||
def test_dbt_generate_entity_link(self):
|
||||
@ -393,7 +398,7 @@ class DbtUnitTest(TestCase):
|
||||
"upstream": ["local_redshift_dbt2.dev.dbt_jaffle.stg_customers"],
|
||||
"results": "",
|
||||
}
|
||||
result = self.dbt_source_obj.generate_entity_link(dbt_test=dbt_test)
|
||||
result = generate_entity_link(dbt_test=dbt_test)
|
||||
self.assertListEqual(
|
||||
[
|
||||
"<#E::table::local_redshift_dbt2.dev.dbt_jaffle.stg_customers::columns::order_id>"
|
||||
@ -411,7 +416,7 @@ class DbtUnitTest(TestCase):
|
||||
manifest_node = dbt_objects.dbt_manifest.nodes.get(
|
||||
"model.jaffle_shop.customers"
|
||||
)
|
||||
result = self.dbt_source_obj.get_dbt_compiled_query(mnode=manifest_node)
|
||||
result = get_dbt_compiled_query(mnode=manifest_node)
|
||||
self.assertEqual(expected_query, result)
|
||||
|
||||
# Test the compiled queries with v4 v5 v6 manifest
|
||||
@ -421,7 +426,7 @@ class DbtUnitTest(TestCase):
|
||||
manifest_node = dbt_objects.dbt_manifest.nodes.get(
|
||||
"model.jaffle_shop.customers"
|
||||
)
|
||||
result = self.dbt_source_obj.get_dbt_compiled_query(mnode=manifest_node)
|
||||
result = get_dbt_compiled_query(mnode=manifest_node)
|
||||
self.assertEqual(expected_query, result)
|
||||
|
||||
def test_dbt_raw_query(self):
|
||||
@ -434,7 +439,7 @@ class DbtUnitTest(TestCase):
|
||||
manifest_node = dbt_objects.dbt_manifest.nodes.get(
|
||||
"model.jaffle_shop.customers"
|
||||
)
|
||||
result = self.dbt_source_obj.get_dbt_raw_query(mnode=manifest_node)
|
||||
result = get_dbt_raw_query(mnode=manifest_node)
|
||||
self.assertEqual(expected_query, result)
|
||||
|
||||
# Test the raw queries with v4 v5 v6 manifest
|
||||
@ -444,7 +449,7 @@ class DbtUnitTest(TestCase):
|
||||
manifest_node = dbt_objects.dbt_manifest.nodes.get(
|
||||
"model.jaffle_shop.customers"
|
||||
)
|
||||
result = self.dbt_source_obj.get_dbt_raw_query(mnode=manifest_node)
|
||||
result = get_dbt_raw_query(mnode=manifest_node)
|
||||
self.assertEqual(expected_query, result)
|
||||
|
||||
@patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")
|
||||
|
Loading…
x
Reference in New Issue
Block a user