""" Test dbt """ import json import uuid from pathlib import Path from unittest import TestCase from unittest.mock import patch from dbt_artifacts_parser.parser import parse_catalog, parse_manifest, parse_run_results from pydantic import AnyUrl from metadata.generated.schema.entity.data.table import Column, DataModel, Table from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.tagLabel import ( LabelType, State, TagLabel, TagSource, ) from metadata.ingestion.source.database.database_service import DataModelLink from metadata.ingestion.source.database.dbt.metadata import DbtSource from metadata.utils.dbt_config import DbtFiles, DbtObjects mock_dbt_config = { "source": { "type": "dbt", "serviceName": "dbt_test", "sourceConfig": { "config": { "type": "DBT", "dbtConfigSource": { "dbtCatalogFilePath": "sample/dbt_files/catalog.json", "dbtManifestFilePath": "sample/dbt_files/manifest.json", "dbtRunResultsFilePath": "sample/dbt_files/run_results.json", }, } }, }, "sink": {"type": "metadata-rest", "config": {}}, "workflowConfig": { "openMetadataServerConfig": { "hostPort": "http://localhost:8585/api", "authProvider": "openmetadata", "securityConfig": { "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc" "iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE" "2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB" "iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN" "r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u" "d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" }, } }, } MOCK_SAMPLE_MANIFEST_V4_V5_V6 = "resources/datasets/manifest_v4_v5_v6.json" MOCK_SAMPLE_MANIFEST_V7 = "resources/datasets/manifest_v7.json" MOCK_SAMPLE_MANIFEST_V8 = "resources/datasets/manifest_v8.json" MOCK_SAMPLE_MANIFEST_NULL_DB = "resources/datasets/manifest_null_db.json" MOCK_SAMPLE_MANIFEST_TEST_NODE = "resources/datasets/manifest_test_node.json" EXPECTED_DATA_MODEL_FQNS = [ "dbt_test.dev.dbt_jaffle.customers", "dbt_test.dev.dbt_jaffle.orders", "dbt_test.dev.dbt_jaffle.stg_customers", "dbt_test.dev.dbt_jaffle.customers_null_db", ] EXPECTED_DATA_MODELS = [ DataModel( modelType="DBT", description="This table has basic information about a customer, as well as some derived facts based on a customer's orders", path="sample/customers/root/path/models/customers.sql", rawSql="sample customers raw code", sql="sample customers compile code", upstream=[], owner=EntityReference( id="cb2a92f5-e935-4ad7-911c-654280046538", type="user", name=None, fullyQualifiedName="aaron_johnson0", description=None, displayName=None, deleted=None, href=AnyUrl( "http://localhost:8585/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538", scheme="http", host="localhost", host_type="int_domain", port="8585", path="/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538", ), ), tags=[ TagLabel( tagFQN="dbtTags.model_tag_one", description=None, source="Classification", labelType="Automated", state="Confirmed", href=None, ), TagLabel( tagFQN="dbtTags.model_tag_two", description=None, source="Classification", labelType="Automated", state="Confirmed", href=None, ), ], columns=[ Column( name="customer_id", dataType="UNKNOWN", dataLength=1, description="This is a unique identifier for a customer", ), Column( name="first_name", dataType="UNKNOWN", dataLength=1, description="Customer's first name. PII.", ), Column( name="last_name", dataType="UNKNOWN", dataLength=1, description="Customer's last name. PII.", ), ], generatedAt=None, ) ] EXPECTED_DATA_MODEL_NULL_DB = [ DataModel( modelType="DBT", description=None, path="sample/customers_null_db/root/path/models/staging/customers_null_db.sql", rawSql="sample customers_null_db raw_code", sql="sample customers_null_db compiled code", upstream=[], owner=EntityReference( id="cb2a92f5-e935-4ad7-911c-654280046538", type="user", name=None, fullyQualifiedName="aaron_johnson0", description=None, displayName=None, deleted=None, href=AnyUrl( "http://localhost:8585/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538", scheme="http", host="localhost", host_type="int_domain", port="8585", path="/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538", ), ), tags=None, columns=[ Column( name="customer_id", displayName=None, dataType="UNKNOWN", dataLength=1, description="This is a unique identifier for an customer", ) ], generatedAt=None, ), ] MOCK_OWNER = EntityReference( id="cb2a92f5-e935-4ad7-911c-654280046538", type="user", name=None, fullyQualifiedName="aaron_johnson0", description=None, displayName=None, deleted=None, href=AnyUrl( "http://localhost:8585/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538", scheme="http", host="localhost", host_type="int_domain", port="8585", path="/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538", ), ) MOCK_USER = [ User( id=uuid.uuid4(), name="aaron_johnson0", email="aaron_johnson0@gmail.com", href="http://localhost:8585/api/v1/users/d96eccb9-9a9b-40ad-9585-0a8a71665c51", fullyQualifiedName="aaron_johnson0", ) ] MOCK_TABLE_ENTITIES = [ Table( id=uuid.uuid4(), name="customers", databaseSchema=EntityReference(id=uuid.uuid4(), type="databaseSchema"), fullyQualifiedName="dbt_test.dev.dbt_jaffle.customers", columns=[], ) ] MOCK_NULL_DB_TABLE = [ Table( id=uuid.uuid4(), name="customers_null_db", databaseSchema=EntityReference(id=uuid.uuid4(), type="databaseSchema"), fullyQualifiedName="dbt_test.dev.dbt_jaffle.customers_null_db", columns=[], ), ] MOCK_TAG_LABELS = [ TagLabel( tagFQN="dbtTags.tag1", labelType=LabelType.Automated, state=State.Confirmed, source=TagSource.Classification, ), TagLabel( tagFQN="dbtTags.tag2name", labelType=LabelType.Automated, state=State.Confirmed, source=TagSource.Classification, ), TagLabel( tagFQN="dbtTags.tag3", labelType=LabelType.Automated, state=State.Confirmed, source=TagSource.Classification, ), ] class DbtUnitTest(TestCase): """ Implements the necessary methods to extract dbt Unit Test """ @patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.test_connection") def __init__(self, methodName, test_connection) -> None: super().__init__(methodName) test_connection.return_value = False self.config = OpenMetadataWorkflowConfig.parse_obj(mock_dbt_config) self.dbt_source_obj = DbtSource.create( mock_dbt_config["source"], self.config.workflowConfig.openMetadataServerConfig, ) @patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner") @patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn") def test_dbt_manifest_v4_v5_v6(self, es_search_from_fqn, get_dbt_owner): get_dbt_owner.return_value = MOCK_OWNER es_search_from_fqn.side_effect = MOCK_TABLE_ENTITIES self.execute_test( MOCK_SAMPLE_MANIFEST_V4_V5_V6, expected_records=2, expected_data_models=EXPECTED_DATA_MODELS, ) @patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner") @patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn") def test_dbt_manifest_v7(self, es_search_from_fqn, get_dbt_owner): get_dbt_owner.return_value = MOCK_OWNER es_search_from_fqn.side_effect = MOCK_TABLE_ENTITIES self.execute_test( MOCK_SAMPLE_MANIFEST_V7, expected_records=2, expected_data_models=EXPECTED_DATA_MODELS, ) @patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner") @patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn") def test_dbt_manifest_v8(self, es_search_from_fqn, get_dbt_owner): get_dbt_owner.return_value = MOCK_OWNER es_search_from_fqn.return_value = MOCK_TABLE_ENTITIES self.execute_test( MOCK_SAMPLE_MANIFEST_V8, expected_records=2, expected_data_models=EXPECTED_DATA_MODELS, ) @patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner") @patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn") def test_dbt_manifest_null_db(self, es_search_from_fqn, get_dbt_owner): get_dbt_owner.return_value = MOCK_OWNER es_search_from_fqn.return_value = MOCK_NULL_DB_TABLE self.execute_test( MOCK_SAMPLE_MANIFEST_NULL_DB, expected_records=2, expected_data_models=EXPECTED_DATA_MODEL_NULL_DB, ) def test_dbt_get_corrected_name(self): self.assertEqual( "dbt_jaffle", self.dbt_source_obj.get_corrected_name(name="dbt_jaffle") ) self.assertIsNone(self.dbt_source_obj.get_corrected_name(name="None")) self.assertIsNone(self.dbt_source_obj.get_corrected_name(name="null")) self.assertIsNotNone(self.dbt_source_obj.get_corrected_name(name="dev")) def test_dbt_get_dbt_tag_labels(self): result = self.dbt_source_obj.get_dbt_tag_labels(["tag1", "tag2.name", "tag3"]) self.assertListEqual(result, MOCK_TAG_LABELS) def test_dbt_get_data_model_path(self): _, dbt_objects = self.get_dbt_object_files( mock_manifest=MOCK_SAMPLE_MANIFEST_V8 ) manifest_node = dbt_objects.dbt_manifest.nodes.get( "model.jaffle_shop.customers" ) result = self.dbt_source_obj.get_data_model_path(manifest_node=manifest_node) self.assertEqual("sample/customers/root/path/models/customers.sql", result) def test_dbt_generate_entity_link(self): _, dbt_objects = self.get_dbt_object_files( mock_manifest=MOCK_SAMPLE_MANIFEST_TEST_NODE ) manifest_node = dbt_objects.dbt_manifest.nodes.get( "test.jaffle_shop.unique_orders_order_id.fed79b3a6e" ) dbt_test = { "manifest_node": manifest_node, "upstream": ["local_redshift_dbt2.dev.dbt_jaffle.stg_customers"], "results": "", } result = self.dbt_source_obj.generate_entity_link(dbt_test=dbt_test) self.assertListEqual( [ "<#E::table::local_redshift_dbt2.dev.dbt_jaffle.stg_customers::columns::order_id>" ], result, ) def test_dbt_compiled_query(self): expected_query = "sample customers compile code" # Test the compiled queries with v8 manifest _, dbt_objects = self.get_dbt_object_files( mock_manifest=MOCK_SAMPLE_MANIFEST_V8 ) manifest_node = dbt_objects.dbt_manifest.nodes.get( "model.jaffle_shop.customers" ) result = self.dbt_source_obj.get_dbt_compiled_query(mnode=manifest_node) self.assertEqual(expected_query, result) # Test the compiled queries with v4 v5 v6 manifest _, dbt_objects = self.get_dbt_object_files( mock_manifest=MOCK_SAMPLE_MANIFEST_V4_V5_V6 ) manifest_node = dbt_objects.dbt_manifest.nodes.get( "model.jaffle_shop.customers" ) result = self.dbt_source_obj.get_dbt_compiled_query(mnode=manifest_node) self.assertEqual(expected_query, result) def test_dbt_raw_query(self): expected_query = "sample customers raw code" # Test the raw queries with v8 manifest _, dbt_objects = self.get_dbt_object_files( mock_manifest=MOCK_SAMPLE_MANIFEST_V8 ) manifest_node = dbt_objects.dbt_manifest.nodes.get( "model.jaffle_shop.customers" ) result = self.dbt_source_obj.get_dbt_raw_query(mnode=manifest_node) self.assertEqual(expected_query, result) # Test the raw queries with v4 v5 v6 manifest _, dbt_objects = self.get_dbt_object_files( mock_manifest=MOCK_SAMPLE_MANIFEST_V4_V5_V6 ) manifest_node = dbt_objects.dbt_manifest.nodes.get( "model.jaffle_shop.customers" ) result = self.dbt_source_obj.get_dbt_raw_query(mnode=manifest_node) self.assertEqual(expected_query, result) @patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn") def test_dbt_owner(self, es_search_from_fqn): es_search_from_fqn.return_value = MOCK_USER _, dbt_objects = self.get_dbt_object_files( mock_manifest=MOCK_SAMPLE_MANIFEST_V8 ) manifest_node = dbt_objects.dbt_manifest.nodes.get( "model.jaffle_shop.customers" ) result = self.dbt_source_obj.get_dbt_owner( manifest_node=manifest_node, catalog_node=None ) self.assertEqual("aaron_johnson0", result.fullyQualifiedName) def execute_test(self, mock_manifest, expected_records, expected_data_models): dbt_files, dbt_objects = self.get_dbt_object_files(mock_manifest) self.check_dbt_validate(dbt_files=dbt_files, expected_records=expected_records) self.check_yield_datamodel( dbt_objects=dbt_objects, expected_data_models=expected_data_models ) def get_dbt_object_files(self, mock_manifest): mock_file_path = Path(__file__).parent / mock_manifest with open(mock_file_path) as file: mock_data: dict = json.load(file) dbt_files = DbtFiles(dbt_manifest=mock_data) dbt_objects = DbtObjects( dbt_catalog=parse_catalog(dbt_files.dbt_catalog) if dbt_files.dbt_catalog else None, dbt_manifest=parse_manifest(dbt_files.dbt_manifest), dbt_run_results=parse_run_results(dbt_files.dbt_run_results) if dbt_files.dbt_run_results else None, ) return dbt_files, dbt_objects def check_dbt_validate(self, dbt_files, expected_records): with self.assertLogs() as captured: self.dbt_source_obj.validate_dbt_files(dbt_files=dbt_files) self.assertEqual(len(captured.records), expected_records) for record in captured.records: self.assertNotIn("Error", record.getMessage()) self.assertNotIn("Unable", record.getMessage()) def check_yield_datamodel(self, dbt_objects, expected_data_models): data_model_list = [] yield_data_models = self.dbt_source_obj.yield_data_models( dbt_objects=dbt_objects ) for data_model_link in yield_data_models: if isinstance(data_model_link, DataModelLink): self.assertIn( data_model_link.table_entity.fullyQualifiedName.__root__, EXPECTED_DATA_MODEL_FQNS, ) data_model_list.append(data_model_link.datamodel) for _, (exptected, original) in enumerate( zip(expected_data_models, data_model_list) ): self.assertEqual(exptected, original)