fix(ingest/snowflake): Apply email filter on all usage metrics (#9269)

This commit is contained in:
Tamas Nemeth 2023-11-23 09:45:25 +01:00 committed by GitHub
parent 37ea292aa6
commit f794a90515
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 86 additions and 0 deletions

View File

@ -1,5 +1,6 @@
from typing import List, Optional from typing import List, Optional
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BucketDuration from datahub.configuration.time_window_config import BucketDuration
from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain
from datahub.ingestion.source.snowflake.snowflake_config import DEFAULT_TABLES_DENY_LIST from datahub.ingestion.source.snowflake.snowflake_config import DEFAULT_TABLES_DENY_LIST
@ -551,6 +552,8 @@ class SnowflakeQuery:
use_base_objects: bool, use_base_objects: bool,
top_n_queries: int, top_n_queries: int,
include_top_n_queries: bool, include_top_n_queries: bool,
email_domain: Optional[str],
email_filter: AllowDenyPattern,
) -> str: ) -> str:
if not include_top_n_queries: if not include_top_n_queries:
top_n_queries = 0 top_n_queries = 0
@ -561,6 +564,9 @@ class SnowflakeQuery:
objects_column = ( objects_column = (
"BASE_OBJECTS_ACCESSED" if use_base_objects else "DIRECT_OBJECTS_ACCESSED" "BASE_OBJECTS_ACCESSED" if use_base_objects else "DIRECT_OBJECTS_ACCESSED"
) )
email_filter_query = SnowflakeQuery.gen_email_filter_query(email_filter)
email_domain = f"@{email_domain}" if email_domain else ""
return f""" return f"""
WITH object_access_history AS WITH object_access_history AS
@ -578,12 +584,16 @@ class SnowflakeQuery:
query_id, query_id,
query_start_time, query_start_time,
user_name, user_name,
NVL(USERS.email, CONCAT(user_name, '{email_domain}')) AS user_email,
{objects_column} {objects_column}
from from
snowflake.account_usage.access_history snowflake.account_usage.access_history
LEFT JOIN
snowflake.account_usage.users USERS
WHERE WHERE
query_start_time >= to_timestamp_ltz({start_time_millis}, 3) query_start_time >= to_timestamp_ltz({start_time_millis}, 3)
AND query_start_time < to_timestamp_ltz({end_time_millis}, 3) AND query_start_time < to_timestamp_ltz({end_time_millis}, 3)
{email_filter_query}
) )
t, t,
lateral flatten(input => t.{objects_column}) object lateral flatten(input => t.{objects_column}) object
@ -705,6 +715,34 @@ class SnowflakeQuery:
basic_usage_counts.bucket_start_time basic_usage_counts.bucket_start_time
""" """
@staticmethod
def gen_email_filter_query(email_filter: AllowDenyPattern) -> str:
allow_filters = []
allow_filter = ""
if len(email_filter.allow) == 1 and email_filter.allow[0] == ".*":
allow_filter = ""
else:
for allow_pattern in email_filter.allow:
allow_filters.append(
f"rlike(user_name, '{allow_pattern}','{'i' if email_filter.ignoreCase else 'c'}')"
)
if allow_filters:
allow_filter = " OR ".join(allow_filters)
allow_filter = f"AND ({allow_filter})"
deny_filters = []
deny_filter = ""
for deny_pattern in email_filter.deny:
deny_filters.append(
f"rlike(user_name, '{deny_pattern}','{'i' if email_filter.ignoreCase else 'c'}')"
)
if deny_filters:
deny_filter = " OR ".join(deny_filters)
deny_filter = f"({deny_filter})"
email_filter_query = allow_filter + (
" AND" + f" NOT {deny_filter}" if deny_filter else ""
)
return email_filter_query
@staticmethod @staticmethod
def table_upstreams_with_column_lineage( def table_upstreams_with_column_lineage(
start_time_millis: int, start_time_millis: int,

View File

@ -214,6 +214,8 @@ class SnowflakeUsageExtractor(
use_base_objects=self.config.apply_view_usage_to_tables, use_base_objects=self.config.apply_view_usage_to_tables,
top_n_queries=self.config.top_n_queries, top_n_queries=self.config.top_n_queries,
include_top_n_queries=self.config.include_top_n_queries, include_top_n_queries=self.config.include_top_n_queries,
email_domain=self.config.email_domain,
email_filter=self.config.user_email_pattern,
), ),
) )
except Exception as e: except Exception as e:

