diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/constants.py b/ingestion/src/metadata/ingestion/source/database/dbt/constants.py new file mode 100644 index 00000000000..7b5b1c4054d --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/dbt/constants.py @@ -0,0 +1,88 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Constants required for dbt +""" + +from enum import Enum + +# Based on https://schemas.getdbt.com/dbt/manifest/v7/index.html +REQUIRED_MANIFEST_KEYS = ["name", "schema", "resource_type"] + +# Based on https://schemas.getdbt.com/dbt/catalog/v1.json +REQUIRED_CATALOG_KEYS = ["name", "type", "index"] + +NONE_KEYWORDS_LIST = ["none", "null"] + +DBT_CATALOG_FILE_NAME = "catalog.json" +DBT_MANIFEST_FILE_NAME = "manifest.json" +DBT_RUN_RESULTS_FILE_NAME = "run_results.json" + + +class SkipResourceTypeEnum(Enum): + """ + Enum for nodes to be skipped + """ + + ANALYSIS = "analysis" + TEST = "test" + + +class CompiledQueriesEnum(Enum): + """ + Enum for Compiled Queries + """ + + COMPILED_CODE = "compiled_code" + COMPILED_SQL = "compiled_sql" + + +class RawQueriesEnum(Enum): + """ + Enum for Raw Queries + """ + + RAW_CODE = "raw_code" + RAW_SQL = "raw_sql" + + +class DbtTestSuccessEnum(Enum): + """ + Enum for success messages of dbt tests + """ + + SUCCESS = "success" + PASS = "pass" + + +class DbtTestFailureEnum(Enum): + """ + Enum for failure message of dbt tests + """ + + FAILURE = "failure" + FAIL = "fail" + + +class DbtCommonEnum(Enum): + """ + Common enum for dbt + """ + + OWNER = "owner" + NODES = "nodes" + SOURCES = "sources" + RESOURCETYPE = "resource_type" + MANIFEST_NODE = "manifest_node" + UPSTREAM = "upstream" + RESULTS = "results" + TEST_SUITE_NAME = "test_suite_name" + DBT_TEST_SUITE = "DBT_TEST_SUITE" diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_utils.py b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_utils.py new file mode 100644 index 00000000000..f5f7bf407e1 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_utils.py @@ -0,0 +1,180 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +DBT utils methods. +""" +import traceback +from typing import Optional, Union + +from metadata.generated.schema.tests.testSuite import TestSuite +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.database.dbt.constants import ( + NONE_KEYWORDS_LIST, + CompiledQueriesEnum, + DbtCommonEnum, + RawQueriesEnum, +) +from metadata.utils import entity_link +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + + +def create_test_case_parameter_definitions(dbt_test): + """ + Method to create test case parameter definitions + """ + try: + if hasattr(dbt_test, "test_metadata"): + test_case_param_definition = [ + { + "name": dbt_test.test_metadata.name, + "displayName": dbt_test.test_metadata.name, + "required": False, + } + ] + return test_case_param_definition + except Exception as err: # pylint: disable=broad-except + logger.debug(traceback.format_exc()) + logger.error( + f"Failed to capture tests case parameter definitions for node: {dbt_test} {err}" + ) + return None + + +def create_test_case_parameter_values(dbt_test): + """ + Method to create test case parameter values + """ + try: + manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value) + if hasattr(manifest_node, "test_metadata"): + values = manifest_node.test_metadata.kwargs.get("values") + dbt_test_values = "" + if values: + dbt_test_values = ",".join(str(value) for value in values) + test_case_param_values = [ + {"name": manifest_node.test_metadata.name, "value": dbt_test_values} + ] + return test_case_param_values + except Exception as err: # pylint: disable=broad-except + logger.debug(traceback.format_exc()) + logger.error( + f"Failed to capture tests case parameter values for node: {dbt_test} {err}" + ) + return None + + +def generate_entity_link(dbt_test): + """ + Method returns entity link + """ + manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value) + entity_link_list = [ + entity_link.get_entity_link( + table_fqn=table_fqn, + column_name=manifest_node.column_name + if hasattr(manifest_node, "column_name") + else None, + ) + for table_fqn in dbt_test[DbtCommonEnum.UPSTREAM.value] + ] + return entity_link_list + + +def get_dbt_compiled_query(mnode) -> Optional[str]: + """ + Method to get dbt compiled query + """ + if hasattr(mnode, CompiledQueriesEnum.COMPILED_CODE.value) and mnode.compiled_code: + return mnode.compiled_code + if hasattr(mnode, CompiledQueriesEnum.COMPILED_SQL.value) and mnode.compiled_sql: + return mnode.compiled_sql + logger.debug(f"Unable to get DBT compiled query for node - {mnode.name}") + return None + + +def get_dbt_raw_query(mnode) -> Optional[str]: + """ + Method to get dbt raw query + """ + if hasattr(mnode, RawQueriesEnum.RAW_CODE.value) and mnode.raw_code: + return mnode.raw_code + if hasattr(mnode, RawQueriesEnum.RAW_SQL.value) and mnode.raw_sql: + return mnode.raw_sql + logger.debug(f"Unable to get DBT compiled query for node - {mnode.name}") + return None + + +def check_or_create_test_suite( + metadata: OpenMetadata, test_entity_link: str +) -> Union[TestSuite, EntityReference]: + """Check if test suite exists, if not create it + + Args: + entity_link (str): entity link + + Returns: + TestSuite: + """ + table_fqn = entity_link.get_table_fqn(test_entity_link) + return metadata.get_or_create_executable_test_suite(table_fqn) + + +def check_ephemeral_node(manifest_node) -> bool: + """ + Check if the manifest node is an ephemeral node + """ + if ( + hasattr(manifest_node, "config") + and manifest_node.config + and hasattr(manifest_node.config, "materialized") + and manifest_node.config.materialized == "ephemeral" + ): + return True + return False + + +def get_dbt_model_name(manifest_node) -> str: + """ + Get the alias or name of the manifest node + """ + return ( + manifest_node.alias + if hasattr(manifest_node, "alias") and manifest_node.alias + else manifest_node.name + ) + + +def get_corrected_name(name: Optional[str]): + """ + Method to fetch correct name + """ + correct_name = None + if name: + correct_name = None if name.lower() in NONE_KEYWORDS_LIST else name + return correct_name + + +def get_data_model_path(manifest_node): + """ + Method to get data model path + """ + datamodel_path = None + if manifest_node.original_file_path: + if hasattr(manifest_node, "root_path") and manifest_node.root_path: + datamodel_path = ( + f"{manifest_node.root_path}/{manifest_node.original_file_path}" + ) + else: + datamodel_path = manifest_node.original_file_path + return datamodel_path diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py index 31509d6b003..09e0db2e33b 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py @@ -13,7 +13,6 @@ DBT source methods. """ import traceback from datetime import datetime -from enum import Enum from typing import Iterable, List, Optional, Union from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest @@ -50,7 +49,6 @@ from metadata.generated.schema.tests.testDefinition import ( TestDefinition, TestPlatform, ) -from metadata.generated.schema.tests.testSuite import TestSuite from metadata.generated.schema.type.basic import FullyQualifiedEntityName, Timestamp from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference @@ -60,87 +58,38 @@ from metadata.ingestion.models.ometa_classification import OMetaTagAndClassifica from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from metadata.ingestion.source.database.database_service import DataModelLink +from metadata.ingestion.source.database.dbt.constants import ( + REQUIRED_CATALOG_KEYS, + REQUIRED_MANIFEST_KEYS, + DbtCommonEnum, + DbtTestFailureEnum, + DbtTestSuccessEnum, + SkipResourceTypeEnum, +) from metadata.ingestion.source.database.dbt.dbt_service import ( DbtFiles, DbtObjects, DbtServiceSource, ) -from metadata.utils import entity_link, fqn +from metadata.ingestion.source.database.dbt.dbt_utils import ( + check_ephemeral_node, + check_or_create_test_suite, + create_test_case_parameter_definitions, + create_test_case_parameter_values, + generate_entity_link, + get_corrected_name, + get_data_model_path, + get_dbt_compiled_query, + get_dbt_model_name, + get_dbt_raw_query, +) +from metadata.utils import fqn from metadata.utils.elasticsearch import get_entity_from_es_result from metadata.utils.logger import ingestion_logger from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels logger = ingestion_logger() -# Based on https://schemas.getdbt.com/dbt/manifest/v7/index.html -REQUIRED_MANIFEST_KEYS = ["name", "schema", "resource_type"] - -# Based on https://schemas.getdbt.com/dbt/catalog/v1.json -REQUIRED_CATALOG_KEYS = ["name", "type", "index"] - -NONE_KEYWORDS_LIST = ["none", "null"] - - -class SkipResourceTypeEnum(Enum): - """ - Enum for nodes to be skipped - """ - - ANALYSIS = "analysis" - TEST = "test" - - -class CompiledQueriesEnum(Enum): - """ - Enum for Compiled Queries - """ - - COMPILED_CODE = "compiled_code" - COMPILED_SQL = "compiled_sql" - - -class RawQueriesEnum(Enum): - """ - Enum for Raw Queries - """ - - RAW_CODE = "raw_code" - RAW_SQL = "raw_sql" - - -class DbtTestSuccessEnum(Enum): - """ - Enum for success messages of dbt tests - """ - - SUCCESS = "success" - PASS = "pass" - - -class DbtTestFailureEnum(Enum): - """ - Enum for failure message of dbt tests - """ - - FAILURE = "failure" - FAIL = "fail" - - -class DbtCommonEnum(Enum): - """ - Common enum for dbt - """ - - OWNER = "owner" - NODES = "nodes" - SOURCES = "sources" - RESOURCETYPE = "resource_type" - MANIFEST_NODE = "manifest_node" - UPSTREAM = "upstream" - RESULTS = "results" - TEST_SUITE_NAME = "test_suite_name" - DBT_TEST_SUITE = "DBT_TEST_SUITE" - class InvalidServiceException(Exception): """ @@ -148,7 +97,7 @@ class InvalidServiceException(Exception): """ -class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods +class DbtSource(DbtServiceSource): """ Class defines method to extract metadata from DBT """ @@ -397,6 +346,11 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods ) continue + # Skip the ephemeral nodes since it is not materialized + if check_ephemeral_node(manifest_node): + logger.debug(f"Skipping ephemeral DBT node: {key}.") + continue + # Skip the analysis and test nodes if manifest_node.resource_type.value in [ item.value for item in SkipResourceTypeEnum @@ -404,11 +358,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods logger.debug(f"Skipping DBT node: {key}.") continue - model_name = ( - manifest_node.alias - if hasattr(manifest_node, "alias") and manifest_node.alias - else manifest_node.name - ) + model_name = get_dbt_model_name(manifest_node) logger.debug(f"Processing DBT node: {model_name}") catalog_node = None @@ -424,8 +374,8 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods include_tags=self.source_config.includeTags, ) - dbt_compiled_query = self.get_dbt_compiled_query(manifest_node) - dbt_raw_query = self.get_dbt_raw_query(manifest_node) + dbt_compiled_query = get_dbt_compiled_query(manifest_node) + dbt_raw_query = get_dbt_raw_query(manifest_node) # Get the table entity from ES # TODO: Change to get_by_name once the postgres case sensitive calls is fixed @@ -433,8 +383,8 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods self.metadata, entity_type=Table, service_name=self.config.serviceName, - database_name=self.get_corrected_name(manifest_node.database), - schema_name=self.get_corrected_name(manifest_node.schema_), + database_name=get_corrected_name(manifest_node.database), + schema_name=get_corrected_name(manifest_node.schema_), table_name=model_name, ) table_entity: Optional[ @@ -454,9 +404,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods description=manifest_node.description if manifest_node.description else None, - path=self.get_data_model_path( - manifest_node=manifest_node - ), + path=get_data_model_path(manifest_node=manifest_node), rawSql=dbt_raw_query if dbt_raw_query else "", sql=dbt_compiled_query if dbt_compiled_query else "", columns=self.parse_data_model_columns( @@ -486,23 +434,6 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods f"Unexpected exception parsing DBT node:{model_name} - {exc}" ) - def get_corrected_name(self, name: Optional[str]): - correct_name = None - if name: - correct_name = None if name.lower() in NONE_KEYWORDS_LIST else name - return correct_name - - def get_data_model_path(self, manifest_node): - datamodel_path = None - if manifest_node.original_file_path: - if hasattr(manifest_node, "root_path") and manifest_node.root_path: - datamodel_path = ( - f"{manifest_node.root_path}/{manifest_node.original_file_path}" - ) - else: - datamodel_path = manifest_node.original_file_path - return datamodel_path - def parse_upstream_nodes(self, manifest_entities, dbt_node): """ Method to fetch the upstream nodes @@ -517,32 +448,35 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods for node in dbt_node.depends_on.nodes: try: parent_node = manifest_entities[node] - table_name = ( - parent_node.alias - if hasattr(parent_node, "alias") and parent_node.alias - else parent_node.name - ) - parent_fqn = fqn.build( - self.metadata, - entity_type=Table, - service_name=self.config.serviceName, - database_name=self.get_corrected_name(parent_node.database), - schema_name=self.get_corrected_name(parent_node.schema_), - table_name=table_name, - ) - # check if the parent table exists in OM before adding it to the upstream list - # TODO: Change to get_by_name once the postgres case sensitive calls is fixed - parent_table_entity: Optional[ - Union[Table, List[Table]] - ] = get_entity_from_es_result( - entity_list=self.metadata.es_search_from_fqn( - entity_type=Table, fqn_search_string=parent_fqn - ), - fetch_multiple_entities=False, - ) - if parent_table_entity: - upstream_nodes.append(parent_fqn) + # check if the node is an ephemeral node + # Recursively store the upstream of the ephemeral node in the upstream list + if check_ephemeral_node(parent_node): + upstream_nodes.extend( + self.parse_upstream_nodes(manifest_entities, parent_node) + ) + else: + parent_fqn = fqn.build( + self.metadata, + entity_type=Table, + service_name=self.config.serviceName, + database_name=get_corrected_name(parent_node.database), + schema_name=get_corrected_name(parent_node.schema_), + table_name=get_dbt_model_name(parent_node), + ) + + # check if the parent table exists in OM before adding it to the upstream list + # TODO: Change to get_by_name once the postgres case sensitive calls is fixed + parent_table_entity: Optional[ + Union[Table, List[Table]] + ] = get_entity_from_es_result( + entity_list=self.metadata.es_search_from_fqn( + entity_type=Table, fqn_search_string=parent_fqn + ), + fetch_multiple_entities=False, + ) + if parent_table_entity: + upstream_nodes.append(parent_fqn) except Exception as exc: # pylint: disable=broad-except logger.debug(traceback.format_exc()) logger.warning( @@ -763,7 +697,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods description=manifest_node.description, entityType=entity_type, testPlatforms=[TestPlatform.DBT], - parameterDefinition=self.create_test_case_parameter_definitions( + parameterDefinition=create_test_case_parameter_definitions( manifest_node ), displayName=None, @@ -781,9 +715,11 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value) if manifest_node: logger.debug(f"Processing DBT Test Case for node: {manifest_node.name}") - entity_link_list = self.generate_entity_link(dbt_test) + entity_link_list = generate_entity_link(dbt_test) for entity_link_str in entity_link_list: - test_suite = self._check_or_create_test_suite(entity_link_str) + test_suite = check_or_create_test_suite( + self.metadata, entity_link_str + ) yield CreateTestCaseRequest( name=manifest_node.name, description=manifest_node.description, @@ -792,9 +728,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods ), entityLink=entity_link_str, testSuite=test_suite.fullyQualifiedName, - parameterValues=self.create_test_case_parameter_values( - dbt_test - ), + parameterValues=create_test_case_parameter_values(dbt_test), displayName=None, owner=None, ) @@ -890,94 +824,5 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods f"Failed to capture tests results for node: {manifest_node.name} {err}" ) - def create_test_case_parameter_definitions(self, dbt_test): - try: - if hasattr(dbt_test, "test_metadata"): - test_case_param_definition = [ - { - "name": dbt_test.test_metadata.name, - "displayName": dbt_test.test_metadata.name, - "required": False, - } - ] - return test_case_param_definition - except Exception as err: # pylint: disable=broad-except - logger.debug(traceback.format_exc()) - logger.error( - f"Failed to capture tests case parameter definitions for node: {dbt_test} {err}" - ) - return None - - def create_test_case_parameter_values(self, dbt_test): - try: - manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value) - if hasattr(manifest_node, "test_metadata"): - values = manifest_node.test_metadata.kwargs.get("values") - dbt_test_values = "" - if values: - dbt_test_values = ",".join(str(value) for value in values) - test_case_param_values = [ - {"name": manifest_node.test_metadata.name, "value": dbt_test_values} - ] - return test_case_param_values - except Exception as err: # pylint: disable=broad-except - logger.debug(traceback.format_exc()) - logger.error( - f"Failed to capture tests case parameter values for node: {dbt_test} {err}" - ) - return None - - def generate_entity_link(self, dbt_test): - """ - Method returns entity link - """ - manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value) - entity_link_list = [ - entity_link.get_entity_link( - table_fqn=table_fqn, - column_name=manifest_node.column_name - if hasattr(manifest_node, "column_name") - else None, - ) - for table_fqn in dbt_test[DbtCommonEnum.UPSTREAM.value] - ] - return entity_link_list - - def get_dbt_compiled_query(self, mnode) -> Optional[str]: - if ( - hasattr(mnode, CompiledQueriesEnum.COMPILED_CODE.value) - and mnode.compiled_code - ): - return mnode.compiled_code - if ( - hasattr(mnode, CompiledQueriesEnum.COMPILED_SQL.value) - and mnode.compiled_sql - ): - return mnode.compiled_sql - logger.debug(f"Unable to get DBT compiled query for node - {mnode.name}") - return None - - def get_dbt_raw_query(self, mnode) -> Optional[str]: - if hasattr(mnode, RawQueriesEnum.RAW_CODE.value) and mnode.raw_code: - return mnode.raw_code - if hasattr(mnode, RawQueriesEnum.RAW_SQL.value) and mnode.raw_sql: - return mnode.raw_sql - logger.debug(f"Unable to get DBT compiled query for node - {mnode.name}") - return None - - def _check_or_create_test_suite( - self, test_entity_link: str - ) -> Union[TestSuite, EntityReference]: - """Check if test suite exists, if not create it - - Args: - entity_link (str): entity link - - Returns: - TestSuite: - """ - table_fqn = entity_link.get_table_fqn(test_entity_link) - return self.metadata.get_or_create_executable_test_suite(table_fqn) - def close(self): self.metadata.close() diff --git a/ingestion/tests/unit/test_dbt.py b/ingestion/tests/unit/test_dbt.py index 08ad4640e45..d85721bf85b 100644 --- a/ingestion/tests/unit/test_dbt.py +++ b/ingestion/tests/unit/test_dbt.py @@ -24,6 +24,13 @@ from metadata.generated.schema.type.tagLabel import ( TagSource, ) from metadata.ingestion.source.database.database_service import DataModelLink +from metadata.ingestion.source.database.dbt.dbt_utils import ( + generate_entity_link, + get_corrected_name, + get_data_model_path, + get_dbt_compiled_query, + get_dbt_raw_query, +) from metadata.ingestion.source.database.dbt.metadata import DbtSource from metadata.utils.dbt_config import DbtFiles, DbtObjects from metadata.utils.tag_utils import get_tag_labels @@ -332,12 +339,10 @@ class DbtUnitTest(TestCase): ) def test_dbt_get_corrected_name(self): - self.assertEqual( - "dbt_jaffle", self.dbt_source_obj.get_corrected_name(name="dbt_jaffle") - ) - self.assertIsNone(self.dbt_source_obj.get_corrected_name(name="None")) - self.assertIsNone(self.dbt_source_obj.get_corrected_name(name="null")) - self.assertIsNotNone(self.dbt_source_obj.get_corrected_name(name="dev")) + self.assertEqual("dbt_jaffle", get_corrected_name(name="dbt_jaffle")) + self.assertIsNone(get_corrected_name(name="None")) + self.assertIsNone(get_corrected_name(name="null")) + self.assertIsNotNone(get_corrected_name(name="dev")) @patch("metadata.utils.tag_utils.get_tag_label") def test_dbt_get_dbt_tag_labels(self, get_tag_label): @@ -378,7 +383,7 @@ class DbtUnitTest(TestCase): manifest_node = dbt_objects.dbt_manifest.nodes.get( "model.jaffle_shop.customers" ) - result = self.dbt_source_obj.get_data_model_path(manifest_node=manifest_node) + result = get_data_model_path(manifest_node=manifest_node) self.assertEqual("sample/customers/root/path/models/customers.sql", result) def test_dbt_generate_entity_link(self): @@ -393,7 +398,7 @@ class DbtUnitTest(TestCase): "upstream": ["local_redshift_dbt2.dev.dbt_jaffle.stg_customers"], "results": "", } - result = self.dbt_source_obj.generate_entity_link(dbt_test=dbt_test) + result = generate_entity_link(dbt_test=dbt_test) self.assertListEqual( [ "<#E::table::local_redshift_dbt2.dev.dbt_jaffle.stg_customers::columns::order_id>" @@ -411,7 +416,7 @@ class DbtUnitTest(TestCase): manifest_node = dbt_objects.dbt_manifest.nodes.get( "model.jaffle_shop.customers" ) - result = self.dbt_source_obj.get_dbt_compiled_query(mnode=manifest_node) + result = get_dbt_compiled_query(mnode=manifest_node) self.assertEqual(expected_query, result) # Test the compiled queries with v4 v5 v6 manifest @@ -421,7 +426,7 @@ class DbtUnitTest(TestCase): manifest_node = dbt_objects.dbt_manifest.nodes.get( "model.jaffle_shop.customers" ) - result = self.dbt_source_obj.get_dbt_compiled_query(mnode=manifest_node) + result = get_dbt_compiled_query(mnode=manifest_node) self.assertEqual(expected_query, result) def test_dbt_raw_query(self): @@ -434,7 +439,7 @@ class DbtUnitTest(TestCase): manifest_node = dbt_objects.dbt_manifest.nodes.get( "model.jaffle_shop.customers" ) - result = self.dbt_source_obj.get_dbt_raw_query(mnode=manifest_node) + result = get_dbt_raw_query(mnode=manifest_node) self.assertEqual(expected_query, result) # Test the raw queries with v4 v5 v6 manifest @@ -444,7 +449,7 @@ class DbtUnitTest(TestCase): manifest_node = dbt_objects.dbt_manifest.nodes.get( "model.jaffle_shop.customers" ) - result = self.dbt_source_obj.get_dbt_raw_query(mnode=manifest_node) + result = get_dbt_raw_query(mnode=manifest_node) self.assertEqual(expected_query, result) @patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")