diff --git a/metadata-ingestion/src/datahub/configuration/pattern_utils.py b/metadata-ingestion/src/datahub/configuration/pattern_utils.py index 313e68c418..51c9d053b9 100644 --- a/metadata-ingestion/src/datahub/configuration/pattern_utils.py +++ b/metadata-ingestion/src/datahub/configuration/pattern_utils.py @@ -1,5 +1,7 @@ from datahub.configuration.common import AllowDenyPattern +UUID_REGEX = r"[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}" + def is_schema_allowed( schema_pattern: AllowDenyPattern, diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py index 2c93436643..ff6c25c324 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py @@ -1,10 +1,11 @@ import logging from enum import Enum -from typing import Dict, Optional, cast +from typing import Dict, List, Optional, cast from pydantic import Field, SecretStr, root_validator, validator from datahub.configuration.common import AllowDenyPattern +from datahub.configuration.pattern_utils import UUID_REGEX from datahub.configuration.validate_field_removal import pydantic_removed_field from datahub.configuration.validate_field_rename import pydantic_renamed_field from datahub.ingestion.glossary.classifier import ClassificationConfig @@ -20,6 +21,18 @@ from datahub.ingestion.source_config.usage.snowflake_usage import SnowflakeUsage logger = logging.Logger(__name__) +# FIVETRAN creates temporary tables in schema named FIVETRAN_xxx_STAGING. +# Ref - https://support.fivetran.com/hc/en-us/articles/1500003507122-Why-Is-There-an-Empty-Schema-Named-Fivetran-staging-in-the-Destination- +# +# DBT incremental models create temporary tables ending with __dbt_tmp +# Ref - https://discourse.getdbt.com/t/handling-bigquery-incremental-dbt-tmp-tables/7540 +DEFAULT_UPSTREAMS_DENY_LIST = [ + r".*\.FIVETRAN_.*_STAGING\..*", # fivetran + r".*__DBT_TMP$", # dbt + rf".*\.SEGMENT_{UUID_REGEX}", # segment + rf".*\.STAGING_.*_{UUID_REGEX}", # stitch +] + class TagOption(str, Enum): with_lineage = "with_lineage" @@ -89,6 +102,16 @@ class SnowflakeV2Config( description="Whether to validate upstream snowflake tables against allow-deny patterns", ) + tag_pattern: AllowDenyPattern = Field( + default=AllowDenyPattern.allow_all(), + description="List of regex patterns for tags to include in ingestion. Only used if `extract_tags` is enabled.", + ) + + upstreams_deny_pattern: List[str] = Field( + default=DEFAULT_UPSTREAMS_DENY_LIST, + description="[Advanced] Regex patterns for upstream tables to filter in ingestion. Specify regex to match the entire table name in database.schema.table format. Defaults are to set in such a way to ignore the temporary staging tables created by known ETL tools. Not used if `use_legacy_lineage_method=True`", + ) + @validator("include_column_lineage") def validate_include_column_lineage(cls, v, values): if not values.get("include_table_lineage") and v: @@ -97,11 +120,6 @@ class SnowflakeV2Config( ) return v - tag_pattern: AllowDenyPattern = Field( - default=AllowDenyPattern.allow_all(), - description="List of regex patterns for tags to include in ingestion. Only used if `extract_tags` is enabled.", - ) - @root_validator(pre=False) def validate_unsupported_configs(cls, values: Dict) -> Dict: value = values.get("include_read_operational_stats") @@ -140,15 +158,14 @@ class SnowflakeV2Config( and values["stateful_ingestion"].enabled and values["stateful_ingestion"].remove_stale_metadata ) - include_table_lineage = values.get("include_table_lineage") # TODO: Allow lineage extraction and profiling irrespective of basic schema extraction, - # as it seems possible with some refractor + # as it seems possible with some refactor if not include_technical_schema and any( - [include_profiles, delete_detection_enabled, include_table_lineage] + [include_profiles, delete_detection_enabled] ): raise ValueError( - "Can not perform Deletion Detection, Lineage Extraction, Profiling without extracting snowflake technical schema. Set `include_technical_schema` to True or disable Deletion Detection, Lineage Extraction, Profiling." + "Cannot perform Deletion Detection or Profiling without extracting snowflake technical schema. Set `include_technical_schema` to True or disable Deletion Detection and Profiling." ) return values diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py index ce716c6b63..7a5fbf74cd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py @@ -340,6 +340,7 @@ class SnowflakeLineageExtractor( if not self.config.ignore_start_time_lineage else 0, end_time_millis=int(self.config.end_time.timestamp() * 1000), + upstreams_deny_pattern=self.config.upstreams_deny_pattern, include_view_lineage=self.config.include_view_lineage, include_column_lineage=self.config.include_column_lineage, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py index 85b5a077ee..ac28cf73fa 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py @@ -1,6 +1,27 @@ -from typing import Optional +from typing import List, Optional from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain +from datahub.ingestion.source.snowflake.snowflake_config import ( + DEFAULT_UPSTREAMS_DENY_LIST, +) + + +def create_deny_regex_sql_filter( + upstreams_deny_pattern: List[str], filter_cols: List[str] +) -> str: + upstream_sql_filter = ( + " AND ".join( + [ + (f"NOT RLIKE({col_name},'{regexp}','i')") + for col_name in filter_cols + for regexp in upstreams_deny_pattern + ] + ) + if upstreams_deny_pattern + else "" + ) + + return upstream_sql_filter class SnowflakeQuery: @@ -426,20 +447,30 @@ class SnowflakeQuery: ) = 1 """ + # Note on use of `upstreams_deny_pattern` to ignore temporary tables: + # Snowflake access history may include temporary tables in DIRECT_OBJECTS_ACCESSED and + # OBJECTS_MODIFIED->columns->directSources. We do not need these temporary tables and filter these in the query. @staticmethod def table_to_table_lineage_history_v2( start_time_millis: int, end_time_millis: int, include_view_lineage: bool = True, include_column_lineage: bool = True, + upstreams_deny_pattern: List[str] = DEFAULT_UPSTREAMS_DENY_LIST, ) -> str: if include_column_lineage: return SnowflakeQuery.table_upstreams_with_column_lineage( - start_time_millis, end_time_millis, include_view_lineage + start_time_millis, + end_time_millis, + upstreams_deny_pattern, + include_view_lineage, ) else: return SnowflakeQuery.table_upstreams_only( - start_time_millis, end_time_millis, include_view_lineage + start_time_millis, + end_time_millis, + upstreams_deny_pattern, + include_view_lineage, ) @staticmethod @@ -664,19 +695,11 @@ class SnowflakeQuery: basic_usage_counts.bucket_start_time """ - # Note on temporary tables: - # Snowflake access history may include temporary tables in DIRECT_OBJECTS_ACCESSED and - # OBJECTS_MODIFIED->columns->directSources. We do not need these temporary tables and filter these in the query. - # - # FIVETRAN creates temporary tables in schema named FIVETRAN_xxx_STAGING. - # Ref - https://support.fivetran.com/hc/en-us/articles/1500003507122-Why-Is-There-an-Empty-Schema-Named-Fivetran-staging-in-the-Destination- - # - # DBT incremental models create temporary tables ending with __dbt_tmp - # Ref - https://discourse.getdbt.com/t/handling-bigquery-incremental-dbt-tmp-tables/7540 @staticmethod def table_upstreams_with_column_lineage( start_time_millis: int, end_time_millis: int, + upstreams_deny_pattern: List[str], include_view_lineage: bool = True, ) -> str: allowed_upstream_table_domains = ( @@ -684,6 +707,12 @@ class SnowflakeQuery: if include_view_lineage else SnowflakeQuery.ACCESS_HISTORY_TABLE_DOMAINS_FILTER ) + + upstream_sql_filter = create_deny_regex_sql_filter( + upstreams_deny_pattern, + ["upstream_table_name", "upstream_column_table_name"], + ) + return f""" WITH column_lineage_history AS ( SELECT @@ -712,10 +741,7 @@ class SnowflakeQuery: AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3) AND upstream_table_domain in {allowed_upstream_table_domains} AND downstream_table_domain = '{SnowflakeObjectDomain.TABLE.capitalize()}' - AND upstream_column_table_name NOT LIKE '%.FIVETRAN\\_%\\_STAGING.%' - AND upstream_column_table_name NOT LIKE '%\\_\\_DBT\\_TMP' - AND upstream_table_name NOT LIKE '%.FIVETRAN\\_%\\_STAGING.%' - AND upstream_table_name NOT LIKE '%\\_\\_DBT\\_TMP' + {("AND " + upstream_sql_filter) if upstream_sql_filter else ""} ), column_upstream_jobs AS ( SELECT @@ -781,6 +807,7 @@ class SnowflakeQuery: def table_upstreams_only( start_time_millis: int, end_time_millis: int, + upstreams_deny_pattern: List[str], include_view_lineage: bool = True, ) -> str: allowed_upstream_table_domains = ( @@ -788,6 +815,11 @@ class SnowflakeQuery: if include_view_lineage else SnowflakeQuery.ACCESS_HISTORY_TABLE_DOMAINS_FILTER ) + + upstream_sql_filter = create_deny_regex_sql_filter( + upstreams_deny_pattern, + ["upstream_table_name"], + ) return f""" WITH table_lineage_history AS ( SELECT @@ -810,8 +842,7 @@ class SnowflakeQuery: AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3) AND upstream_table_domain in {allowed_upstream_table_domains} AND downstream_table_domain = '{SnowflakeObjectDomain.TABLE.capitalize()}' - AND upstream_table_name NOT LIKE '%.FIVETRAN\\_%\\_STAGING.%' - AND upstream_table_name NOT LIKE '%\\_\\_DBT\\_TMP' + {("AND " + upstream_sql_filter) if upstream_sql_filter else ""} ) SELECT downstream_table_name AS "DOWNSTREAM_TABLE_NAME", diff --git a/metadata-ingestion/tests/integration/snowflake/common.py b/metadata-ingestion/tests/integration/snowflake/common.py index c5d68cda2f..ebb229d1c7 100644 --- a/metadata-ingestion/tests/integration/snowflake/common.py +++ b/metadata-ingestion/tests/integration/snowflake/common.py @@ -312,7 +312,10 @@ def default_query_results(query): # noqa: C901 ] elif query in ( snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2( - 1654499820000, 1654586220000, True, True + start_time_millis=1654499820000, + end_time_millis=1654586220000, + include_view_lineage=True, + include_column_lineage=True, ), ): return [ @@ -381,7 +384,10 @@ def default_query_results(query): # noqa: C901 ] elif query in ( snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2( - 1654499820000, 1654586220000, False, False + start_time_millis=1654499820000, + end_time_millis=1654586220000, + include_view_lineage=False, + include_column_lineage=False, ), ): return [ diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py index a0e6cace64..ed3bea49f0 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py @@ -228,7 +228,10 @@ def test_snowflake_missing_snowflake_lineage_permission_causes_pipeline_failure( default_query_results, [ snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2( - 1654499820000, 1654586220000, False, True + start_time_millis=1654499820000, + end_time_millis=1654586220000, + include_view_lineage=False, + include_column_lineage=True, ) ], "Database 'SNOWFLAKE' does not exist or not authorized.", diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake_leacy_lineage.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake_legacy_lineage.py similarity index 100% rename from metadata-ingestion/tests/integration/snowflake/test_snowflake_leacy_lineage.py rename to metadata-ingestion/tests/integration/snowflake/test_snowflake_legacy_lineage.py diff --git a/metadata-ingestion/tests/unit/test_snowflake_source.py b/metadata-ingestion/tests/unit/test_snowflake_source.py index 05df8d1939..e6a8eee3e2 100644 --- a/metadata-ingestion/tests/unit/test_snowflake_source.py +++ b/metadata-ingestion/tests/unit/test_snowflake_source.py @@ -4,13 +4,20 @@ import pytest from pydantic import ValidationError from datahub.configuration.common import OauthConfiguration +from datahub.configuration.pattern_utils import UUID_REGEX from datahub.ingestion.api.source import SourceCapability from datahub.ingestion.source.snowflake.constants import ( CLIENT_PREFETCH_THREADS, CLIENT_SESSION_KEEP_ALIVE, SnowflakeCloudProvider, ) -from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config +from datahub.ingestion.source.snowflake.snowflake_config import ( + DEFAULT_UPSTREAMS_DENY_LIST, + SnowflakeV2Config, +) +from datahub.ingestion.source.snowflake.snowflake_query import ( + create_deny_regex_sql_filter, +) from datahub.ingestion.source.snowflake.snowflake_usage_v2 import ( SnowflakeObjectAccessEntry, ) @@ -572,3 +579,28 @@ def test_snowflake_object_access_entry_missing_object_id(): "objectName": "SOME.OBJECT.NAME", } ) + + +def test_snowflake_query_create_deny_regex_sql(): + assert create_deny_regex_sql_filter([], ["col"]) == "" + assert ( + create_deny_regex_sql_filter([".*tmp.*"], ["col"]) + == "NOT RLIKE(col,'.*tmp.*','i')" + ) + + assert ( + create_deny_regex_sql_filter([".*tmp.*", UUID_REGEX], ["col"]) + == "NOT RLIKE(col,'.*tmp.*','i') AND NOT RLIKE(col,'[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i')" + ) + + assert ( + create_deny_regex_sql_filter([".*tmp.*", UUID_REGEX], ["col1", "col2"]) + == "NOT RLIKE(col1,'.*tmp.*','i') AND NOT RLIKE(col1,'[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i') AND NOT RLIKE(col2,'.*tmp.*','i') AND NOT RLIKE(col2,'[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i')" + ) + + assert ( + create_deny_regex_sql_filter( + DEFAULT_UPSTREAMS_DENY_LIST, ["upstream_table_name"] + ) + == r"NOT RLIKE(upstream_table_name,'.*\.FIVETRAN_.*_STAGING\..*','i') AND NOT RLIKE(upstream_table_name,'.*__DBT_TMP$','i') AND NOT RLIKE(upstream_table_name,'.*\.SEGMENT_[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i') AND NOT RLIKE(upstream_table_name,'.*\.STAGING_.*_[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i')" + )