Fix #23325: Deduplicate dbt tags (#24327)

This commit is contained in:
Mayur Singal 2025-11-13 22:02:12 +05:30 committed by GitHub
parent dec8a0e9bc
commit 96224bf9c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 136 additions and 0 deletions

View File

@@ -346,6 +346,9 @@ class DbtSource(DbtServiceSource):
)
)
try:
# Deduplicate tags before building FQNs
dbt_tags_list = list(set(dbt_tags_list)) if dbt_tags_list else []
# Create all the tags added
dbt_tag_labels = [
fqn.build(

View File

@@ -1491,3 +1491,136 @@ class DbtUnitTest(TestCase):
schema_name_used = call[1]["schema_name"]
break
self.assertEqual(schema_name_used, "actual_schema")
@patch("metadata.utils.tag_utils.get_ometa_tag_and_classification")
@patch("metadata.utils.fqn.build")
def test_yield_dbt_tags_deduplication(
    self, mock_fqn_build, mock_get_ometa_tag_and_classification
):
    """
    Tags repeated across models must be handed to fqn.build only once.

    Two model nodes carry overlapping tag lists ("tag2" and "tag3" appear
    on both). After yield_dbt_tags has been consumed, the patched
    fqn.build is expected to have been called once per distinct tag name.
    """

    def fake_fqn_build(_, __, classification_name, tag_name):
        # Stand-in for fqn.build: joins classification and tag with a dot.
        return f"{classification_name}.{tag_name}"

    mock_fqn_build.side_effect = fake_fqn_build
    mock_get_ometa_tag_and_classification.return_value = []

    # Two models whose tag lists overlap on "tag2" and "tag3".
    first_model = MagicMock(
        resource_type="model", tags=["tag1", "tag2", "tag3"], columns={}
    )
    second_model = MagicMock(
        resource_type="model", tags=["tag2", "tag3", "tag4"], columns={}
    )

    dbt_objects = MagicMock()
    dbt_objects.dbt_manifest.nodes = {
        "model.test.table1": first_model,
        "model.test.table2": second_model,
    }
    dbt_objects.dbt_manifest.sources = {}

    # Drain the generator so every fqn.build call is recorded.
    for _ in self.dbt_source_obj.yield_dbt_tags(dbt_objects):
        pass

    tag_names_used = [
        recorded.kwargs["tag_name"] for recorded in mock_fqn_build.call_args_list
    ]

    self.assertEqual(len(tag_names_used), 4)
    self.assertEqual(set(tag_names_used), {"tag1", "tag2", "tag3", "tag4"})
@patch("metadata.utils.tag_utils.get_ometa_tag_and_classification")
@patch("metadata.utils.fqn.build")
def test_yield_dbt_tags_column_deduplication(
    self, mock_fqn_build, mock_get_ometa_tag_and_classification
):
    """
    Tags repeated between a model and its columns must be built only once.

    The model and its two columns share "col_tag1" and "col_tag2"; the
    patched fqn.build is expected to be called once per distinct tag name.
    """

    def fake_fqn_build(_, __, classification_name, tag_name):
        # Stand-in for fqn.build: joins classification and tag with a dot.
        return f"{classification_name}.{tag_name}"

    mock_fqn_build.side_effect = fake_fqn_build
    mock_get_ometa_tag_and_classification.return_value = []

    # "col_tag2" is on both columns; "col_tag1" is on a column and the model.
    first_column = MagicMock(tags=["col_tag1", "col_tag2"])
    second_column = MagicMock(tags=["col_tag2", "col_tag3"])
    model_node = MagicMock(
        resource_type="model",
        tags=["model_tag", "col_tag1"],
        columns={"column1": first_column, "column2": second_column},
    )

    dbt_objects = MagicMock()
    dbt_objects.dbt_manifest.nodes = {"model.test.table1": model_node}
    dbt_objects.dbt_manifest.sources = {}

    # Drain the generator so every fqn.build call is recorded.
    for _ in self.dbt_source_obj.yield_dbt_tags(dbt_objects):
        pass

    tag_names_used = [
        recorded.kwargs["tag_name"] for recorded in mock_fqn_build.call_args_list
    ]

    self.assertEqual(len(tag_names_used), 4)
    self.assertEqual(
        set(tag_names_used), {"model_tag", "col_tag1", "col_tag2", "col_tag3"}
    )
@patch("metadata.utils.tag_utils.get_ometa_tag_and_classification")
@patch("metadata.utils.fqn.build")
def test_yield_dbt_tags_empty_list(
    self, mock_fqn_build, mock_get_ometa_tag_and_classification
):
    """
    A manifest whose only node has no tags must not trigger fqn.build.
    """
    mock_get_ometa_tag_and_classification.return_value = []

    # Single model with neither model-level tags nor columns.
    untagged_model = MagicMock(resource_type="model", tags=[], columns={})

    dbt_objects = MagicMock()
    dbt_objects.dbt_manifest.nodes = {"model.test.table1": untagged_model}
    dbt_objects.dbt_manifest.sources = {}

    # Drain the generator before inspecting the mock.
    for _ in self.dbt_source_obj.yield_dbt_tags(dbt_objects):
        pass

    mock_fqn_build.assert_not_called()
@patch("metadata.utils.tag_utils.get_ometa_tag_and_classification")
@patch("metadata.utils.fqn.build")
def test_yield_dbt_tags_skip_resource_types(
    self, mock_fqn_build, mock_get_ometa_tag_and_classification
):
    """
    Nodes with a skipped resource type must not contribute any tags.

    One node uses SkipResourceTypeEnum.TEST.value as its resource type and
    another is a regular model; only the model's tag is expected to reach
    the patched fqn.build.
    """
    from metadata.ingestion.source.database.dbt.constants import (
        SkipResourceTypeEnum,
    )

    def fake_fqn_build(_, __, classification_name, tag_name):
        # Stand-in for fqn.build: joins classification and tag with a dot.
        return f"{classification_name}.{tag_name}"

    mock_fqn_build.side_effect = fake_fqn_build
    mock_get_ometa_tag_and_classification.return_value = []

    skipped_node = MagicMock(
        resource_type=SkipResourceTypeEnum.TEST.value,
        tags=["skip_tag"],
        columns={},
    )
    processed_node = MagicMock(
        resource_type="model", tags=["process_tag"], columns={}
    )

    dbt_objects = MagicMock()
    dbt_objects.dbt_manifest.nodes = {
        "test.test.skip": skipped_node,
        "model.test.process": processed_node,
    }
    dbt_objects.dbt_manifest.sources = {}

    # Drain the generator so every fqn.build call is recorded.
    for _ in self.dbt_source_obj.yield_dbt_tags(dbt_objects):
        pass

    tag_names_used = [
        recorded.kwargs["tag_name"] for recorded in mock_fqn_build.call_args_list
    ]

    self.assertEqual(len(tag_names_used), 1)
    self.assertEqual(tag_names_used[0], "process_tag")