feat(dbt): add filtering for materialized nodes based on their physical location (#14689)

Co-authored-by: Abdullah Tariq <abdullah.tariq@adevinta.com>
Co-authored-by: skrydal <piotr.skrydalewicz@gmail.com>
This commit is contained in:
Abdullah 2025-09-17 15:18:16 +02:00 committed by GitHub
parent 002cc398d0
commit acffdce986
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 7230 additions and 5 deletions

View File

@ -246,6 +246,23 @@ class DBTEntitiesEnabled(ConfigModel):
return self.model_performance == EmitDirective.YES
class MaterializedNodePatternConfig(ConfigModel):
    """Configuration for filtering materialized nodes based on their physical location"""

    # Three independent allow/deny filters, from coarsest to finest
    # granularity. Each one defaults to "allow everything", so declaring this
    # config without overrides filters nothing.

    # Matched against the bare database name.
    database_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="Regex patterns for database names to filter materialized nodes.",
    )

    # Matched against the database-qualified schema name.
    schema_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description=(
            "Regex patterns for schema names in format '{database}.{schema}' "
            "to filter materialized nodes."
        ),
    )

    # Matched against the fully qualified table/view name.
    table_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description=(
            "Regex patterns for table/view names in format "
            "'{database}.{schema}.{table}' to filter materialized nodes."
        ),
    )
class DBTCommonConfig(
StatefulIngestionConfigBase,
PlatformInstanceConfigMixin,
@ -294,6 +311,11 @@ class DBTCommonConfig(
default=AllowDenyPattern.allow_all(),
description="regex patterns for dbt model names to filter in ingestion.",
)
materialized_node_pattern: MaterializedNodePatternConfig = Field(
default=MaterializedNodePatternConfig(),
description="Advanced filtering for materialized nodes based on their physical database location. "
"Provides fine-grained control over database.schema.table patterns for catalog consistency.",
)
meta_mapping: Dict = Field(
default={},
description="mapping rules that will be executed against dbt meta properties. Refer to the section below on dbt meta automated mappings.",
@ -1018,15 +1040,53 @@ class DBTSourceBase(StatefulIngestionSourceBase):
all_nodes_map,
)
def _is_allowed_node(self, key: str) -> bool:
return self.config.node_name_pattern.allowed(key)
def _is_allowed_node(self, node: DBTNode) -> bool:
    """
    Decide whether a dbt node should be processed.

    A node passes only when both layers accept it:

    1. its dbt name is allowed by ``config.node_name_pattern``, and
    2. its physical location is allowed by ``_is_allowed_materialized_node``.

    The location check is applied to every node whose name is allowed; it may
    need to be narrowed to specific cases in the future.
    """
    name_allowed = self.config.node_name_pattern.allowed(node.dbt_name)
    if not name_allowed:
        return False

    # Name matched, so the verdict is whatever the location filter decides.
    return self._is_allowed_materialized_node(node)
def _is_allowed_materialized_node(self, node: DBTNode) -> bool:
    """
    Filter a node by its materialized database location for catalog consistency.

    The database, schema and table patterns from
    ``config.materialized_node_pattern`` are applied in that order. As soon as
    the node lacks the component needed for the next level (database, schema
    or name), the node is accepted and the remaining levels are skipped.

    Returns:
        False if any evaluated pattern rejects the node, True otherwise.
    """
    patterns = self.config.materialized_node_pattern

    # Level 1 - database name.
    if not node.database:
        return True
    if not patterns.database_pattern.allowed(node.database):
        return False

    # Level 2 - schema, keyed as {database}.{schema}.
    if not node.schema:
        return True
    schema_key = node._join_parts([node.database, node.schema])
    if not patterns.schema_pattern.allowed(schema_key):
        return False

    # Level 3 - table/view, keyed as {database}.{schema}.{table}.
    if not node.name:
        return True
    return patterns.table_pattern.allowed(node.get_db_fqn())
def _filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]:
nodes: List[DBTNode] = []
for node in all_nodes:
key = node.dbt_name
if not self._is_allowed_node(key):
if not self._is_allowed_node(node):
self.report.nodes_filtered.append(key)
continue
@ -1118,8 +1178,8 @@ class DBTSourceBase(StatefulIngestionSourceBase):
cll_nodes.add(dbt_name)
schema_nodes.add(dbt_name)
for dbt_name in all_nodes_map:
if self._is_allowed_node(dbt_name):
for dbt_name, dbt_node in all_nodes_map.items():
if self._is_allowed_node(dbt_node):
add_node_to_cll_list(dbt_name)
return schema_nodes, cll_nodes

View File

