import datetime
from typing import Any, Dict
from unittest.mock import MagicMock, patch

import pytest
from pydantic import ValidationError

import datahub.ingestion.source.snowflake.snowflake_utils
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.pattern_utils import UUID_REGEX
from datahub.ingestion.api.source import SourceCapability
from datahub.ingestion.source.snowflake.constants import (
    CLIENT_PREFETCH_THREADS,
    CLIENT_SESSION_KEEP_ALIVE,
    SnowflakeCloudProvider,
)
from datahub.ingestion.source.snowflake.oauth_config import OAuthConfiguration
from datahub.ingestion.source.snowflake.snowflake_config import (
    DEFAULT_TEMP_TABLES_PATTERNS,
    SnowflakeIdentifierConfig,
    SnowflakeV2Config,
)
from datahub.ingestion.source.snowflake.snowflake_lineage_v2 import UpstreamLineageEdge
from datahub.ingestion.source.snowflake.snowflake_queries import (
    SnowflakeQueriesExtractor,
    SnowflakeQueriesExtractorConfig,
)
from datahub.ingestion.source.snowflake.snowflake_query import (
    SnowflakeQuery,
    create_deny_regex_sql_filter,
)
from datahub.ingestion.source.snowflake.snowflake_usage_v2 import (
    SnowflakeObjectAccessEntry,
)
from datahub.ingestion.source.snowflake.snowflake_utils import (
    SnowflakeIdentifierBuilder,
    SnowsightUrlBuilder,
)
from datahub.ingestion.source.snowflake.snowflake_v2 import SnowflakeV2Source
from datahub.sql_parsing.sql_parsing_aggregator import TableRename, TableSwap
from datahub.testing.doctest import assert_doctest
from tests.integration.snowflake.common import inject_rowcount
from tests.test_helpers import test_connection_helpers
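

# Baseline OAuth config shared by the OAuth validation tests below; each test
# copies and mutates it rather than rebuilding the dict from scratch.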
default_oauth_dict: Dict[str, Any] = {
    "client_id": "client_id",
    "client_secret": "secret",
    "use_certificate": False,
    "provider": "microsoft",
    "scopes": ["datahub_role"],
    "authority_url": "https://dev-abc.okta.com/oauth2/def/v1/token",
}


def test_snowflake_source_throws_error_on_account_id_missing():
    with pytest.raises(ValidationError, match="account_id\n  field required"):
        SnowflakeV2Config.parse_obj(
            {
                "username": "user",
                "password": "password",
            }
        )


def test_no_client_id_invalid_oauth_config():
    oauth_dict = default_oauth_dict.copy()
    del oauth_dict["client_id"]
    with pytest.raises(ValueError, match="client_id\n  field required"):
        OAuthConfiguration.parse_obj(oauth_dict)


def test_snowflake_throws_error_on_client_secret_missing_if_use_certificate_is_false():
    oauth_dict = default_oauth_dict.copy()
    del oauth_dict["client_secret"]
    OAuthConfiguration.parse_obj(oauth_dict)

    with pytest.raises(
        ValueError,
        match="'oauth_config.client_secret' was none but should be set when using use_certificate false for oauth_config",
    ):
        SnowflakeV2Config.parse_obj(
            {
                "account_id": "test",
                "authentication_type": "OAUTH_AUTHENTICATOR",
                "oauth_config": oauth_dict,
            }
        )


def test_snowflake_throws_error_on_encoded_oauth_private_key_missing_if_use_certificate_is_true():
    oauth_dict = default_oauth_dict.copy()
    oauth_dict["use_certificate"] = True
    OAuthConfiguration.parse_obj(oauth_dict)

    with pytest.raises(
        ValueError,
        match="'base64_encoded_oauth_private_key' was none but should be set when using certificate for oauth_config",
    ):
        SnowflakeV2Config.parse_obj(
            {
                "account_id": "test",
                "authentication_type": "OAUTH_AUTHENTICATOR",
                "oauth_config": oauth_dict,
            }
        )


def test_snowflake_oauth_okta_does_not_support_certificate():
    oauth_dict = default_oauth_dict.copy()
    oauth_dict["use_certificate"] = True
    oauth_dict["provider"] = "okta"
    OAuthConfiguration.parse_obj(oauth_dict)

    with pytest.raises(
        ValueError, match="Certificate authentication is not supported for Okta."
    ):
        SnowflakeV2Config.parse_obj(
            {
                "account_id": "test",
                "authentication_type": "OAUTH_AUTHENTICATOR",
                "oauth_config": oauth_dict,
            }
        )


def test_snowflake_oauth_happy_paths():
    oauth_dict = default_oauth_dict.copy()
    oauth_dict["provider"] = "okta"
    assert SnowflakeV2Config.parse_obj(
        {
            "account_id": "test",
            "authentication_type": "OAUTH_AUTHENTICATOR",
            "oauth_config": oauth_dict,
        }
    )

    oauth_dict["use_certificate"] = True
    oauth_dict["provider"] = "microsoft"
    oauth_dict["encoded_oauth_public_key"] = "publickey"
    oauth_dict["encoded_oauth_private_key"] = "privatekey"
    assert SnowflakeV2Config.parse_obj(
        {
            "account_id": "test",
            "authentication_type": "OAUTH_AUTHENTICATOR",
            "oauth_config": oauth_dict,
        }
    )


def test_snowflake_oauth_token_happy_path():
    assert SnowflakeV2Config.parse_obj(
        {
            "account_id": "test",
            "authentication_type": "OAUTH_AUTHENTICATOR_TOKEN",
            "token": "valid-token",
            "username": "test-user",
            "oauth_config": None,
        }
    )


def test_snowflake_oauth_token_without_token():
    with pytest.raises(
        ValidationError, match="Token required for OAUTH_AUTHENTICATOR_TOKEN."
    ):
        SnowflakeV2Config.parse_obj(
            {
                "account_id": "test",
                "authentication_type": "OAUTH_AUTHENTICATOR_TOKEN",
                "username": "test-user",
            }
        )


def test_snowflake_oauth_token_with_wrong_auth_type():
    with pytest.raises(
        ValueError,
        match="Token can only be provided when using OAUTH_AUTHENTICATOR_TOKEN.",
    ):
        SnowflakeV2Config.parse_obj(
            {
                "account_id": "test",
                "authentication_type": "OAUTH_AUTHENTICATOR",
                "token": "some-token",
                "username": "test-user",
            }
        )


def test_snowflake_oauth_token_with_empty_token():
    with pytest.raises(
        ValidationError, match="Token required for OAUTH_AUTHENTICATOR_TOKEN."
    ):
        SnowflakeV2Config.parse_obj(
            {
                "account_id": "test",
                "authentication_type": "OAUTH_AUTHENTICATOR_TOKEN",
                "token": "",
                "username": "test-user",
            }
        )
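

# Baseline connection config reused by the URI, connect_args, and test-connection tests.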
default_config_dict: Dict[str, Any] = {
    "username": "user",
    "password": "password",
    "account_id": "https://acctname.snowflakecomputing.com",
    "warehouse": "COMPUTE_WH",
    "role": "sysadmin",
}


def test_account_id_is_added_when_host_port_is_present():
    config_dict = default_config_dict.copy()
    del config_dict["account_id"]
    config_dict["host_port"] = "acctname"
    config = SnowflakeV2Config.parse_obj(config_dict)
    assert config.account_id == "acctname"


def test_account_id_with_snowflake_host_suffix():
    config = SnowflakeV2Config.parse_obj(default_config_dict)
    assert config.account_id == "acctname"


def test_snowflake_uri_default_authentication():
    config = SnowflakeV2Config.parse_obj(default_config_dict)
    assert config.get_sql_alchemy_url() == (
        "snowflake://user:password@acctname"
        "?application=acryl_datahub"
        "&authenticator=SNOWFLAKE"
        "&role=sysadmin"
        "&warehouse=COMPUTE_WH"
    )


def test_snowflake_uri_external_browser_authentication():
    config_dict = default_config_dict.copy()
    del config_dict["password"]
    config_dict["authentication_type"] = "EXTERNAL_BROWSER_AUTHENTICATOR"
    config = SnowflakeV2Config.parse_obj(config_dict)
    assert config.get_sql_alchemy_url() == (
        "snowflake://user@acctname"
        "?application=acryl_datahub"
        "&authenticator=EXTERNALBROWSER"
        "&role=sysadmin"
        "&warehouse=COMPUTE_WH"
    )


def test_snowflake_uri_key_pair_authentication():
    config_dict = default_config_dict.copy()
    del config_dict["password"]
    config_dict["authentication_type"] = "KEY_PAIR_AUTHENTICATOR"
    config_dict["private_key_path"] = "/a/random/path"
    config_dict["private_key_password"] = "a_random_password"
    config = SnowflakeV2Config.parse_obj(config_dict)

    assert config.get_sql_alchemy_url() == (
        "snowflake://user@acctname"
        "?application=acryl_datahub"
        "&authenticator=SNOWFLAKE_JWT"
        "&role=sysadmin"
        "&warehouse=COMPUTE_WH"
    )


def test_options_contain_connect_args():
    config = SnowflakeV2Config.parse_obj(default_config_dict)
    connect_args = config.get_options().get("connect_args")
    assert connect_args is not None


def test_snowflake_config_with_column_lineage_no_table_lineage_throws_error():
    config_dict = default_config_dict.copy()
    config_dict["include_column_lineage"] = True
    config_dict["include_table_lineage"] = False
    with pytest.raises(
        ValidationError,
        match="include_table_lineage must be True for include_column_lineage to be set",
    ):
        SnowflakeV2Config.parse_obj(config_dict)


def test_snowflake_config_with_no_connect_args_returns_base_connect_args():
    config: SnowflakeV2Config = SnowflakeV2Config.parse_obj(default_config_dict)
    assert config.get_options()["connect_args"] is not None
    assert config.get_options()["connect_args"] == {
        CLIENT_PREFETCH_THREADS: 10,
        CLIENT_SESSION_KEEP_ALIVE: True,
    }


def test_private_key_set_but_auth_not_changed():
    with pytest.raises(
        ValidationError,
        match="Either `private_key` and `private_key_path` is set but `authentication_type` is DEFAULT_AUTHENTICATOR. Should be set to 'KEY_PAIR_AUTHENTICATOR' when using key pair authentication",
    ):
        SnowflakeV2Config.parse_obj(
            {
                "account_id": "acctname",
                "private_key_path": "/a/random/path",
            }
        )


def test_snowflake_config_with_connect_args_overrides_base_connect_args():
    config_dict = default_config_dict.copy()
    config_dict["connect_args"] = {
        CLIENT_PREFETCH_THREADS: 5,
    }
    config: SnowflakeV2Config = SnowflakeV2Config.parse_obj(config_dict)
    assert config.get_options()["connect_args"] is not None
    assert config.get_options()["connect_args"][CLIENT_PREFETCH_THREADS] == 5
    assert config.get_options()["connect_args"][CLIENT_SESSION_KEEP_ALIVE] is True
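

# The test-connection tests patch snowflake.connector.connect, so no real
# Snowflake connection is ever made.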
@patch("snowflake.connector.connect")
def test_test_connection_failure(mock_connect):
    mock_connect.side_effect = Exception("Failed to connect to snowflake")
    report = test_connection_helpers.run_test_connection(
        SnowflakeV2Source, default_config_dict
    )
    test_connection_helpers.assert_basic_connectivity_failure(
        report, "Failed to connect to snowflake"
    )


@patch("snowflake.connector.connect")
def test_test_connection_basic_success(mock_connect):
    report = test_connection_helpers.run_test_connection(
        SnowflakeV2Source, default_config_dict
    )
    test_connection_helpers.assert_basic_connectivity_success(report)
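

# setup_mock_connect wires mock_connect to a fake connection whose cursor returns
# canned results for the session queries (current role, secondary roles, warehouse).
# Per-test grants are layered on top via extra_query_results; any unexpected query
# raises MissingQueryMock.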
class MissingQueryMock(Exception):
    pass


def setup_mock_connect(mock_connect, extra_query_results=None):
    @inject_rowcount
    def query_results(query):
        if extra_query_results is not None:
            try:
                return extra_query_results(query)
            except MissingQueryMock:
                pass

        if query == "select current_role()":
            return [{"CURRENT_ROLE()": "TEST_ROLE"}]
        elif query == "select current_secondary_roles()":
            return [{"CURRENT_SECONDARY_ROLES()": '{"roles":"","value":""}'}]
        elif query == "select current_warehouse()":
            return [{"CURRENT_WAREHOUSE()": "TEST_WAREHOUSE"}]
        elif query == 'show grants to role "PUBLIC"':
            return []
        raise MissingQueryMock(f"Unexpected query: {query}")

    connection_mock = MagicMock()
    cursor_mock = MagicMock()
    cursor_mock.execute.side_effect = query_results
    connection_mock.cursor.return_value = cursor_mock
    mock_connect.return_value = connection_mock


@patch("snowflake.connector.connect")
def test_test_connection_no_warehouse(mock_connect):
    def query_results(query):
        if query == "select current_warehouse()":
            return [{"CURRENT_WAREHOUSE()": None}]
        elif query == 'show grants to role "TEST_ROLE"':
            return [{"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"}]
        raise MissingQueryMock(f"Unexpected query: {query}")

    setup_mock_connect(mock_connect, query_results)

    report = test_connection_helpers.run_test_connection(
        SnowflakeV2Source, default_config_dict
    )
    test_connection_helpers.assert_basic_connectivity_success(report)

    test_connection_helpers.assert_capability_report(
        capability_report=report.capability_report,
        success_capabilities=[SourceCapability.CONTAINERS],
        failure_capabilities={
            SourceCapability.SCHEMA_METADATA: "Current role TEST_ROLE does not have permissions to use warehouse"
        },
    )


@patch("snowflake.connector.connect")
def test_test_connection_capability_schema_failure(mock_connect):
    def query_results(query):
        if query == 'show grants to role "TEST_ROLE"':
            return [{"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"}]
        raise MissingQueryMock(f"Unexpected query: {query}")

    setup_mock_connect(mock_connect, query_results)

    report = test_connection_helpers.run_test_connection(
        SnowflakeV2Source, default_config_dict
    )
    test_connection_helpers.assert_basic_connectivity_success(report)

    test_connection_helpers.assert_capability_report(
        capability_report=report.capability_report,
        success_capabilities=[SourceCapability.CONTAINERS],
        failure_capabilities={
            SourceCapability.SCHEMA_METADATA: "Either no tables exist or current role does not have permissions to access them"
        },
    )


@patch("snowflake.connector.connect")
def test_test_connection_capability_schema_success(mock_connect):
    def query_results(query):
        if query == 'show grants to role "TEST_ROLE"':
            return [
                {"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"},
                {"privilege": "USAGE", "granted_on": "SCHEMA", "name": "DB1.SCHEMA1"},
                {
                    "privilege": "REFERENCES",
                    "granted_on": "TABLE",
                    "name": "DB1.SCHEMA1.TABLE1",
                },
            ]
        raise MissingQueryMock(f"Unexpected query: {query}")

    setup_mock_connect(mock_connect, query_results)

    report = test_connection_helpers.run_test_connection(
        SnowflakeV2Source, default_config_dict
    )
    test_connection_helpers.assert_basic_connectivity_success(report)

    test_connection_helpers.assert_capability_report(
        capability_report=report.capability_report,
        success_capabilities=[
            SourceCapability.CONTAINERS,
            SourceCapability.SCHEMA_METADATA,
            SourceCapability.DESCRIPTIONS,
        ],
    )


@patch("snowflake.connector.connect")
def test_test_connection_capability_all_success(mock_connect):
    def query_results(query):
        if query == 'show grants to role "TEST_ROLE"':
            return [
                {"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"},
                {"privilege": "USAGE", "granted_on": "SCHEMA", "name": "DB1.SCHEMA1"},
                {
                    "privilege": "SELECT",
                    "granted_on": "TABLE",
                    "name": "DB1.SCHEMA1.TABLE1",
                },
                {"privilege": "USAGE", "granted_on": "ROLE", "name": "TEST_USAGE_ROLE"},
            ]
        elif query == 'show grants to role "TEST_USAGE_ROLE"':
            return [
                {"privilege": "USAGE", "granted_on": "DATABASE", "name": "SNOWFLAKE"},
                {"privilege": "USAGE", "granted_on": "SCHEMA", "name": "ACCOUNT_USAGE"},
                {
                    "privilege": "USAGE",
                    "granted_on": "VIEW",
                    "name": "SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY",
                },
                {
                    "privilege": "USAGE",
                    "granted_on": "VIEW",
                    "name": "SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY",
                },
                {
                    "privilege": "USAGE",
                    "granted_on": "VIEW",
                    "name": "SNOWFLAKE.ACCOUNT_USAGE.OBJECT_DEPENDENCIES",
                },
            ]
        raise MissingQueryMock(f"Unexpected query: {query}")

    setup_mock_connect(mock_connect, query_results)

    report = test_connection_helpers.run_test_connection(
        SnowflakeV2Source, default_config_dict
    )
    test_connection_helpers.assert_basic_connectivity_success(report)

    test_connection_helpers.assert_capability_report(
        capability_report=report.capability_report,
        success_capabilities=[
            SourceCapability.CONTAINERS,
            SourceCapability.SCHEMA_METADATA,
            SourceCapability.DATA_PROFILING,
            SourceCapability.DESCRIPTIONS,
            SourceCapability.LINEAGE_COARSE,
        ],
    )
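

# Snowflake region ids like "aws_ca_central_1" or "azure_centralindia" map to a
# (cloud provider, cloud region id) pair used when building Snowsight URLs.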
def test_aws_cloud_region_from_snowflake_region_id():
    (
        cloud,
        cloud_region_id,
    ) = SnowsightUrlBuilder.get_cloud_region_from_snowflake_region_id(
        "aws_ca_central_1"
    )

    assert cloud == SnowflakeCloudProvider.AWS
    assert cloud_region_id == "ca-central-1"

    (
        cloud,
        cloud_region_id,
    ) = SnowsightUrlBuilder.get_cloud_region_from_snowflake_region_id(
        "aws_us_east_1_gov"
    )

    assert cloud == SnowflakeCloudProvider.AWS
    assert cloud_region_id == "us-east-1"


def test_google_cloud_region_from_snowflake_region_id():
    (
        cloud,
        cloud_region_id,
    ) = SnowsightUrlBuilder.get_cloud_region_from_snowflake_region_id(
        "gcp_europe_west2"
    )

    assert cloud == SnowflakeCloudProvider.GCP
    assert cloud_region_id == "europe-west2"


def test_azure_cloud_region_from_snowflake_region_id():
    (
        cloud,
        cloud_region_id,
    ) = SnowsightUrlBuilder.get_cloud_region_from_snowflake_region_id(
        "azure_switzerlandnorth"
    )

    assert cloud == SnowflakeCloudProvider.AZURE
    assert cloud_region_id == "switzerland-north"

    (
        cloud,
        cloud_region_id,
    ) = SnowsightUrlBuilder.get_cloud_region_from_snowflake_region_id(
        "azure_centralindia"
    )

    assert cloud == SnowflakeCloudProvider.AZURE
    assert cloud_region_id == "central-india"


def test_unknown_cloud_region_from_snowflake_region_id():
    with pytest.raises(Exception, match="Unknown snowflake region"):
        SnowsightUrlBuilder.get_cloud_region_from_snowflake_region_id(
            "somecloud_someregion"
        )


def test_snowflake_object_access_entry_missing_object_id():
    SnowflakeObjectAccessEntry(
        **{
            "columns": [
                {"columnName": "A"},
                {"columnName": "B"},
            ],
            "objectDomain": "View",
            "objectName": "SOME.OBJECT.NAME",
        }
    )


def test_snowflake_query_create_deny_regex_sql():
    assert create_deny_regex_sql_filter([], ["col"]) == ""
    assert (
        create_deny_regex_sql_filter([".*tmp.*"], ["col"])
        == "NOT RLIKE(col,'.*tmp.*','i')"
    )
    assert (
        create_deny_regex_sql_filter([".*tmp.*", UUID_REGEX], ["col"])
        == "NOT RLIKE(col,'.*tmp.*','i') AND NOT RLIKE(col,'[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i')"
    )
    assert (
        create_deny_regex_sql_filter([".*tmp.*", UUID_REGEX], ["col1", "col2"])
        == "NOT RLIKE(col1,'.*tmp.*','i') AND NOT RLIKE(col1,'[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i') AND NOT RLIKE(col2,'.*tmp.*','i') AND NOT RLIKE(col2,'[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i')"
    )
    assert (
        create_deny_regex_sql_filter(
            DEFAULT_TEMP_TABLES_PATTERNS, ["upstream_table_name"]
        )
        == r"NOT RLIKE(upstream_table_name,'.*\.FIVETRAN_.*_STAGING\..*','i') AND NOT RLIKE(upstream_table_name,'.*__DBT_TMP$','i') AND NOT RLIKE(upstream_table_name,'.*\.SEGMENT_[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i') AND NOT RLIKE(upstream_table_name,'.*\.STAGING_.*_[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i') AND NOT RLIKE(upstream_table_name,'.*\.(GE_TMP_|GE_TEMP_|GX_TEMP_)[0-9A-F]{8}','i')"
    )


def test_snowflake_temporary_patterns_config_rename():
    conf = SnowflakeV2Config.parse_obj(
        {
            "account_id": "test",
            "username": "user",
            "password": "password",
            "upstreams_deny_pattern": [".*tmp.*"],
        }
    )
    assert conf.temporary_tables_pattern == [".*tmp.*"]
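

# gen_email_filter_query turns an AllowDenyPattern over user emails into rlike()
# clauses; ignoreCase switches the regex flag between 'i' and 'c'.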
def test_email_filter_query_generation_with_one_deny():
    email_filter = AllowDenyPattern(deny=[".*@example.com"])
    filter_query = SnowflakeQuery.gen_email_filter_query(email_filter)
    assert filter_query == "AND NOT (rlike(user_name, '.*@example.com','i'))"


def test_email_filter_query_generation_without_any_filter():
    email_filter = AllowDenyPattern()
    filter_query = SnowflakeQuery.gen_email_filter_query(email_filter)
    assert filter_query == ""


def test_email_filter_query_generation_one_allow():
    email_filter = AllowDenyPattern(allow=[".*@example.com"])
    filter_query = SnowflakeQuery.gen_email_filter_query(email_filter)
    assert filter_query == "AND (rlike(user_name, '.*@example.com','i'))"


def test_email_filter_query_generation_one_allow_and_deny():
    email_filter = AllowDenyPattern(
        allow=[".*@example.com", ".*@example2.com"],
        deny=[".*@example2.com", ".*@example4.com"],
    )
    filter_query = SnowflakeQuery.gen_email_filter_query(email_filter)
    assert (
        filter_query
        == "AND (rlike(user_name, '.*@example.com','i') OR rlike(user_name, '.*@example2.com','i')) AND NOT (rlike(user_name, '.*@example2.com','i') OR rlike(user_name, '.*@example4.com','i'))"
    )


def test_email_filter_query_generation_with_case_insensitive_filter():
    email_filter = AllowDenyPattern(
        allow=[".*@example.com"], deny=[".*@example2.com"], ignoreCase=False
    )
    filter_query = SnowflakeQuery.gen_email_filter_query(email_filter)
    assert (
        filter_query
        == "AND (rlike(user_name, '.*@example.com','c')) AND NOT (rlike(user_name, '.*@example2.com','c'))"
    )


def test_create_snowsight_base_url_us_west():
    result = SnowsightUrlBuilder(
        "account_locator", "aws_us_west_2", privatelink=False
    ).snowsight_base_url
    assert result == "https://app.snowflake.com/us-west-2/account_locator/"


def test_create_snowsight_base_url_ap_northeast_1():
    result = SnowsightUrlBuilder(
        "account_locator", "aws_ap_northeast_1", privatelink=False
    ).snowsight_base_url
    assert result == "https://app.snowflake.com/ap-northeast-1.aws/account_locator/"


def test_snowflake_utils() -> None:
    assert_doctest(datahub.ingestion.source.snowflake.snowflake_utils)


def test_using_removed_fields_causes_no_error() -> None:
    assert SnowflakeV2Config.parse_obj(
        {
            "account_id": "test",
            "username": "snowflake",
            "password": "snowflake",
            "include_view_lineage": "true",
            "include_view_column_lineage": "true",
        }
    )


def test_snowflake_query_result_parsing():
    db_row = {
        "DOWNSTREAM_TABLE_NAME": "db.schema.downstream_table",
        "DOWNSTREAM_TABLE_DOMAIN": "Table",
        "UPSTREAM_TABLES": [
            {
                "query_id": "01b92f61-0611-c826-000d-0103cf9b5db7",
                "upstream_object_domain": "Table",
                "upstream_object_name": "db.schema.upstream_table",
            }
        ],
        "UPSTREAM_COLUMNS": [{}],
        "QUERIES": [
            {
                "query_id": "01b92f61-0611-c826-000d-0103cf9b5db7",
                "query_text": "Query test",
                "start_time": "2022-12-01 19:56:34",
            }
        ],
    }
    assert UpstreamLineageEdge.parse_obj(db_row)
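

# parse_ddl_query interprets the object_modified_by_ddl payload of a DDL query:
# ALTER ... RENAME becomes a TableRename, ALTER ... SWAP WITH becomes a TableSwap,
# and other ALTERs (e.g. ADD COLUMN) are dropped and counted in
# num_ddl_queries_dropped.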
class TestDDLProcessing:
    @pytest.fixture
    def session_id(self):
        return "14774700483022321"

    @pytest.fixture
    def timestamp(self):
        return datetime.datetime(
            year=2025, month=2, day=3, hour=15, minute=1, second=43
        ).astimezone(datetime.timezone.utc)

    @pytest.fixture
    def extractor(self) -> SnowflakeQueriesExtractor:
        connection = MagicMock()
        config = SnowflakeQueriesExtractorConfig()
        structured_report = MagicMock()
        filters = MagicMock()
        structured_report.num_ddl_queries_dropped = 0
        identifier_config = SnowflakeIdentifierConfig()
        identifiers = SnowflakeIdentifierBuilder(identifier_config, structured_report)
        return SnowflakeQueriesExtractor(
            connection, config, structured_report, filters, identifiers
        )

    def test_ddl_processing_alter_table_rename(self, extractor, session_id, timestamp):
        query = "ALTER TABLE person_info_loading RENAME TO person_info_final;"
        object_modified_by_ddl = {
            "objectDomain": "Table",
            "objectId": 1789034,
            "objectName": "DUMMY_DB.PUBLIC.PERSON_INFO_LOADING",
            "operationType": "ALTER",
            "properties": {
                "objectName": {"value": "DUMMY_DB.PUBLIC.PERSON_INFO_FINAL"}
            },
        }
        query_type = "RENAME_TABLE"

        ddl = extractor.parse_ddl_query(
            query, session_id, timestamp, object_modified_by_ddl, query_type
        )

        assert ddl == TableRename(
            original_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info_loading,PROD)",
            new_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info_final,PROD)",
            query=query,
            session_id=session_id,
            timestamp=timestamp,
        ), "Processing ALTER ... RENAME should result in a proper TableRename object"

    def test_ddl_processing_alter_table_add_column(
        self, extractor, session_id, timestamp
    ):
        query = "ALTER TABLE person_info ADD year BIGINT"
        object_modified_by_ddl = {
            "objectDomain": "Table",
            "objectId": 2612260,
            "objectName": "DUMMY_DB.PUBLIC.PERSON_INFO",
            "operationType": "ALTER",
            "properties": {
                "columns": {
                    "BIGINT": {
                        "objectId": {"value": 8763407},
                        "subOperationType": "ADD",
                    }
                }
            },
        }
        query_type = "ALTER_TABLE_ADD_COLUMN"

        ddl = extractor.parse_ddl_query(
            query, session_id, timestamp, object_modified_by_ddl, query_type
        )

        assert ddl is None, (
            "For altering columns statement ddl parsing should return None"
        )
        assert extractor.report.num_ddl_queries_dropped == 1, (
            "Dropped ddls should be properly counted"
        )

    def test_ddl_processing_alter_table_swap(self, extractor, session_id, timestamp):
        query = "ALTER TABLE person_info SWAP WITH person_info_swap;"
        object_modified_by_ddl = {
            "objectDomain": "Table",
            "objectId": 3776835,
            "objectName": "DUMMY_DB.PUBLIC.PERSON_INFO",
            "operationType": "ALTER",
            "properties": {
                "swapTargetDomain": {"value": "Table"},
                "swapTargetId": {"value": 3786260},
                "swapTargetName": {"value": "DUMMY_DB.PUBLIC.PERSON_INFO_SWAP"},
            },
        }
        query_type = "ALTER"

        ddl = extractor.parse_ddl_query(
            query, session_id, timestamp, object_modified_by_ddl, query_type
        )

        assert ddl == TableSwap(
            urn1="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info,PROD)",
            urn2="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info_swap,PROD)",
            query=query,
            session_id=session_id,
            timestamp=timestamp,
        ), "Processing ALTER ... SWAP DDL should result in a proper TableSwap object"