Fixes #22204 - Add support for sources key metadata fetch in dbt (#23003)

* Added support for sources key metadata fetch in dbt

* Address comments

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Fixes

* Fixed tests

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Author: Suman Maharana · committed by GitHub · 2025-09-02 10:22:15 +05:30
Parent: cb42409999
Commit: 30bceee580
3 changed files with 244 additions and 16 deletions
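
For orientation before the hunks: dbt's manifest.json keeps sources under a top-level "sources" key, separate from "nodes", and this change stops skipping those entries and folds them into the entity map used for lineage. A minimal sketch of that merge, reusing the illustrative unique_ids from the new unit tests (shapes are simplified, not the real parser types):

# Simplified manifest fragment; real entries carry many more fields.
manifest = {
    "nodes": {
        "model.test.table2": {"resource_type": "model", "schema": "analytics"},
    },
    "sources": {
        "source.test.table1": {"resource_type": "source", "schema": "raw"},
    },
}

# Merging both maps lets "source.*" unique_ids resolve like any other upstream node.
manifest_entities = {**manifest["nodes"], **manifest["sources"]}
assert "source.test.table1" in manifest_entities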


@@ -105,7 +105,6 @@ class SkipResourceTypeEnum(Enum):
     ANALYSIS = "analysis"
     TEST = "test"
-    SOURCE = "source"


 class CompiledQueriesEnum(Enum):

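Taken together with the DbtSource hunk below, the effect is that the "source" literal moves out of the skip list. A minimal sketch of both enums after this change (DbtCommonEnum's other members are not shown in this diff and are omitted; its SOURCE value is inferred from the new tests):

from enum import Enum

class SkipResourceTypeEnum(Enum):
    # "source" removed: dbt sources are now ingested rather than skipped
    ANALYSIS = "analysis"
    TEST = "test"

class DbtCommonEnum(Enum):
    # inferred from the tests, which compare resource_type to DbtCommonEnum.SOURCE.value
    SOURCE = "source"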

@@ -543,7 +543,7 @@ class DbtSource(DbtServiceSource):
             if (
                 dbt_objects.dbt_sources
-                and resource_type == SkipResourceTypeEnum.SOURCE.value
+                and resource_type == DbtCommonEnum.SOURCE.value
             ):
                 self.add_dbt_sources(
                     key,
@@ -708,20 +708,6 @@ class DbtSource(DbtServiceSource):
                     )
                     continue
-                if dbt_node.resource_type == SkipResourceTypeEnum.SOURCE.value:
-                    parent_fqn = fqn.build(
-                        self.metadata,
-                        entity_type=Table,
-                        service_name=self.config.serviceName,
-                        database_name=get_corrected_name(dbt_node.database),
-                        schema_name=get_corrected_name(dbt_node.schema_),
-                        table_name=dbt_node.name,
-                    )
-                    # check if the parent table exists in OM before adding it to the upstream list
-                    if self._get_table_entity(table_fqn=parent_fqn):
-                        upstream_nodes.append(parent_fqn)
         return upstream_nodes

     def parse_data_model_columns(

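The deleted branch above special-cased source nodes when building the parent table's FQN; with sources merged into manifest_entities, they take the same path as other upstream nodes. A rough, hypothetical condensation of that shared step (helper names here are stand-ins, not the module's real API):

from typing import Callable, Iterable, List

def resolve_upstream(nodes: Iterable, service_name: str,
                     table_exists: Callable[[str], bool]) -> List[str]:
    """Hypothetical sketch: build each parent FQN and keep it only if the
    table is already known to OpenMetadata, mirroring the _get_table_entity
    guard in the removed code."""
    upstream = []
    for node in nodes:
        parent_fqn = f"{service_name}.{node.database}.{node.schema_}.{node.name}"
        if table_exists(parent_fqn):
            upstream.append(parent_fqn)
    return upstream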

@@ -1104,3 +1104,246 @@ class DbtUnitTest(TestCase):
         ]
         assert len(list(filter(lambda x: x is not None, parsed_exposures))) == 0
+
+    def test_constants_required_constraint_keys(self):
+        """Test REQUIRED_CONSTRAINT_KEYS constant"""
+        from metadata.ingestion.source.database.dbt.constants import (
+            REQUIRED_CONSTRAINT_KEYS,
+        )
+
+        expected_keys = [
+            "type",
+            "name",
+            "expression",
+            "warn_unenforced",
+            "warn_unsupported",
+        ]
+        self.assertEqual(REQUIRED_CONSTRAINT_KEYS, expected_keys)
+
+    def test_constants_required_results_keys(self):
+        """Test REQUIRED_RESULTS_KEYS constant"""
+        from metadata.ingestion.source.database.dbt.constants import (
+            REQUIRED_RESULTS_KEYS,
+        )
+
+        expected_keys = {
+            "status",
+            "timing",
+            "thread_id",
+            "execution_time",
+            "message",
+            "adapter_response",
+            "unique_id",
+        }
+        self.assertEqual(REQUIRED_RESULTS_KEYS, expected_keys)
+
+    def test_constants_required_node_keys(self):
+        """Test REQUIRED_NODE_KEYS constant"""
+        from metadata.ingestion.source.database.dbt.constants import REQUIRED_NODE_KEYS
+
+        expected_keys = {
+            "schema_",
+            "schema",
+            "freshness",
+            "name",
+            "resource_type",
+            "path",
+            "unique_id",
+            "source_name",
+            "source_description",
+            "source_meta",
+            "loader",
+            "identifier",
+            "relation_name",
+            "fqn",
+            "alias",
+            "checksum",
+            "config",
+            "column_name",
+            "test_metadata",
+            "original_file_path",
+            "root_path",
+            "database",
+            "tags",
+            "description",
+            "columns",
+            "meta",
+            "owner",
+            "created_at",
+            "group",
+            "sources",
+            "compiled",
+            "docs",
+            "version",
+            "latest_version",
+            "package_name",
+            "depends_on",
+            "compiled_code",
+            "compiled_sql",
+            "raw_code",
+            "raw_sql",
+            "language",
+        }
+        self.assertEqual(REQUIRED_NODE_KEYS, expected_keys)
+
+    def test_constants_none_keywords_list(self):
+        """Test NONE_KEYWORDS_LIST constant"""
+        from metadata.ingestion.source.database.dbt.constants import NONE_KEYWORDS_LIST
+
+        expected_keywords = ["none", "null"]
+        self.assertEqual(NONE_KEYWORDS_LIST, expected_keywords)
+
+    def test_constants_exposure_type_map(self):
+        """Test ExposureTypeMap constant"""
+        from metadata.ingestion.source.database.dbt.constants import ExposureTypeMap
+
+        self.assertIn("dashboard", ExposureTypeMap)
+        self.assertIn("ml", ExposureTypeMap)
+        self.assertIn("application", ExposureTypeMap)
+
+        dashboard_mapping = ExposureTypeMap["dashboard"]
+        self.assertEqual(dashboard_mapping["entity_type"], Dashboard)
+        self.assertEqual(dashboard_mapping["entity_type_name"], "dashboard")
+
+        ml_mapping = ExposureTypeMap["ml"]
+        self.assertEqual(ml_mapping["entity_type"], MlModel)
+        self.assertEqual(ml_mapping["entity_type_name"], "mlmodel")
+
+        app_mapping = ExposureTypeMap["application"]
+        self.assertEqual(app_mapping["entity_type"], APIEndpoint)
+        self.assertEqual(app_mapping["entity_type_name"], "apiEndpoint")
+
+    def test_parse_upstream_nodes_source_schema_handling(self):
+        """Test that source nodes keep their own schema name in upstream parsing"""
+        from metadata.ingestion.source.database.dbt.constants import DbtCommonEnum
+
+        mock_source_node = MagicMock()
+        mock_source_node.resource_type = DbtCommonEnum.SOURCE.value
+        mock_source_node.database = "test_db"
+        mock_source_node.schema_ = "test_schema"
+        mock_source_node.name = "test_source"
+
+        mock_model_node = MagicMock()
+        mock_model_node.depends_on.nodes = ["source.test.test_source"]
+
+        manifest_entities = {"source.test.test_source": mock_source_node}
+
+        with patch.object(
+            self.dbt_source_obj,
+            "_get_table_entity",
+            return_value=MOCK_TABLE_ENTITIES[0],
+        ):
+            with patch.object(
+                self.dbt_source_obj,
+                "is_filtered",
+                return_value=MagicMock(is_filtered=False),
+            ):
+                with patch(
+                    "metadata.ingestion.source.database.dbt.dbt_utils.get_dbt_model_name",
+                    return_value="test_source",
+                ):
+                    with patch(
+                        "metadata.ingestion.source.database.dbt.dbt_utils.get_corrected_name",
+                        side_effect=lambda x: x,
+                    ):
+                        with patch("metadata.utils.fqn.build") as mock_fqn_build:
+                            mock_fqn_build.return_value = (
+                                "test.*.test_schema.test_source"
+                            )
+                            self.dbt_source_obj.parse_upstream_nodes(
+                                manifest_entities, mock_model_node
+                            )
+                            # Verify the source node's own schema name was passed through
+                            calls = mock_fqn_build.call_args_list
+                            schema_name_used = None
+                            for call in calls:
+                                if "schema_name" in call[1]:
+                                    schema_name_used = call[1]["schema_name"]
+                                    break
+                            self.assertEqual(schema_name_used, "test_schema")
+
+    def test_yield_data_models_processes_sources_key(self):
+        """Test that yield_data_models processes both nodes and sources keys from manifest"""
+        mock_manifest = MagicMock()
+        mock_manifest.sources = {"source.test.table1": MagicMock()}
+        mock_manifest.nodes = {"model.test.table2": MagicMock()}
+        mock_manifest.exposures = {"exposure.test.dashboard1": MagicMock()}
+
+        mock_dbt_objects = MagicMock()
+        mock_dbt_objects.dbt_manifest = mock_manifest
+        mock_dbt_objects.dbt_catalog = None
+        mock_dbt_objects.dbt_run_results = None
+        mock_dbt_objects.dbt_sources = None
+
+        # Drain the generator so the combined sources/nodes entities populate the context
+        for _ in self.dbt_source_obj.yield_data_models(mock_dbt_objects):
+            pass
+
+        # The method should process entities from sources, nodes, and exposures
+        self.assertTrue(hasattr(self.dbt_source_obj.context.get(), "data_model_links"))
+        self.assertTrue(hasattr(self.dbt_source_obj.context.get(), "exposures"))
+        self.assertTrue(hasattr(self.dbt_source_obj.context.get(), "dbt_tests"))
+
+    def test_yield_data_models_source_node_schema_handling(self):
+        """Test that source nodes carry the correct schema name in yield_data_models"""
+        from metadata.ingestion.source.database.dbt.constants import DbtCommonEnum
+
+        # Create a mock source node
+        mock_source_node = MagicMock()
+        mock_source_node.resource_type = DbtCommonEnum.SOURCE.value
+        mock_source_node.database = "test_db"
+        mock_source_node.schema_ = "actual_schema"
+        mock_source_node.name = "test_source"
+        mock_source_node.description = "Test source description"
+        mock_source_node.tags = []
+        mock_source_node.meta = {}
+
+        manifest_entities = {"source.test.test_source": mock_source_node}
+
+        mock_dbt_objects = MagicMock()
+        mock_dbt_objects.dbt_manifest.sources = manifest_entities
+        mock_dbt_objects.dbt_manifest.nodes = {}
+        mock_dbt_objects.dbt_manifest.exposures = {}
+        mock_dbt_objects.dbt_catalog = None
+        mock_dbt_objects.dbt_run_results = None
+        mock_dbt_objects.dbt_sources = None
+
+        with patch.object(
+            self.dbt_source_obj,
+            "is_filtered",
+            return_value=MagicMock(is_filtered=False),
+        ):
+            with patch.object(
+                self.dbt_source_obj,
+                "_get_table_entity",
+                return_value=MOCK_TABLE_ENTITIES[0],
+            ):
+                with patch(
+                    "metadata.ingestion.source.database.dbt.dbt_utils.get_dbt_model_name",
+                    return_value="test_source",
+                ):
+                    with patch(
+                        "metadata.ingestion.source.database.dbt.dbt_utils.get_corrected_name",
+                        side_effect=lambda x: x,
+                    ):
+                        with patch("metadata.utils.fqn.build") as mock_fqn_build:
+                            mock_fqn_build.return_value = (
+                                "test_service.test_db.*.test_source"
+                            )
+                            # Process the source node
+                            list(
+                                self.dbt_source_obj.yield_data_models(mock_dbt_objects)
+                            )
+                            # Verify the source node's actual schema name was passed through
+                            calls = mock_fqn_build.call_args_list
+                            schema_name_used = None
+                            for call in calls:
+                                if "schema_name" in call[1]:
+                                    schema_name_used = call[1]["schema_name"]
+                                    break
+                            self.assertEqual(schema_name_used, "actual_schema")