From 57e4464e64249467f4e1a8b6d6810031f5ef98be Mon Sep 17 00:00:00 2001 From: Suman Maharana Date: Tue, 2 Sep 2025 10:22:15 +0530 Subject: [PATCH] Fixes #22204 - Add support for sources key metadata fetch in dbt (#23003) * Added support for sources key metadata fetch in dbt * address comments Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Fixes * fixed tests --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> (cherry picked from commit 30bceee58030590811b7ed95e0ad8ec4ae9c279b) --- .../source/database/dbt/constants.py | 1 - .../ingestion/source/database/dbt/metadata.py | 16 +- ingestion/tests/unit/test_dbt.py | 243 ++++++++++++++++++ 3 files changed, 244 insertions(+), 16 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/constants.py b/ingestion/src/metadata/ingestion/source/database/dbt/constants.py index f4764b91be1..dc612a76476 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/constants.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/constants.py @@ -105,7 +105,6 @@ class SkipResourceTypeEnum(Enum): ANALYSIS = "analysis" TEST = "test" - SOURCE = "source" class CompiledQueriesEnum(Enum): diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py index a8b5030683c..29d8182e924 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py @@ -543,7 +543,7 @@ class DbtSource(DbtServiceSource): if ( dbt_objects.dbt_sources - and resource_type == SkipResourceTypeEnum.SOURCE.value + and resource_type == DbtCommonEnum.SOURCE.value ): self.add_dbt_sources( key, @@ -708,20 +708,6 @@ class DbtSource(DbtServiceSource): ) continue - if dbt_node.resource_type == SkipResourceTypeEnum.SOURCE.value: - parent_fqn = fqn.build( - self.metadata, - entity_type=Table, - service_name=self.config.serviceName, - database_name=get_corrected_name(dbt_node.database), - schema_name=get_corrected_name(dbt_node.schema_), - table_name=dbt_node.name, - ) - - # check if the parent table exists in OM before adding it to the upstream list - if self._get_table_entity(table_fqn=parent_fqn): - upstream_nodes.append(parent_fqn) - return upstream_nodes def parse_data_model_columns( diff --git a/ingestion/tests/unit/test_dbt.py b/ingestion/tests/unit/test_dbt.py index 7d0941e60d3..026af80edc4 100644 --- a/ingestion/tests/unit/test_dbt.py +++ b/ingestion/tests/unit/test_dbt.py @@ -1104,3 +1104,246 @@ class DbtUnitTest(TestCase): ] assert len(list(filter(lambda x: x is not None, parsed_exposures))) == 0 + + def test_constants_required_constraint_keys(self): + """Test REQUIRED_CONSTRAINT_KEYS constant""" + from metadata.ingestion.source.database.dbt.constants import ( + REQUIRED_CONSTRAINT_KEYS, + ) + + expected_keys = [ + "type", + "name", + "expression", + "warn_unenforced", + "warn_unsupported", + ] + self.assertEqual(REQUIRED_CONSTRAINT_KEYS, expected_keys) + + def test_constants_required_results_keys(self): + """Test REQUIRED_RESULTS_KEYS constant""" + from metadata.ingestion.source.database.dbt.constants import ( + REQUIRED_RESULTS_KEYS, + ) + + expected_keys = { + "status", + "timing", + "thread_id", + "execution_time", + "message", + "adapter_response", + "unique_id", + } + self.assertEqual(REQUIRED_RESULTS_KEYS, expected_keys) + + def test_constants_required_node_keys(self): + """Test REQUIRED_NODE_KEYS constant""" + from metadata.ingestion.source.database.dbt.constants import REQUIRED_NODE_KEYS + + expected_keys = { + "schema_", + "schema", + "freshness", + "name", + "resource_type", + "path", + "unique_id", + "source_name", + "source_description", + "source_meta", + "loader", + "identifier", + "relation_name", + "fqn", + "alias", + "checksum", + "config", + "column_name", + "test_metadata", + "original_file_path", + "root_path", + "database", + "tags", + "description", + "columns", + "meta", + "owner", + "created_at", + "group", + "sources", + "compiled", + "docs", + "version", + "latest_version", + "package_name", + "depends_on", + "compiled_code", + "compiled_sql", + "raw_code", + "raw_sql", + "language", + } + self.assertEqual(REQUIRED_NODE_KEYS, expected_keys) + + def test_constants_none_keywords_list(self): + """Test NONE_KEYWORDS_LIST constant""" + from metadata.ingestion.source.database.dbt.constants import NONE_KEYWORDS_LIST + + expected_keywords = ["none", "null"] + self.assertEqual(NONE_KEYWORDS_LIST, expected_keywords) + + def test_constants_exposure_type_map(self): + """Test ExposureTypeMap constant""" + from metadata.ingestion.source.database.dbt.constants import ExposureTypeMap + + self.assertIn("dashboard", ExposureTypeMap) + self.assertIn("ml", ExposureTypeMap) + self.assertIn("application", ExposureTypeMap) + + dashboard_mapping = ExposureTypeMap["dashboard"] + self.assertEqual(dashboard_mapping["entity_type"], Dashboard) + self.assertEqual(dashboard_mapping["entity_type_name"], "dashboard") + + ml_mapping = ExposureTypeMap["ml"] + self.assertEqual(ml_mapping["entity_type"], MlModel) + self.assertEqual(ml_mapping["entity_type_name"], "mlmodel") + + app_mapping = ExposureTypeMap["application"] + self.assertEqual(app_mapping["entity_type"], APIEndpoint) + self.assertEqual(app_mapping["entity_type_name"], "apiEndpoint") + + def test_parse_upstream_nodes_source_schema_handling(self): + """Test that source nodes get schema name '*' in upstream parsing""" + from metadata.ingestion.source.database.dbt.constants import DbtCommonEnum + + mock_source_node = MagicMock() + mock_source_node.resource_type = DbtCommonEnum.SOURCE.value + mock_source_node.database = "test_db" + mock_source_node.schema_ = "test_schema" + mock_source_node.name = "test_source" + + mock_model_node = MagicMock() + mock_model_node.depends_on.nodes = ["source.test.test_source"] + + manifest_entities = {"source.test.test_source": mock_source_node} + + with patch.object( + self.dbt_source_obj, + "_get_table_entity", + return_value=MOCK_TABLE_ENTITIES[0], + ): + with patch.object( + self.dbt_source_obj, + "is_filtered", + return_value=MagicMock(is_filtered=False), + ): + with patch( + "metadata.ingestion.source.database.dbt.dbt_utils.get_dbt_model_name", + return_value="test_source", + ): + with patch( + "metadata.ingestion.source.database.dbt.dbt_utils.get_corrected_name", + side_effect=lambda x: x, + ): + with patch("metadata.utils.fqn.build") as mock_fqn_build: + mock_fqn_build.return_value = ( + "test.*.test_schema.test_source" + ) + + self.dbt_source_obj.parse_upstream_nodes( + manifest_entities, mock_model_node + ) + + # Verify that schema_name="*" was used for source node + calls = mock_fqn_build.call_args_list + schema_name_used = None + for call in calls: + if "schema_name" in call[1]: + schema_name_used = call[1]["schema_name"] + break + self.assertEqual(schema_name_used, "test_schema") + + def test_yield_data_models_processes_sources_key(self): + """Test that yield_data_models processes both nodes and sources keys from manifest""" + mock_manifest = MagicMock() + mock_manifest.sources = {"source.test.table1": MagicMock()} + mock_manifest.nodes = {"model.test.table2": MagicMock()} + mock_manifest.exposures = {"exposure.test.dashboard1": MagicMock()} + + mock_dbt_objects = MagicMock() + mock_dbt_objects.dbt_manifest = mock_manifest + mock_dbt_objects.dbt_catalog = None + mock_dbt_objects.dbt_run_results = None + mock_dbt_objects.dbt_sources = None + + # Verify that manifest_entities includes both sources and nodes + for data_model in self.dbt_source_obj.yield_data_models(mock_dbt_objects): + pass + + # The method should process entities from sources, nodes, and exposures + # We expect the context to be populated with the combined entities + self.assertTrue(hasattr(self.dbt_source_obj.context.get(), "data_model_links")) + self.assertTrue(hasattr(self.dbt_source_obj.context.get(), "exposures")) + self.assertTrue(hasattr(self.dbt_source_obj.context.get(), "dbt_tests")) + + def test_yield_data_models_source_node_schema_handling(self): + """Test that source nodes has correct schema name in yield_data_models""" + from metadata.ingestion.source.database.dbt.constants import DbtCommonEnum + + # Create a mock source node + mock_source_node = MagicMock() + mock_source_node.resource_type = DbtCommonEnum.SOURCE.value + mock_source_node.database = "test_db" + mock_source_node.schema_ = "actual_schema" + mock_source_node.name = "test_source" + mock_source_node.description = "Test source description" + mock_source_node.tags = [] + mock_source_node.meta = {} + + manifest_entities = {"source.test.test_source": mock_source_node} + + mock_dbt_objects = MagicMock() + mock_dbt_objects.dbt_manifest.sources = manifest_entities + mock_dbt_objects.dbt_manifest.nodes = {} + mock_dbt_objects.dbt_manifest.exposures = {} + mock_dbt_objects.dbt_catalog = None + mock_dbt_objects.dbt_run_results = None + mock_dbt_objects.dbt_sources = None + + with patch.object( + self.dbt_source_obj, + "is_filtered", + return_value=MagicMock(is_filtered=False), + ): + with patch.object( + self.dbt_source_obj, + "_get_table_entity", + return_value=MOCK_TABLE_ENTITIES[0], + ): + with patch( + "metadata.ingestion.source.database.dbt.dbt_utils.get_dbt_model_name", + return_value="test_source", + ): + with patch( + "metadata.ingestion.source.database.dbt.dbt_utils.get_corrected_name", + side_effect=lambda x: x, + ): + with patch("metadata.utils.fqn.build") as mock_fqn_build: + mock_fqn_build.return_value = ( + "test_service.test_db.*.test_source" + ) + + # Process the source node + list( + self.dbt_source_obj.yield_data_models(mock_dbt_objects) + ) + + # Verify that schema_name="*" was used for source node + calls = mock_fqn_build.call_args_list + schema_name_used = None + for call in calls: + if "schema_name" in call[1]: + schema_name_used = call[1]["schema_name"] + break + self.assertEqual(schema_name_used, "actual_schema")