fix(snowflake): correct additional_database_names OR logic in access_history filter (#14104)

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Sergio Gómez Villamor 2025-07-16 12:10:12 +02:00 committed by GitHub
parent bcb261f66c
commit 6c82d70642
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 67 additions and 29 deletions

View File

@ -787,6 +787,11 @@ class QueryLogQueryBuilder:
would be incorrectly filtered out because they don't populate the DML arrays, causing missing lineage
and operational metadata.
Filtering Logic:
A query is included if it matches:
- Any database name in additional_database_names (exact match), OR
- Any database pattern in database_pattern.allow AND NOT any pattern in database_pattern.deny
Args:
database_pattern: The AllowDenyPattern configuration for database filtering
additional_database_names: Additional database names to always include (no pattern matching)
@ -797,14 +802,31 @@ class QueryLogQueryBuilder:
if not database_pattern and not additional_database_names:
return "TRUE"
# Build the database filter conditions for pattern matching
# Build the database filter conditions
# Logic: Allow if (matches additional_database_names_allowlist) OR (matches database_pattern.allow AND NOT matches database_pattern.deny)
# Note: Using UPPER() + RLIKE for case-insensitive matching is more performant than REGEXP_LIKE with 'i' flag
database_filter_parts = []
# Build additional database names condition (exact matches) - these always get included
additional_db_condition = None
if additional_database_names:
additional_db_conditions = []
for db_name in additional_database_names:
# Escape single quotes
escaped_db_name = db_name.replace("'", "''")
additional_db_conditions.append(
f"SPLIT_PART(UPPER(o:objectName), '.', 1) = '{escaped_db_name.upper()}'"
)
if additional_db_conditions:
additional_db_condition = " OR ".join(additional_db_conditions)
# Build database pattern condition (allow AND NOT deny)
database_pattern_condition = None
if database_pattern:
allow_patterns = database_pattern.allow
deny_patterns = database_pattern.deny
pattern_parts = []
# Add allow patterns (if not the default "allow all")
if allow_patterns and allow_patterns != [".*"]:
allow_conditions = []
@ -815,7 +837,11 @@ class QueryLogQueryBuilder:
f"SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE '{escaped_pattern}'"
)
if allow_conditions:
database_filter_parts.append(f"({' OR '.join(allow_conditions)})")
pattern_parts.append(
allow_conditions[0]
if len(allow_conditions) == 1
else f"({' OR '.join(allow_conditions)})"
)
# Add deny patterns
if deny_patterns:
@ -827,24 +853,36 @@ class QueryLogQueryBuilder:
f"SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '{escaped_pattern}'"
)
if deny_conditions:
database_filter_parts.append(f"({' AND '.join(deny_conditions)})")
pattern_parts.append(
deny_conditions[0]
if len(deny_conditions) == 1
else f"({' AND '.join(deny_conditions)})"
)
# Add additional database names (exact matches)
if additional_database_names:
additional_db_conditions = []
for db_name in additional_database_names:
# Escape single quotes
escaped_db_name = db_name.replace("'", "''")
additional_db_conditions.append(
f"SPLIT_PART(UPPER(o:objectName), '.', 1) = '{escaped_db_name.upper()}'"
)
if additional_db_conditions:
database_filter_parts.append(
f"({' OR '.join(additional_db_conditions)})"
)
if pattern_parts:
database_pattern_condition = " AND ".join(pattern_parts)
if database_filter_parts:
database_filter_condition = " AND ".join(database_filter_parts)
# Combine conditions: additional_database_names OR database_pattern
filter_conditions = []
if additional_db_condition:
filter_conditions.append(
f"({additional_db_condition})"
if len(additional_db_condition.split(" OR ")) > 1
else additional_db_condition
)
if database_pattern_condition:
filter_conditions.append(
f"({database_pattern_condition})"
if len(database_pattern_condition.split(" AND ")) > 1
else database_pattern_condition
)
if filter_conditions:
database_filter_condition = (
filter_conditions[0]
if len(filter_conditions) == 1
else " OR ".join(filter_conditions)
)
# Build a condition that checks if any objects in the arrays match the database pattern
# This implements "at least one" matching behavior: queries are allowed if they touch

View File