@ -0,0 +1,347 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.customer_details,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Model"
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "dbt-test-with-source-table-pattern",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.customer_details,PROD)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:dbt"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "dbt-test-with-source-table-pattern",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.customer_details,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"business_owner": "jdoe.last@gmail.com",
"data_governance.team_owner": "Finance",
"has_pii": "True",
"int_property": "1",
"double_property": "2.5",
"node_type": "model",
"materialization": "ephemeral",
"dbt_file_path": "models/transform/customer_details.sql",
"language": "sql",
"dbt_unique_id": "model.sample_dbt.customer_details",
"dbt_package_name": "sample_dbt",
"manifest_schema": "https://schemas.getdbt.com/dbt/manifest/v1.json",
"manifest_version": "0.19.1",
"manifest_adapter": "postgres",
"catalog_schema": "https://schemas.getdbt.com/dbt/catalog/v1.json",
"catalog_version": "0.19.1"
},
"name": "customer_details",
"description": "",
"tags": [
"dbt:test_tag"
]
}
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:alice2",
"type": "DATAOWNER"
}
],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
},
{
"com.linkedin.pegasus2avro.common.GlobalTags": {
"tags": [
{
"tag": "urn:li:tag:dbt:test_tag"
}
]
}
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "model.sample_dbt.customer_details",
"platform": "urn:li:dataPlatform:dbt",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.MySqlDDL": {
"tableSchema": ""
}
},
"fields": [
{
"fieldPath": "customer_id",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "INT",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "full_name",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "email",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "TEXT",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "address",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "TEXT",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "city",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "TEXT",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "postal_code",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "TEXT",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "phone",
"nullable": false,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "TEXT",
"recursive": false,
"isPartOfKey": false
}
]
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": [
{
"auditStamp": {
"time": 1643871600000,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.address,PROD)",
"type": "TRANSFORMED"
},
{
"auditStamp": {
"time": 1643871600000,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.city,PROD)",
"type": "TRANSFORMED"
},
{
"auditStamp": {
"time": 1643871600000,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.customer,PROD)",
"type": "TRANSFORMED"
}
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.customer,PROD),customer_id)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.customer_details,PROD),customer_id)"
],
"confidenceScore": 0.9
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.customer,PROD),first_name)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.customer,PROD),last_name)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.customer_details,PROD),full_name)"
],
"confidenceScore": 0.9
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.customer,PROD),email)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.customer_details,PROD),email)"
],
"confidenceScore": 0.9
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.address,PROD),address)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.customer_details,PROD),address)"
],
"confidenceScore": 0.9
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.city,PROD),city)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.customer_details,PROD),city)"
],
"confidenceScore": 0.9
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.address,PROD),postal_code)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.customer_details,PROD),postal_code)"
],
"confidenceScore": 0.9
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.address,PROD),phone)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.customer_details,PROD),phone)"
],
"confidenceScore": 0.9
}
]
}
},
{
"com.linkedin.pegasus2avro.dataset.ViewProperties": {
"materialized": false,
"viewLogic": "{{ config(\n materialized = \"ephemeral\",\n) }}\n\nSELECT\n c.customer_id,\n c.first_name || ' ' || c.last_name as \"full_name\",\n c.email,\n a.address,\n m.city,\n a.postal_code,\n a.phone\nFROM\n {{ source('pagila', 'customer')}} c\n left outer join {{ source('pagila', 'address')}} a on c.address_id = a.address_id\n left outer join {{ source('pagila', 'city') }} m on a.city_id = m.city_id",
"formattedViewLogic": "SELECT\n c.customer_id,\n c.first_name || ' ' || c.last_name AS \"full_name\",\n c.email,\n a.address,\n m.city,\n a.postal_code,\n a.phone\nFROM \"pagila\".\"public\".\"customer\" AS c\nLEFT OUTER JOIN \"pagila\".\"public\".\"address\" AS a\n ON c.address_id = a.address_id\nLEFT OUTER JOIN \"pagila\".\"public\".\"city\" AS m\n ON a.city_id = m.city_id",
"viewLanguage": "SQL"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "dbt-test-with-source-table-pattern",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:dbt:test_tag",
"changeType": "UPSERT",
"aspectName": "tagKey",
"aspect": {
"json": {
"name": "dbt:test_tag"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "dbt-test-with-source-table-pattern",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -276,6 +276,52 @@ class DbtTestConfig:
"enable_meta_mapping": False,
},
),
DbtTestConfig(
"dbt-test-with-source-schema-pattern",
"dbt_test_with_source_schema_pattern_mces.json",
"dbt_test_with_source_schema_pattern_mces_golden.json",
manifest_file="dbt_manifest_complex_owner_patterns.json",
source_config_modifiers={
"materialized_node_pattern": {
"schema_pattern": {"allow": ["pagila\\.dbt_postgres"]}
}
},
),
DbtTestConfig(
"dbt-test-with-source-database-pattern",
"dbt_test_with_source_database_pattern_mces.json",
"dbt_test_with_source_database_pattern_mces_golden.json",
manifest_file="dbt_manifest_complex_owner_patterns.json",
source_config_modifiers={
"materialized_node_pattern": {"database_pattern": {"allow": ["pagila"]}}
},
),
DbtTestConfig(
"dbt-test-with-source-combined-patterns",
"dbt_test_with_source_combined_patterns_mces.json",
"dbt_test_with_source_combined_patterns_mces_golden.json",
manifest_file="dbt_manifest_complex_owner_patterns.json",
source_config_modifiers={
"node_name_pattern": {
"deny": ["source.sample_dbt.pagila.payment_p2020_06"]
},
"materialized_node_pattern": {
"database_pattern": {"allow": ["pagila"]},
"schema_pattern": {"allow": ["pagila\\.dbt_postgres"]},
},
},
),
DbtTestConfig(
"dbt-test-with-source-table-pattern",
"dbt_test_with_source_table_pattern_mces.json",
"dbt_test_with_source_table_pattern_mces_golden.json",
manifest_file="dbt_manifest_complex_owner_patterns.json",
source_config_modifiers={
"materialized_node_pattern": {
"table_pattern": {"allow": ["pagila\\.dbt_postgres\\.customer.*"]}
}
},
),
],
ids=lambda dbt_test_config: dbt_test_config.run_id,
)