512 lines
18 KiB
Python
Raw Normal View History

"""
Test dbt
"""
import json
import uuid
from pathlib import Path
from unittest import TestCase
from unittest.mock import MagicMock, patch
from dbt_artifacts_parser.parser import parse_catalog, parse_manifest, parse_run_results
from pydantic import AnyUrl
from metadata.generated.schema.entity.data.table import Column, DataModel, Table
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import (
LabelType,
State,
TagLabel,
TagSource,
)
from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.ingestion.source.database.dbt.metadata import DbtSource
from metadata.utils import tag_utils
from metadata.utils.dbt_config import DbtFiles, DbtObjects
mock_dbt_config = {
"source": {
"type": "dbt",
"serviceName": "dbt_test",
"sourceConfig": {
"config": {
"type": "DBT",
"dbtConfigSource": {
"dbtCatalogFilePath": "sample/dbt_files/catalog.json",
"dbtManifestFilePath": "sample/dbt_files/manifest.json",
"dbtRunResultsFilePath": "sample/dbt_files/run_results.json",
},
}
},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"loggerLevel": "DEBUG",
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc"
"iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE"
"2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB"
"iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN"
"r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u"
"d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
},
},
}
MOCK_SAMPLE_MANIFEST_V4_V5_V6 = "resources/datasets/manifest_v4_v5_v6.json"
MOCK_SAMPLE_MANIFEST_V7 = "resources/datasets/manifest_v7.json"
MOCK_SAMPLE_MANIFEST_V8 = "resources/datasets/manifest_v8.json"
MOCK_SAMPLE_MANIFEST_NULL_DB = "resources/datasets/manifest_null_db.json"
MOCK_SAMPLE_MANIFEST_TEST_NODE = "resources/datasets/manifest_test_node.json"
EXPECTED_DATA_MODEL_FQNS = [
"dbt_test.dev.dbt_jaffle.customers",
"dbt_test.dev.dbt_jaffle.orders",
"dbt_test.dev.dbt_jaffle.stg_customers",
"dbt_test.dev.dbt_jaffle.customers_null_db",
]
EXPECTED_DATA_MODELS = [
DataModel(
modelType="DBT",
description="This table has basic information about a customer, as well as some derived facts based on a customer's orders",
path="sample/customers/root/path/models/customers.sql",
rawSql="sample customers raw code",
sql="sample customers compile code",
upstream=[],
owner=EntityReference(
id="cb2a92f5-e935-4ad7-911c-654280046538",
type="user",
name=None,
fullyQualifiedName="aaron_johnson0",
description=None,
displayName=None,
deleted=None,
href=AnyUrl(
"http://localhost:8585/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538",
scheme="http",
host="localhost",
host_type="int_domain",
port="8585",
path="/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538",
),
),
tags=[
TagLabel(
tagFQN="dbtTags.model_tag_one",
description=None,
source="Classification",
labelType="Automated",
state="Suggested",
href=None,
),
TagLabel(
tagFQN="dbtTags.model_tag_two",
description=None,
source="Classification",
labelType="Automated",
state="Suggested",
href=None,
),
],
columns=[
Column(
name="customer_id",
dataType="UNKNOWN",
dataLength=1,
description="This is a unique identifier for a customer",
),
Column(
name="first_name",
dataType="UNKNOWN",
dataLength=1,
description="Customer's first name. PII.",
),
Column(
name="last_name",
dataType="UNKNOWN",
dataLength=1,
description="Customer's last name. PII.",
),
],
generatedAt=None,
)
]
EXPECTED_DATA_MODEL_NULL_DB = [
DataModel(
modelType="DBT",
description=None,
path="sample/customers_null_db/root/path/models/staging/customers_null_db.sql",
rawSql="sample customers_null_db raw_code",
sql="sample customers_null_db compiled code",
upstream=[],
owner=EntityReference(
id="cb2a92f5-e935-4ad7-911c-654280046538",
type="user",
name=None,
fullyQualifiedName="aaron_johnson0",
description=None,
displayName=None,
deleted=None,
href=AnyUrl(
"http://localhost:8585/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538",
scheme="http",
host="localhost",
host_type="int_domain",
port="8585",
path="/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538",
),
),
tags=None,
columns=[
Column(
name="customer_id",
displayName=None,
dataType="UNKNOWN",
dataLength=1,
description="This is a unique identifier for an customer",
)
],
generatedAt=None,
),
]
MOCK_OWNER = EntityReference(
id="cb2a92f5-e935-4ad7-911c-654280046538",
type="user",
name=None,
fullyQualifiedName="aaron_johnson0",
description=None,
displayName=None,
deleted=None,
href=AnyUrl(
"http://localhost:8585/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538",
scheme="http",
host="localhost",
host_type="int_domain",
port="8585",
path="/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538",
),
)
MOCK_USER = [
User(
id=uuid.uuid4(),
name="aaron_johnson0",
email="aaron_johnson0@gmail.com",
href="http://localhost:8585/api/v1/users/d96eccb9-9a9b-40ad-9585-0a8a71665c51",
fullyQualifiedName="aaron_johnson0",
)
]
MOCK_TABLE_ENTITIES = [
Table(
id=uuid.uuid4(),
name="customers",
databaseSchema=EntityReference(id=uuid.uuid4(), type="databaseSchema"),
fullyQualifiedName="dbt_test.dev.dbt_jaffle.customers",
columns=[],
)
]
MOCK_NULL_DB_TABLE = [
Table(
id=uuid.uuid4(),
name="customers_null_db",
databaseSchema=EntityReference(id=uuid.uuid4(), type="databaseSchema"),
fullyQualifiedName="dbt_test.dev.dbt_jaffle.customers_null_db",
columns=[],
),
]
MOCK_TAG_LABELS = [
TagLabel(
tagFQN="dbtTags.tag1",
labelType=LabelType.Automated,
state=State.Suggested,
source=TagSource.Classification,
),
TagLabel(
tagFQN='dbtTags."tag2.name"',
labelType=LabelType.Automated,
state=State.Suggested,
source=TagSource.Classification,
),
TagLabel(
tagFQN="dbtTags.tag3",
labelType=LabelType.Automated,
state=State.Suggested,
source=TagSource.Classification,
),
]
class DbtUnitTest(TestCase):
"""
Implements the necessary methods to extract
dbt Unit Test
"""
@patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.test_connection")
def __init__(self, methodName, test_connection) -> None:
super().__init__(methodName)
test_connection.return_value = False
self.config = OpenMetadataWorkflowConfig.parse_obj(mock_dbt_config)
self.dbt_source_obj = DbtSource.create(
mock_dbt_config["source"],
self.config.workflowConfig.openMetadataServerConfig,
)
@patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner")
@patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")
def test_dbt_manifest_v4_v5_v6(self, es_search_from_fqn, get_dbt_owner):
get_dbt_owner.return_value = MOCK_OWNER
es_search_from_fqn.side_effect = MOCK_TABLE_ENTITIES
self.execute_test(
MOCK_SAMPLE_MANIFEST_V4_V5_V6,
expected_records=2,
expected_data_models=EXPECTED_DATA_MODELS,
)
@patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner")
@patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")
def test_dbt_manifest_v7(self, es_search_from_fqn, get_dbt_owner):
get_dbt_owner.return_value = MOCK_OWNER
es_search_from_fqn.side_effect = MOCK_TABLE_ENTITIES
self.execute_test(
MOCK_SAMPLE_MANIFEST_V7,
expected_records=2,
expected_data_models=EXPECTED_DATA_MODELS,
)
@patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner")
@patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")
@patch("metadata.utils.tag_utils._get_tag_label")
def test_dbt_manifest_v8(self, _get_tag_label, es_search_from_fqn, get_dbt_owner):
get_dbt_owner.return_value = MOCK_OWNER
es_search_from_fqn.return_value = MOCK_TABLE_ENTITIES
_get_tag_label.side_effect = [
TagLabel(
tagFQN="dbtTags.model_tag_one",
labelType=LabelType.Automated.value,
state=State.Suggested.value,
source=TagSource.Classification.value,
),
TagLabel(
tagFQN="dbtTags.model_tag_two",
labelType=LabelType.Automated.value,
state=State.Suggested.value,
source=TagSource.Classification.value,
),
]
self.execute_test(
MOCK_SAMPLE_MANIFEST_V8,
expected_records=2,
expected_data_models=EXPECTED_DATA_MODELS,
)
@patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner")
@patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")
def test_dbt_manifest_null_db(self, es_search_from_fqn, get_dbt_owner):
get_dbt_owner.return_value = MOCK_OWNER
es_search_from_fqn.return_value = MOCK_NULL_DB_TABLE
self.execute_test(
MOCK_SAMPLE_MANIFEST_NULL_DB,
expected_records=2,
expected_data_models=EXPECTED_DATA_MODEL_NULL_DB,
)
def test_dbt_get_corrected_name(self):
self.assertEqual(
"dbt_jaffle", self.dbt_source_obj.get_corrected_name(name="dbt_jaffle")
)
self.assertIsNone(self.dbt_source_obj.get_corrected_name(name="None"))
self.assertIsNone(self.dbt_source_obj.get_corrected_name(name="null"))
self.assertIsNotNone(self.dbt_source_obj.get_corrected_name(name="dev"))
@patch("metadata.utils.tag_utils._get_tag_label")
def test_dbt_get_dbt_tag_labels(self, _get_tag_label):
_get_tag_label.side_effect = [
TagLabel(
tagFQN="dbtTags.tag1",
labelType=LabelType.Automated.value,
state=State.Suggested.value,
source=TagSource.Classification.value,
),
TagLabel(
tagFQN='dbtTags."tag2.name"',
labelType=LabelType.Automated.value,
state=State.Suggested.value,
source=TagSource.Classification.value,
),
TagLabel(
tagFQN="dbtTags.tag3",
labelType=LabelType.Automated.value,
state=State.Suggested.value,
source=TagSource.Classification.value,
),
]
mocked_metadata = MagicMock()
result = tag_utils.get_tag_labels(
metadata=mocked_metadata,
classification_name="dbtTags",
tags=["tag1", "tag2.name", "tag3"],
include_tags=True,
)
self.assertListEqual(result, MOCK_TAG_LABELS)
def test_dbt_get_data_model_path(self):
_, dbt_objects = self.get_dbt_object_files(
mock_manifest=MOCK_SAMPLE_MANIFEST_V8
)
manifest_node = dbt_objects.dbt_manifest.nodes.get(
"model.jaffle_shop.customers"
)
result = self.dbt_source_obj.get_data_model_path(manifest_node=manifest_node)
self.assertEqual("sample/customers/root/path/models/customers.sql", result)
def test_dbt_generate_entity_link(self):
_, dbt_objects = self.get_dbt_object_files(
mock_manifest=MOCK_SAMPLE_MANIFEST_TEST_NODE
)
manifest_node = dbt_objects.dbt_manifest.nodes.get(
"test.jaffle_shop.unique_orders_order_id.fed79b3a6e"
)
dbt_test = {
"manifest_node": manifest_node,
"upstream": ["local_redshift_dbt2.dev.dbt_jaffle.stg_customers"],
"results": "",
}
result = self.dbt_source_obj.generate_entity_link(dbt_test=dbt_test)
self.assertListEqual(
[
"<#E::table::local_redshift_dbt2.dev.dbt_jaffle.stg_customers::columns::order_id>"
],
result,
)
def test_dbt_compiled_query(self):
expected_query = "sample customers compile code"
# Test the compiled queries with v8 manifest
_, dbt_objects = self.get_dbt_object_files(
mock_manifest=MOCK_SAMPLE_MANIFEST_V8
)
manifest_node = dbt_objects.dbt_manifest.nodes.get(
"model.jaffle_shop.customers"
)
result = self.dbt_source_obj.get_dbt_compiled_query(mnode=manifest_node)
self.assertEqual(expected_query, result)
# Test the compiled queries with v4 v5 v6 manifest
_, dbt_objects = self.get_dbt_object_files(
mock_manifest=MOCK_SAMPLE_MANIFEST_V4_V5_V6
)
manifest_node = dbt_objects.dbt_manifest.nodes.get(
"model.jaffle_shop.customers"
)
result = self.dbt_source_obj.get_dbt_compiled_query(mnode=manifest_node)
self.assertEqual(expected_query, result)
def test_dbt_raw_query(self):
expected_query = "sample customers raw code"
# Test the raw queries with v8 manifest
_, dbt_objects = self.get_dbt_object_files(
mock_manifest=MOCK_SAMPLE_MANIFEST_V8
)
manifest_node = dbt_objects.dbt_manifest.nodes.get(
"model.jaffle_shop.customers"
)
result = self.dbt_source_obj.get_dbt_raw_query(mnode=manifest_node)
self.assertEqual(expected_query, result)
# Test the raw queries with v4 v5 v6 manifest
_, dbt_objects = self.get_dbt_object_files(
mock_manifest=MOCK_SAMPLE_MANIFEST_V4_V5_V6
)
manifest_node = dbt_objects.dbt_manifest.nodes.get(
"model.jaffle_shop.customers"
)
result = self.dbt_source_obj.get_dbt_raw_query(mnode=manifest_node)
self.assertEqual(expected_query, result)
@patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")
def test_dbt_owner(self, es_search_from_fqn):
es_search_from_fqn.return_value = MOCK_USER
_, dbt_objects = self.get_dbt_object_files(
mock_manifest=MOCK_SAMPLE_MANIFEST_V8
)
manifest_node = dbt_objects.dbt_manifest.nodes.get(
"model.jaffle_shop.customers"
)
result = self.dbt_source_obj.get_dbt_owner(
manifest_node=manifest_node, catalog_node=None
)
self.assertEqual("aaron_johnson0", result.fullyQualifiedName)
def execute_test(self, mock_manifest, expected_records, expected_data_models):
dbt_files, dbt_objects = self.get_dbt_object_files(mock_manifest)
self.check_dbt_validate(dbt_files=dbt_files, expected_records=expected_records)
self.check_yield_datamodel(
dbt_objects=dbt_objects, expected_data_models=expected_data_models
)
def get_dbt_object_files(self, mock_manifest):
mock_file_path = Path(__file__).parent / mock_manifest
with open(mock_file_path) as file:
mock_data: dict = json.load(file)
dbt_files = DbtFiles(dbt_manifest=mock_data)
dbt_objects = DbtObjects(
dbt_catalog=parse_catalog(dbt_files.dbt_catalog)
if dbt_files.dbt_catalog
else None,
dbt_manifest=parse_manifest(dbt_files.dbt_manifest),
dbt_run_results=parse_run_results(dbt_files.dbt_run_results)
if dbt_files.dbt_run_results
else None,
)
return dbt_files, dbt_objects
def check_dbt_validate(self, dbt_files, expected_records):
with self.assertLogs(level="DEBUG") as captured:
self.dbt_source_obj.validate_dbt_files(dbt_files=dbt_files)
self.assertEqual(len(captured.records), expected_records)
for record in captured.records:
self.assertNotIn("Error", record.getMessage())
self.assertNotIn("Unable", record.getMessage())
def check_yield_datamodel(self, dbt_objects, expected_data_models):
data_model_list = []
yield_data_models = self.dbt_source_obj.yield_data_models(
dbt_objects=dbt_objects
)
for data_model_link in yield_data_models:
if isinstance(data_model_link, DataModelLink):
self.assertIn(
data_model_link.table_entity.fullyQualifiedName.__root__,
EXPECTED_DATA_MODEL_FQNS,
)
data_model_list.append(data_model_link.datamodel)
for _, (exptected, original) in enumerate(
zip(expected_data_models, data_model_list)
):
self.assertEqual(exptected, original)