@ -35,19 +35,19 @@ class TestBuildAccessHistoryDatabaseFilterCondition:
pytest.param(
AllowDenyPattern(allow=["PROD_.*"]),
None,
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD_.*'))) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD_.*'))) > 0 OR ((SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) RLIKE 'PROD_.*')))",
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD_.*')) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD_.*')) > 0 OR (SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) RLIKE 'PROD_.*'))",
id="allow_pattern_only",
),
pytest.param(
AllowDenyPattern(deny=[".*_TEMP"]),
None,
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP'))) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP'))) > 0 OR ((SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) NOT RLIKE '.*_TEMP')))",
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP')) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP')) > 0 OR (SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) NOT RLIKE '.*_TEMP'))",
id="deny_pattern_only",
),
pytest.param(
AllowDenyPattern(allow=["PROD_.*"], deny=[".*_TEMP"]),
None,
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD_.*') AND (SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP'))) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD_.*') AND (SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP'))) > 0 OR ((SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) RLIKE 'PROD_.*') AND (SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) NOT RLIKE '.*_TEMP')))",
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD_.*' AND SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP'))) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD_.*' AND SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP'))) > 0 OR ((SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) RLIKE 'PROD_.*' AND SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) NOT RLIKE '.*_TEMP')))",
id="allow_and_deny_patterns",
),
pytest.param(
@ -59,7 +59,7 @@ class TestBuildAccessHistoryDatabaseFilterCondition:
pytest.param(
AllowDenyPattern(deny=[".*_TEMP", ".*_STAGING"]),
None,
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP' AND SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_STAGING'))) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP' AND SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_STAGING'))) > 0 OR ((SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) NOT RLIKE '.*_TEMP' AND SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) NOT RLIKE '.*_STAGING')))",
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> ((SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP' AND SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_STAGING')))) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> ((SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP' AND SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_STAGING')))) > 0 OR (((SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) NOT RLIKE '.*_TEMP' AND SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) NOT RLIKE '.*_STAGING'))))",
id="multiple_deny_patterns",
),
pytest.param(
@ -71,19 +71,19 @@ class TestBuildAccessHistoryDatabaseFilterCondition:
pytest.param(
None,
["SPECIAL_DB"],
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) = 'SPECIAL_DB'))) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) = 'SPECIAL_DB'))) > 0 OR ((SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) = 'SPECIAL_DB')))",
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> SPLIT_PART(UPPER(o:objectName), '.', 1) = 'SPECIAL_DB')) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> SPLIT_PART(UPPER(o:objectName), '.', 1) = 'SPECIAL_DB')) > 0 OR (SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) = 'SPECIAL_DB'))",
id="single_additional_database_name",
),
pytest.param(
AllowDenyPattern(allow=["PROD_.*"]),
["DB1", "DB2"],
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD_.*') AND (SPLIT_PART(UPPER(o:objectName), '.', 1) = 'DB1' OR SPLIT_PART(UPPER(o:objectName), '.', 1) = 'DB2'))) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD_.*') AND (SPLIT_PART(UPPER(o:objectName), '.', 1) = 'DB1' OR SPLIT_PART(UPPER(o:objectName), '.', 1) = 'DB2'))) > 0 OR ((SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) RLIKE 'PROD_.*') AND (SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) = 'DB1' OR SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) = 'DB2')))",
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) = 'DB1' OR SPLIT_PART(UPPER(o:objectName), '.', 1) = 'DB2') OR SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD_.*')) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) = 'DB1' OR SPLIT_PART(UPPER(o:objectName), '.', 1) = 'DB2') OR SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD_.*')) > 0 OR ((SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) = 'DB1' OR SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) = 'DB2') OR SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) RLIKE 'PROD_.*'))",
id="pattern_with_additional_database_names",
),
pytest.param(
AllowDenyPattern(allow=["PROD_.*", "DEV_.*"], deny=[".*_TEMP"]),
["SPECIAL_DB"],
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD_.*' OR SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'DEV_.*') AND (SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP') AND (SPLIT_PART(UPPER(o:objectName), '.', 1) = 'SPECIAL_DB'))) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD_.*' OR SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'DEV_.*') AND (SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP') AND (SPLIT_PART(UPPER(o:objectName), '.', 1) = 'SPECIAL_DB'))) > 0 OR ((SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) RLIKE 'PROD_.*' OR SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) RLIKE 'DEV_.*') AND (SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) NOT RLIKE '.*_TEMP') AND (SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) = 'SPECIAL_DB')))",
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> SPLIT_PART(UPPER(o:objectName), '.', 1) = 'SPECIAL_DB' OR ((SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD_.*' OR SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'DEV_.*') AND SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP'))) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> SPLIT_PART(UPPER(o:objectName), '.', 1) = 'SPECIAL_DB' OR ((SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD_.*' OR SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'DEV_.*') AND SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP'))) > 0 OR (SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) = 'SPECIAL_DB' OR ((SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) RLIKE 'PROD_.*' OR SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) RLIKE 'DEV_.*') AND SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) NOT RLIKE '.*_TEMP')))",
id="complex_pattern_with_additional_database_names",
),
pytest.param(
@ -95,19 +95,19 @@ class TestBuildAccessHistoryDatabaseFilterCondition:
pytest.param(
AllowDenyPattern(allow=[".*"], deny=[".*_TEMP"]),
None,
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP'))) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP'))) > 0 OR ((SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) NOT RLIKE '.*_TEMP')))",
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP')) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*_TEMP')) > 0 OR (SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) NOT RLIKE '.*_TEMP'))",
id="default_allow_pattern_with_deny_pattern",
),
pytest.param(
AllowDenyPattern(allow=["PROD'_.*"]),
None,
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD''_.*'))) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD''_.*'))) > 0 OR ((SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) RLIKE 'PROD''_.*')))",
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD''_.*')) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> SPLIT_PART(UPPER(o:objectName), '.', 1) RLIKE 'PROD''_.*')) > 0 OR (SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) RLIKE 'PROD''_.*'))",
id="sql_injection_protection_allow_pattern",
),
pytest.param(
AllowDenyPattern(deny=[".*'_TEMP"]),
None,
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*''_TEMP'))) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> (SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*''_TEMP'))) > 0 OR ((SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) NOT RLIKE '.*''_TEMP')))",
"(ARRAY_SIZE(FILTER(direct_objects_accessed, o -> SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*''_TEMP')) > 0 OR ARRAY_SIZE(FILTER(objects_modified, o -> SPLIT_PART(UPPER(o:objectName), '.', 1) NOT RLIKE '.*''_TEMP')) > 0 OR (SPLIT_PART(UPPER(object_modified_by_ddl:objectName), '.', 1) NOT RLIKE '.*''_TEMP'))",
id="sql_injection_protection_deny_pattern",
),
pytest.param(