View File

@ -1,6 +1,7 @@
import json import json
from datetime import datetime, timezone from datetime import datetime, timezone
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BucketDuration from datahub.configuration.time_window_config import BucketDuration
from datahub.ingestion.source.snowflake import snowflake_query from datahub.ingestion.source.snowflake import snowflake_query
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
@ -263,6 +264,8 @@ def default_query_results( # noqa: C901
top_n_queries=10, top_n_queries=10,
include_top_n_queries=True, include_top_n_queries=True,
time_bucket_size=BucketDuration.DAY, time_bucket_size=BucketDuration.DAY,
email_domain=None,
email_filter=AllowDenyPattern.allow_all(),
) )
): ):
return [] return []

View File

@ -3,6 +3,7 @@ from unittest.mock import MagicMock, patch
import pytest import pytest
from pydantic import ValidationError from pydantic import ValidationError
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.oauth import OAuthConfiguration from datahub.configuration.oauth import OAuthConfiguration
from datahub.configuration.pattern_utils import UUID_REGEX from datahub.configuration.pattern_utils import UUID_REGEX
from datahub.ingestion.api.source import SourceCapability from datahub.ingestion.api.source import SourceCapability
@ -16,6 +17,7 @@ from datahub.ingestion.source.snowflake.snowflake_config import (
SnowflakeV2Config, SnowflakeV2Config,
) )
from datahub.ingestion.source.snowflake.snowflake_query import ( from datahub.ingestion.source.snowflake.snowflake_query import (
SnowflakeQuery,
create_deny_regex_sql_filter, create_deny_regex_sql_filter,
) )
from datahub.ingestion.source.snowflake.snowflake_usage_v2 import ( from datahub.ingestion.source.snowflake.snowflake_usage_v2 import (
@ -661,3 +663,44 @@ def test_snowflake_temporary_patterns_config_rename():
} }
) )
assert conf.temporary_tables_pattern == [".*tmp.*"] assert conf.temporary_tables_pattern == [".*tmp.*"]
def test_email_filter_query_generation_with_one_deny():
email_filter = AllowDenyPattern(deny=[".*@example.com"])
filter_query = SnowflakeQuery.gen_email_filter_query(email_filter)
assert filter_query == " AND NOT (rlike(user_name, '.*@example.com','i'))"
def test_email_filter_query_generation_without_any_filter():
email_filter = AllowDenyPattern()
filter_query = SnowflakeQuery.gen_email_filter_query(email_filter)
assert filter_query == ""
def test_email_filter_query_generation_one_allow():
email_filter = AllowDenyPattern(allow=[".*@example.com"])
filter_query = SnowflakeQuery.gen_email_filter_query(email_filter)
assert filter_query == "AND (rlike(user_name, '.*@example.com','i'))"
def test_email_filter_query_generation_one_allow_and_deny():
email_filter = AllowDenyPattern(
allow=[".*@example.com", ".*@example2.com"],
deny=[".*@example2.com", ".*@example4.com"],
)
filter_query = SnowflakeQuery.gen_email_filter_query(email_filter)
assert (
filter_query
== "AND (rlike(user_name, '.*@example.com','i') OR rlike(user_name, '.*@example2.com','i')) AND NOT (rlike(user_name, '.*@example2.com','i') OR rlike(user_name, '.*@example4.com','i'))"
)
def test_email_filter_query_generation_with_case_insensitive_filter():
email_filter = AllowDenyPattern(
allow=[".*@example.com"], deny=[".*@example2.com"], ignoreCase=False
)
filter_query = SnowflakeQuery.gen_email_filter_query(email_filter)
assert (
filter_query
== "AND (rlike(user_name, '.*@example.com','c')) AND NOT (rlike(user_name, '.*@example2.com','c'))"
)