Add: dbt tags Filter (#21628)

This commit is contained in:
Suman Maharana 2025-06-07 12:25:23 +05:30 committed by GitHub
parent 2c657d6034
commit fd88a6d449
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 41 additions and 2 deletions

View File

@ -97,6 +97,7 @@ from metadata.ingestion.source.database.dbt.models import DbtMeta
from metadata.utils import fqn
from metadata.utils.elasticsearch import get_entity_from_es_result
from metadata.utils.entity_link import get_table_fqn
from metadata.utils.filters import filter_by_tag
from metadata.utils.logger import ingestion_logger
from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels
from metadata.utils.time_utils import datetime_to_timestamp
@ -245,6 +246,18 @@ class DbtSource(DbtServiceSource):
f"Unable to find the node or columns in the catalog file for dbt node: {key}"
)
def filter_tags(self, tags: List[str]) -> List[str]:
    """
    Drop the dbt tags rejected by the configured tag filter pattern.

    :param tags: tag names read from a dbt node or column; may be empty or None
    :return: ``tags`` unchanged when no ``tagFilterPattern`` is configured or
        when there is nothing to filter, otherwise a new list holding only the
        tags for which ``filter_by_tag`` returns False
    """
    tag_pattern = self.source_config.tagFilterPattern
    # Without a pattern there is nothing to do. Empty/None tags are passed
    # straight through as well: not every caller checks that tags exist
    # before calling, and iterating over None would raise a TypeError.
    if not tag_pattern or not tags:
        return tags
    return [tag for tag in tags if not filter_by_tag(tag_pattern, tag)]
def yield_dbt_tags(
self, dbt_objects: DbtObjects
) -> Iterable[Either[OMetaTagAndClassification]]:
@ -272,13 +285,13 @@ class DbtSource(DbtServiceSource):
# Add the tags from the model
model_tags = manifest_node.tags
if model_tags:
dbt_tags_list.extend(model_tags)
dbt_tags_list.extend(self.filter_tags(model_tags))
# Add the tags from the columns
for _, column in manifest_node.columns.items():
column_tags = column.tags
if column_tags:
dbt_tags_list.extend(column_tags)
dbt_tags_list.extend(self.filter_tags(column_tags))
except Exception as exc:
yield Either(
left=StackTraceError(
@ -509,6 +522,7 @@ class DbtSource(DbtServiceSource):
dbt_table_tags_list = []
if manifest_node.tags:
manifest_node.tags = self.filter_tags(manifest_node.tags)
dbt_table_tags_list = (
get_tag_labels(
metadata=self.metadata,
@ -669,6 +683,7 @@ class DbtSource(DbtServiceSource):
column_description = catalog_column.comment
dbt_column_tag_list = []
manifest_column.tags = self.filter_tags(manifest_column.tags)
dbt_column_tag_list.extend(
get_tag_labels(
metadata=self.metadata,

View File

@ -310,3 +310,16 @@ def filter_by_collection(
:return: True for filtering, False otherwise
"""
return _filter(collection_pattern, collection_name)
def filter_by_tag(tag_pattern: Optional[FilterPattern], tag_name: str) -> bool:
    """
    Decide whether a tag has to be filtered out.

    Include takes precedence over exclude.

    :param tag_pattern: Model defining tag filtering logic
    :param tag_name: tag name
    :return: True for filtering, False otherwise
    """
    # Delegate to the shared pattern matcher used by the other filter_by_* helpers.
    should_filter = _filter(tag_pattern, tag_name)
    return should_filter

View File

@ -90,6 +90,11 @@
"description": "Regex to only fetch databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern",
"title": "Database Filter Pattern"
},
"tagFilterPattern": {
"description": "Regex to only fetch tags that match the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern",
"title": "Tag Filter Pattern"
}
},
"additionalProperties": false,

View File

@ -54,6 +54,10 @@ export interface DbtPipeline {
* Regex exclude tables or databases that matches the pattern.
*/
tableFilterPattern?: FilterPattern;
/**
* Regex to only fetch tags that match the pattern.
*/
tagFilterPattern?: FilterPattern;
/**
* Pipeline type
*/
@ -68,6 +72,8 @@ export interface DbtPipeline {
* Regex to only fetch tables or databases that matches the pattern.
*
* Regex exclude tables or databases that matches the pattern.
*
* Regex to only fetch tags that match the pattern.
*/
export interface FilterPattern {
/**