feat(ingest/snowflake): add config option to specify deny patterns for upstreams (#7962)

Co-authored-by: Andrew Sikowitz <andrew.sikowitz@acryl.io>
Mayuri Nehate 2023-05-09 02:43:57 +05:30 committed by GitHub
parent 13b1d66170
commit c845c75a2d
8 changed files with 124 additions and 32 deletions

View File

@@ -1,5 +1,7 @@
 from datahub.configuration.common import AllowDenyPattern
 
+UUID_REGEX = r"[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}"
+
 def is_schema_allowed(
     schema_pattern: AllowDenyPattern,
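As an aside, the `[-_]` separator classes let this one pattern cover both hyphenated UUIDs and the underscore-separated variant some ETL tools embed in table names. A minimal sanity check (sample UUIDs are made up):

```python
import re

from datahub.configuration.pattern_utils import UUID_REGEX

# Hyphenated and underscored UUIDs both match. Uppercase hex does not,
# but the SQL filter built from this regex passes RLIKE's 'i' flag,
# so the end-to-end matching is case-insensitive anyway.
assert re.fullmatch(UUID_REGEX, "0e3eddf4-8b95-4b86-8f5f-9478fe02dc3a")
assert re.fullmatch(UUID_REGEX, "0e3eddf4_8b95_4b86_8f5f_9478fe02dc3a")
assert not re.fullmatch(UUID_REGEX, "0E3EDDF4-8B95-4B86-8F5F-9478FE02DC3A")
```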

View File

@@ -1,10 +1,11 @@
 import logging
 from enum import Enum
-from typing import Dict, Optional, cast
+from typing import Dict, List, Optional, cast
 
 from pydantic import Field, SecretStr, root_validator, validator
 
 from datahub.configuration.common import AllowDenyPattern
+from datahub.configuration.pattern_utils import UUID_REGEX
 from datahub.configuration.validate_field_removal import pydantic_removed_field
 from datahub.configuration.validate_field_rename import pydantic_renamed_field
 from datahub.ingestion.glossary.classifier import ClassificationConfig
@@ -20,6 +21,18 @@ from datahub.ingestion.source_config.usage.snowflake_usage import SnowflakeUsage
 logger = logging.Logger(__name__)
 
+# FIVETRAN creates temporary tables in schema named FIVETRAN_xxx_STAGING.
+# Ref - https://support.fivetran.com/hc/en-us/articles/1500003507122-Why-Is-There-an-Empty-Schema-Named-Fivetran-staging-in-the-Destination-
+#
+# DBT incremental models create temporary tables ending with __dbt_tmp
+# Ref - https://discourse.getdbt.com/t/handling-bigquery-incremental-dbt-tmp-tables/7540
+DEFAULT_UPSTREAMS_DENY_LIST = [
+    r".*\.FIVETRAN_.*_STAGING\..*",  # fivetran
+    r".*__DBT_TMP$",  # dbt
+    rf".*\.SEGMENT_{UUID_REGEX}",  # segment
+    rf".*\.STAGING_.*_{UUID_REGEX}",  # stitch
+]
+
 class TagOption(str, Enum):
     with_lineage = "with_lineage"
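To see what these defaults catch, here is a quick sanity check (table names are hypothetical; `re.fullmatch` with `re.IGNORECASE` approximates Snowflake's `RLIKE`, which matches the whole string case-insensitively when given the `'i'` flag):

```python
import re

from datahub.ingestion.source.snowflake.snowflake_config import (
    DEFAULT_UPSTREAMS_DENY_LIST,
)


def is_denied(table_name: str) -> bool:
    # Mirror the SQL semantics: RLIKE(col, pattern, 'i') matches the
    # entire string, case-insensitively.
    return any(
        re.fullmatch(pattern, table_name, re.IGNORECASE)
        for pattern in DEFAULT_UPSTREAMS_DENY_LIST
    )


assert is_denied("db.fivetran_abc_staging.some_table")  # fivetran staging
assert is_denied("db.schema.orders__dbt_tmp")           # dbt temp table
assert not is_denied("db.schema.orders")                # ordinary table
```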
@@ -89,6 +102,16 @@ class SnowflakeV2Config(
         description="Whether to validate upstream snowflake tables against allow-deny patterns",
     )
+    tag_pattern: AllowDenyPattern = Field(
+        default=AllowDenyPattern.allow_all(),
+        description="List of regex patterns for tags to include in ingestion. Only used if `extract_tags` is enabled.",
+    )
+
+    upstreams_deny_pattern: List[str] = Field(
+        default=DEFAULT_UPSTREAMS_DENY_LIST,
+        description="[Advanced] Regex patterns for upstream tables to filter out of ingestion. Specify each regex to match the entire table name, in database.schema.table format. The defaults ignore the temporary staging tables created by known ETL tools. Not used if `use_legacy_lineage_method=True`.",
+    )
+
     @validator("include_column_lineage")
     def validate_include_column_lineage(cls, v, values):
         if not values.get("include_table_lineage") and v:
@@ -97,11 +120,6 @@ class SnowflakeV2Config(
             )
         return v
 
-    tag_pattern: AllowDenyPattern = Field(
-        default=AllowDenyPattern.allow_all(),
-        description="List of regex patterns for tags to include in ingestion. Only used if `extract_tags` is enabled.",
-    )
-
     @root_validator(pre=False)
     def validate_unsupported_configs(cls, values: Dict) -> Dict:
         value = values.get("include_read_operational_stats")
@@ -140,15 +158,14 @@ class SnowflakeV2Config(
             and values["stateful_ingestion"].enabled
             and values["stateful_ingestion"].remove_stale_metadata
         )
-        include_table_lineage = values.get("include_table_lineage")
         # TODO: Allow lineage extraction and profiling irrespective of basic schema extraction,
-        # as it seems possible with some refractor
+        # as it seems possible with some refactor
         if not include_technical_schema and any(
-            [include_profiles, delete_detection_enabled, include_table_lineage]
+            [include_profiles, delete_detection_enabled]
         ):
             raise ValueError(
-                "Can not perform Deletion Detection, Lineage Extraction, Profiling without extracting snowflake technical schema. Set `include_technical_schema` to True or disable Deletion Detection, Lineage Extraction, Profiling."
+                "Cannot perform Deletion Detection or Profiling without extracting snowflake technical schema. Set `include_technical_schema` to True or disable Deletion Detection and Profiling."
             )
         return values
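For completeness, a sketch (not from this commit) of overriding the new option programmatically; `account_id` is a placeholder and the pydantic v1 `parse_obj` entry point is assumed:

```python
from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config

# Hypothetical connection value; only `upstreams_deny_pattern` is the new knob.
config = SnowflakeV2Config.parse_obj(
    {
        "account_id": "my_account",
        "upstreams_deny_pattern": [
            r".*\.FIVETRAN_.*_STAGING\..*",  # keep the fivetran default
            r".*\.MY_ETL_SCRATCH\..*",       # plus a custom scratch schema
        ],
    }
)
assert len(config.upstreams_deny_pattern) == 2
```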

View File

@@ -340,6 +340,7 @@ class SnowflakeLineageExtractor(
             if not self.config.ignore_start_time_lineage
             else 0,
             end_time_millis=int(self.config.end_time.timestamp() * 1000),
+            upstreams_deny_pattern=self.config.upstreams_deny_pattern,
             include_view_lineage=self.config.include_view_lineage,
             include_column_lineage=self.config.include_column_lineage,
         )

View File

@@ -1,6 +1,27 @@
-from typing import Optional
+from typing import List, Optional
 
 from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain
+from datahub.ingestion.source.snowflake.snowflake_config import (
+    DEFAULT_UPSTREAMS_DENY_LIST,
+)
+
+
+def create_deny_regex_sql_filter(
+    upstreams_deny_pattern: List[str], filter_cols: List[str]
+) -> str:
+    upstream_sql_filter = (
+        " AND ".join(
+            [
+                (f"NOT RLIKE({col_name},'{regexp}','i')")
+                for col_name in filter_cols
+                for regexp in upstreams_deny_pattern
+            ]
+        )
+        if upstreams_deny_pattern
+        else ""
+    )
+    return upstream_sql_filter
+
+
 class SnowflakeQuery:
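The helper simply renders one case-insensitive `NOT RLIKE` predicate per (column, pattern) pair, joined with `AND`. For example (hypothetical single-pattern call):

```python
from datahub.ingestion.source.snowflake.snowflake_query import (
    create_deny_regex_sql_filter,
)

fragment = create_deny_regex_sql_filter([r".*__DBT_TMP$"], ["upstream_table_name"])
print(fragment)
# NOT RLIKE(upstream_table_name,'.*__DBT_TMP$','i')
```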
@@ -426,20 +447,30 @@ class SnowflakeQuery:
         ) = 1
         """
 
+    # Note on use of `upstreams_deny_pattern` to ignore temporary tables:
+    # Snowflake access history may include temporary tables in DIRECT_OBJECTS_ACCESSED and
+    # OBJECTS_MODIFIED->columns->directSources. We do not need these temporary tables and filter them out in the query.
     @staticmethod
     def table_to_table_lineage_history_v2(
         start_time_millis: int,
         end_time_millis: int,
         include_view_lineage: bool = True,
         include_column_lineage: bool = True,
+        upstreams_deny_pattern: List[str] = DEFAULT_UPSTREAMS_DENY_LIST,
     ) -> str:
         if include_column_lineage:
             return SnowflakeQuery.table_upstreams_with_column_lineage(
-                start_time_millis, end_time_millis, include_view_lineage
+                start_time_millis,
+                end_time_millis,
+                upstreams_deny_pattern,
+                include_view_lineage,
             )
         else:
             return SnowflakeQuery.table_upstreams_only(
-                start_time_millis, end_time_millis, include_view_lineage
+                start_time_millis,
+                end_time_millis,
+                upstreams_deny_pattern,
+                include_view_lineage,
             )
 
     @staticmethod
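A quick illustration (not part of the commit) that the deny patterns now reach the generated SQL by default:

```python
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery

# Millisecond timestamps borrowed from the test fixtures below; with no
# explicit upstreams_deny_pattern, DEFAULT_UPSTREAMS_DENY_LIST applies,
# so the emitted SQL carries the regex deny filter instead of the old
# hardcoded NOT LIKE clauses.
sql = SnowflakeQuery.table_to_table_lineage_history_v2(
    start_time_millis=1654499820000,
    end_time_millis=1654586220000,
)
assert "NOT RLIKE(upstream_table_name," in sql
```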
@@ -664,19 +695,11 @@ class SnowflakeQuery:
             basic_usage_counts.bucket_start_time
         """
 
-    # Note on temporary tables:
-    # Snowflake access history may include temporary tables in DIRECT_OBJECTS_ACCESSED and
-    # OBJECTS_MODIFIED->columns->directSources. We do not need these temporary tables and filter these in the query.
-    #
-    # FIVETRAN creates temporary tables in schema named FIVETRAN_xxx_STAGING.
-    # Ref - https://support.fivetran.com/hc/en-us/articles/1500003507122-Why-Is-There-an-Empty-Schema-Named-Fivetran-staging-in-the-Destination-
-    #
-    # DBT incremental models create temporary tables ending with __dbt_tmp
-    # Ref - https://discourse.getdbt.com/t/handling-bigquery-incremental-dbt-tmp-tables/7540
     @staticmethod
     def table_upstreams_with_column_lineage(
         start_time_millis: int,
         end_time_millis: int,
+        upstreams_deny_pattern: List[str],
         include_view_lineage: bool = True,
     ) -> str:
         allowed_upstream_table_domains = (
@@ -684,6 +707,12 @@ class SnowflakeQuery:
             if include_view_lineage
             else SnowflakeQuery.ACCESS_HISTORY_TABLE_DOMAINS_FILTER
         )
+
+        upstream_sql_filter = create_deny_regex_sql_filter(
+            upstreams_deny_pattern,
+            ["upstream_table_name", "upstream_column_table_name"],
+        )
+
         return f"""
         WITH column_lineage_history AS (
             SELECT
@@ -712,10 +741,7 @@ class SnowflakeQuery:
                 AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3)
                 AND upstream_table_domain in {allowed_upstream_table_domains}
                 AND downstream_table_domain = '{SnowflakeObjectDomain.TABLE.capitalize()}'
-                AND upstream_column_table_name NOT LIKE '%.FIVETRAN\\_%\\_STAGING.%'
-                AND upstream_column_table_name NOT LIKE '%\\_\\_DBT\\_TMP'
-                AND upstream_table_name NOT LIKE '%.FIVETRAN\\_%\\_STAGING.%'
-                AND upstream_table_name NOT LIKE '%\\_\\_DBT\\_TMP'
+                {("AND " + upstream_sql_filter) if upstream_sql_filter else ""}
         ),
         column_upstream_jobs AS (
             SELECT
@@ -781,6 +807,7 @@ class SnowflakeQuery:
     def table_upstreams_only(
         start_time_millis: int,
         end_time_millis: int,
+        upstreams_deny_pattern: List[str],
         include_view_lineage: bool = True,
     ) -> str:
         allowed_upstream_table_domains = (
@@ -788,6 +815,11 @@ class SnowflakeQuery:
             if include_view_lineage
             else SnowflakeQuery.ACCESS_HISTORY_TABLE_DOMAINS_FILTER
         )
+        upstream_sql_filter = create_deny_regex_sql_filter(
+            upstreams_deny_pattern,
+            ["upstream_table_name"],
+        )
+
         return f"""
         WITH table_lineage_history AS (
             SELECT
@@ -810,8 +842,7 @@ class SnowflakeQuery:
                 AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3)
                 AND upstream_table_domain in {allowed_upstream_table_domains}
                 AND downstream_table_domain = '{SnowflakeObjectDomain.TABLE.capitalize()}'
-                AND upstream_table_name NOT LIKE '%.FIVETRAN\\_%\\_STAGING.%'
-                AND upstream_table_name NOT LIKE '%\\_\\_DBT\\_TMP'
+                {("AND " + upstream_sql_filter) if upstream_sql_filter else ""}
         )
         SELECT
             downstream_table_name AS "DOWNSTREAM_TABLE_NAME",
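One edge case worth noting (illustrative): with an empty deny list the helper returns an empty string, so the f-string interpolation above emits nothing and no dangling `AND` ends up in the WHERE clause:

```python
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery

# An empty deny list disables the filter entirely.
sql = SnowflakeQuery.table_upstreams_only(
    start_time_millis=1654499820000,
    end_time_millis=1654586220000,
    upstreams_deny_pattern=[],
)
assert "RLIKE" not in sql
```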

View File

@@ -312,7 +312,10 @@ def default_query_results(query):  # noqa: C901
         ]
     elif query in (
         snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
-            1654499820000, 1654586220000, True, True
+            start_time_millis=1654499820000,
+            end_time_millis=1654586220000,
+            include_view_lineage=True,
+            include_column_lineage=True,
         ),
     ):
         return [
@@ -381,7 +384,10 @@ def default_query_results(query):  # noqa: C901
         ]
     elif query in (
         snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
-            1654499820000, 1654586220000, False, False
+            start_time_millis=1654499820000,
+            end_time_millis=1654586220000,
+            include_view_lineage=False,
+            include_column_lineage=False,
        ),
     ):
         return [

View File

@@ -228,7 +228,10 @@ def test_snowflake_missing_snowflake_lineage_permission_causes_pipeline_failure(
         default_query_results,
         [
             snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
-                1654499820000, 1654586220000, False, True
+                start_time_millis=1654499820000,
+                end_time_millis=1654586220000,
+                include_view_lineage=False,
+                include_column_lineage=True,
             )
         ],
         "Database 'SNOWFLAKE' does not exist or not authorized.",

View File

@@ -4,13 +4,20 @@ import pytest
 from pydantic import ValidationError
 
 from datahub.configuration.common import OauthConfiguration
+from datahub.configuration.pattern_utils import UUID_REGEX
 from datahub.ingestion.api.source import SourceCapability
 from datahub.ingestion.source.snowflake.constants import (
     CLIENT_PREFETCH_THREADS,
     CLIENT_SESSION_KEEP_ALIVE,
     SnowflakeCloudProvider,
 )
-from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config
+from datahub.ingestion.source.snowflake.snowflake_config import (
+    DEFAULT_UPSTREAMS_DENY_LIST,
+    SnowflakeV2Config,
+)
+from datahub.ingestion.source.snowflake.snowflake_query import (
+    create_deny_regex_sql_filter,
+)
 from datahub.ingestion.source.snowflake.snowflake_usage_v2 import (
     SnowflakeObjectAccessEntry,
 )
@@ -572,3 +579,28 @@ def test_snowflake_object_access_entry_missing_object_id():
             "objectName": "SOME.OBJECT.NAME",
         }
     )
+
+
+def test_snowflake_query_create_deny_regex_sql():
+    assert create_deny_regex_sql_filter([], ["col"]) == ""
+    assert (
+        create_deny_regex_sql_filter([".*tmp.*"], ["col"])
+        == "NOT RLIKE(col,'.*tmp.*','i')"
+    )
+    assert (
+        create_deny_regex_sql_filter([".*tmp.*", UUID_REGEX], ["col"])
+        == "NOT RLIKE(col,'.*tmp.*','i') AND NOT RLIKE(col,'[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i')"
+    )
+    assert (
+        create_deny_regex_sql_filter([".*tmp.*", UUID_REGEX], ["col1", "col2"])
+        == "NOT RLIKE(col1,'.*tmp.*','i') AND NOT RLIKE(col1,'[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i') AND NOT RLIKE(col2,'.*tmp.*','i') AND NOT RLIKE(col2,'[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i')"
+    )
+    assert (
+        create_deny_regex_sql_filter(
+            DEFAULT_UPSTREAMS_DENY_LIST, ["upstream_table_name"]
+        )
+        == r"NOT RLIKE(upstream_table_name,'.*\.FIVETRAN_.*_STAGING\..*','i') AND NOT RLIKE(upstream_table_name,'.*__DBT_TMP$','i') AND NOT RLIKE(upstream_table_name,'.*\.SEGMENT_[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i') AND NOT RLIKE(upstream_table_name,'.*\.STAGING_.*_[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i')"
+    )