datahub/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py

import datetime
from typing import Any, Dict
from unittest.mock import MagicMock, patch

import pytest
from pydantic import ValidationError

import datahub.ingestion.source.snowflake.snowflake_utils
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.pattern_utils import UUID_REGEX
from datahub.ingestion.api.source import SourceCapability
from datahub.ingestion.source.snowflake.constants import (
    CLIENT_PREFETCH_THREADS,
    CLIENT_SESSION_KEEP_ALIVE,
    SnowflakeCloudProvider,
)
from datahub.ingestion.source.snowflake.oauth_config import OAuthConfiguration
from datahub.ingestion.source.snowflake.snowflake_config import (
    DEFAULT_TEMP_TABLES_PATTERNS,
    SnowflakeIdentifierConfig,
    SnowflakeV2Config,
)
from datahub.ingestion.source.snowflake.snowflake_lineage_v2 import UpstreamLineageEdge
from datahub.ingestion.source.snowflake.snowflake_queries import (
    SnowflakeQueriesExtractor,
    SnowflakeQueriesExtractorConfig,
)
from datahub.ingestion.source.snowflake.snowflake_query import (
    SnowflakeQuery,
    create_deny_regex_sql_filter,
)
from datahub.ingestion.source.snowflake.snowflake_usage_v2 import (
    SnowflakeObjectAccessEntry,
)
from datahub.ingestion.source.snowflake.snowflake_utils import (
    SnowflakeIdentifierBuilder,
    SnowsightUrlBuilder,
)
from datahub.ingestion.source.snowflake.snowflake_v2 import SnowflakeV2Source
from datahub.sql_parsing.sql_parsing_aggregator import TableRename, TableSwap
from datahub.testing.doctest import assert_doctest
from tests.integration.snowflake.common import inject_rowcount
from tests.test_helpers import test_connection_helpers
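

# Baseline OAuth settings shared by the OAuth-related tests below; each test
# copies and tweaks this dict rather than re-declaring it from scratch.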
default_oauth_dict: Dict[str, Any] = {
    "client_id": "client_id",
    "client_secret": "secret",
    "use_certificate": False,
    "provider": "microsoft",
    "scopes": ["datahub_role"],
    "authority_url": "https://dev-abc.okta.com/oauth2/def/v1/token",
}


def test_snowflake_source_throws_error_on_account_id_missing():
    with pytest.raises(ValidationError, match="account_id\n  field required"):
        SnowflakeV2Config.parse_obj(
            {
                "username": "user",
                "password": "password",
            }
        )


def test_no_client_id_invalid_oauth_config():
    oauth_dict = default_oauth_dict.copy()
    del oauth_dict["client_id"]
    with pytest.raises(ValueError, match="client_id\n  field required"):
        OAuthConfiguration.parse_obj(oauth_dict)


def test_snowflake_throws_error_on_client_secret_missing_if_use_certificate_is_false():
    oauth_dict = default_oauth_dict.copy()
    del oauth_dict["client_secret"]
    OAuthConfiguration.parse_obj(oauth_dict)

    with pytest.raises(
        ValueError,
        match="'oauth_config.client_secret' was none but should be set when using use_certificate false for oauth_config",
    ):
        SnowflakeV2Config.parse_obj(
            {
                "account_id": "test",
                "authentication_type": "OAUTH_AUTHENTICATOR",
                "oauth_config": oauth_dict,
            }
        )


def test_snowflake_throws_error_on_encoded_oauth_private_key_missing_if_use_certificate_is_true():
    oauth_dict = default_oauth_dict.copy()
    oauth_dict["use_certificate"] = True
    OAuthConfiguration.parse_obj(oauth_dict)

    with pytest.raises(
        ValueError,
        match="'base64_encoded_oauth_private_key' was none but should be set when using certificate for oauth_config",
    ):
        SnowflakeV2Config.parse_obj(
            {
                "account_id": "test",
                "authentication_type": "OAUTH_AUTHENTICATOR",
                "oauth_config": oauth_dict,
            }
        )


def test_snowflake_oauth_okta_does_not_support_certificate():
    oauth_dict = default_oauth_dict.copy()
    oauth_dict["use_certificate"] = True
    oauth_dict["provider"] = "okta"
    OAuthConfiguration.parse_obj(oauth_dict)

    with pytest.raises(
        ValueError, match="Certificate authentication is not supported for Okta."
    ):
        SnowflakeV2Config.parse_obj(
            {
                "account_id": "test",
                "authentication_type": "OAUTH_AUTHENTICATOR",
                "oauth_config": oauth_dict,
            }
        )


def test_snowflake_oauth_happy_paths():
    oauth_dict = default_oauth_dict.copy()
    oauth_dict["provider"] = "okta"
    assert SnowflakeV2Config.parse_obj(
        {
            "account_id": "test",
            "authentication_type": "OAUTH_AUTHENTICATOR",
            "oauth_config": oauth_dict,
        }
    )

    oauth_dict["use_certificate"] = True
    oauth_dict["provider"] = "microsoft"
    oauth_dict["encoded_oauth_public_key"] = "publickey"
    oauth_dict["encoded_oauth_private_key"] = "privatekey"
    assert SnowflakeV2Config.parse_obj(
        {
            "account_id": "test",
            "authentication_type": "OAUTH_AUTHENTICATOR",
            "oauth_config": oauth_dict,
        }
    )


def test_snowflake_oauth_token_happy_path():
    assert SnowflakeV2Config.parse_obj(
        {
            "account_id": "test",
            "authentication_type": "OAUTH_AUTHENTICATOR_TOKEN",
            "token": "valid-token",
            "username": "test-user",
            "oauth_config": None,
        }
    )


def test_snowflake_oauth_token_without_token():
    with pytest.raises(
        ValidationError, match="Token required for OAUTH_AUTHENTICATOR_TOKEN."
    ):
        SnowflakeV2Config.parse_obj(
            {
                "account_id": "test",
                "authentication_type": "OAUTH_AUTHENTICATOR_TOKEN",
                "username": "test-user",
            }
        )


def test_snowflake_oauth_token_with_wrong_auth_type():
    with pytest.raises(
        ValueError,
        match="Token can only be provided when using OAUTH_AUTHENTICATOR_TOKEN.",
    ):
        SnowflakeV2Config.parse_obj(
            {
                "account_id": "test",
                "authentication_type": "OAUTH_AUTHENTICATOR",
                "token": "some-token",
                "username": "test-user",
            }
        )


def test_snowflake_oauth_token_with_empty_token():
    with pytest.raises(
        ValidationError, match="Token required for OAUTH_AUTHENTICATOR_TOKEN."
    ):
        SnowflakeV2Config.parse_obj(
            {
                "account_id": "test",
                "authentication_type": "OAUTH_AUTHENTICATOR_TOKEN",
                "token": "",
                "username": "test-user",
            }
        )
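

# Minimal username/password config used as the starting point for most of the
# connection, URI, and connect_args tests below.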
default_config_dict: Dict[str, Any] = {
    "username": "user",
    "password": "password",
    "account_id": "https://acctname.snowflakecomputing.com",
    "warehouse": "COMPUTE_WH",
    "role": "sysadmin",
}


def test_account_id_is_added_when_host_port_is_present():
    config_dict = default_config_dict.copy()
    del config_dict["account_id"]
    config_dict["host_port"] = "acctname"
    config = SnowflakeV2Config.parse_obj(config_dict)
    assert config.account_id == "acctname"


def test_account_id_with_snowflake_host_suffix():
    config = SnowflakeV2Config.parse_obj(default_config_dict)
    assert config.account_id == "acctname"


def test_snowflake_uri_default_authentication():
    config = SnowflakeV2Config.parse_obj(default_config_dict)
    assert config.get_sql_alchemy_url() == (
        "snowflake://user:password@acctname"
        "?application=acryl_datahub"
        "&authenticator=SNOWFLAKE"
        "&role=sysadmin"
        "&warehouse=COMPUTE_WH"
    )


def test_snowflake_uri_external_browser_authentication():
    config_dict = default_config_dict.copy()
    del config_dict["password"]
    config_dict["authentication_type"] = "EXTERNAL_BROWSER_AUTHENTICATOR"
    config = SnowflakeV2Config.parse_obj(config_dict)
    assert config.get_sql_alchemy_url() == (
        "snowflake://user@acctname"
        "?application=acryl_datahub"
        "&authenticator=EXTERNALBROWSER"
        "&role=sysadmin"
        "&warehouse=COMPUTE_WH"
    )


def test_snowflake_uri_key_pair_authentication():
    config_dict = default_config_dict.copy()
    del config_dict["password"]
    config_dict["authentication_type"] = "KEY_PAIR_AUTHENTICATOR"
    config_dict["private_key_path"] = "/a/random/path"
    config_dict["private_key_password"] = "a_random_password"
    config = SnowflakeV2Config.parse_obj(config_dict)
    assert config.get_sql_alchemy_url() == (
        "snowflake://user@acctname"
        "?application=acryl_datahub"
        "&authenticator=SNOWFLAKE_JWT"
        "&role=sysadmin"
        "&warehouse=COMPUTE_WH"
    )


def test_options_contain_connect_args():
    config = SnowflakeV2Config.parse_obj(default_config_dict)
    connect_args = config.get_options().get("connect_args")
    assert connect_args is not None


def test_snowflake_config_with_column_lineage_no_table_lineage_throws_error():
    config_dict = default_config_dict.copy()
    config_dict["include_column_lineage"] = True
    config_dict["include_table_lineage"] = False
    with pytest.raises(
        ValidationError,
        match="include_table_lineage must be True for include_column_lineage to be set",
    ):
        SnowflakeV2Config.parse_obj(config_dict)


def test_snowflake_config_with_no_connect_args_returns_base_connect_args():
    config: SnowflakeV2Config = SnowflakeV2Config.parse_obj(default_config_dict)
    assert config.get_options()["connect_args"] is not None
    assert config.get_options()["connect_args"] == {
        CLIENT_PREFETCH_THREADS: 10,
        CLIENT_SESSION_KEEP_ALIVE: True,
    }


def test_private_key_set_but_auth_not_changed():
    with pytest.raises(
        ValidationError,
        match="Either `private_key` and `private_key_path` is set but `authentication_type` is DEFAULT_AUTHENTICATOR. Should be set to 'KEY_PAIR_AUTHENTICATOR' when using key pair authentication",
    ):
        SnowflakeV2Config.parse_obj(
            {
                "account_id": "acctname",
                "private_key_path": "/a/random/path",
            }
        )


def test_snowflake_config_with_connect_args_overrides_base_connect_args():
    config_dict = default_config_dict.copy()
    config_dict["connect_args"] = {
        CLIENT_PREFETCH_THREADS: 5,
    }
    config: SnowflakeV2Config = SnowflakeV2Config.parse_obj(config_dict)
    assert config.get_options()["connect_args"] is not None
    assert config.get_options()["connect_args"][CLIENT_PREFETCH_THREADS] == 5
    assert config.get_options()["connect_args"][CLIENT_SESSION_KEEP_ALIVE] is True


@patch("snowflake.connector.connect")
def test_test_connection_failure(mock_connect):
    mock_connect.side_effect = Exception("Failed to connect to snowflake")
    report = test_connection_helpers.run_test_connection(
        SnowflakeV2Source, default_config_dict
    )
    test_connection_helpers.assert_basic_connectivity_failure(
        report, "Failed to connect to snowflake"
    )


@patch("snowflake.connector.connect")
def test_test_connection_basic_success(mock_connect):
    report = test_connection_helpers.run_test_connection(
        SnowflakeV2Source, default_config_dict
    )
    test_connection_helpers.assert_basic_connectivity_success(report)
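

# Shared harness for the connection-capability tests below: setup_mock_connect
# wires a MagicMock Snowflake connection whose cursor replays canned query
# results. MissingQueryMock signals that a test's extra_query_results hook has
# no answer for a query, letting the defaults in setup_mock_connect take over.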
class MissingQueryMock(Exception):
    pass


def setup_mock_connect(mock_connect, extra_query_results=None):
    @inject_rowcount
    def query_results(query):
        if extra_query_results is not None:
            try:
                return extra_query_results(query)
            except MissingQueryMock:
                pass

        if query == "select current_role()":
            return [{"CURRENT_ROLE()": "TEST_ROLE"}]
        elif query == "select current_secondary_roles()":
            return [{"CURRENT_SECONDARY_ROLES()": '{"roles":"","value":""}'}]
        elif query == "select current_warehouse()":
            return [{"CURRENT_WAREHOUSE()": "TEST_WAREHOUSE"}]
        elif query == 'show grants to role "PUBLIC"':
            return []
        raise MissingQueryMock(f"Unexpected query: {query}")

    connection_mock = MagicMock()
    cursor_mock = MagicMock()
    cursor_mock.execute.side_effect = query_results
    connection_mock.cursor.return_value = cursor_mock
    mock_connect.return_value = connection_mock


@patch("snowflake.connector.connect")
def test_test_connection_no_warehouse(mock_connect):
    def query_results(query):
        if query == "select current_warehouse()":
            return [{"CURRENT_WAREHOUSE()": None}]
        elif query == 'show grants to role "TEST_ROLE"':
            return [{"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"}]
        raise MissingQueryMock(f"Unexpected query: {query}")

    setup_mock_connect(mock_connect, query_results)

    report = test_connection_helpers.run_test_connection(
        SnowflakeV2Source, default_config_dict
    )
    test_connection_helpers.assert_basic_connectivity_success(report)

    test_connection_helpers.assert_capability_report(
        capability_report=report.capability_report,
        success_capabilities=[SourceCapability.CONTAINERS],
        failure_capabilities={
            SourceCapability.SCHEMA_METADATA: "Current role TEST_ROLE does not have permissions to use warehouse"
        },
    )


@patch("snowflake.connector.connect")
def test_test_connection_capability_schema_failure(mock_connect):
    def query_results(query):
        if query == 'show grants to role "TEST_ROLE"':
            return [{"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"}]
        raise MissingQueryMock(f"Unexpected query: {query}")

    setup_mock_connect(mock_connect, query_results)

    report = test_connection_helpers.run_test_connection(
        SnowflakeV2Source, default_config_dict
    )
    test_connection_helpers.assert_basic_connectivity_success(report)

    test_connection_helpers.assert_capability_report(
        capability_report=report.capability_report,
        success_capabilities=[SourceCapability.CONTAINERS],
        failure_capabilities={
            SourceCapability.SCHEMA_METADATA: "Either no tables exist or current role does not have permissions to access them"
        },
    )


@patch("snowflake.connector.connect")
def test_test_connection_capability_schema_success(mock_connect):
    def query_results(query):
        if query == 'show grants to role "TEST_ROLE"':
            return [
                {"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"},
                {"privilege": "USAGE", "granted_on": "SCHEMA", "name": "DB1.SCHEMA1"},
                {
                    "privilege": "REFERENCES",
                    "granted_on": "TABLE",
                    "name": "DB1.SCHEMA1.TABLE1",
                },
            ]
        raise MissingQueryMock(f"Unexpected query: {query}")

    setup_mock_connect(mock_connect, query_results)

    report = test_connection_helpers.run_test_connection(
        SnowflakeV2Source, default_config_dict
    )
    test_connection_helpers.assert_basic_connectivity_success(report)

    test_connection_helpers.assert_capability_report(
        capability_report=report.capability_report,
        success_capabilities=[
            SourceCapability.CONTAINERS,
            SourceCapability.SCHEMA_METADATA,
            SourceCapability.DESCRIPTIONS,
        ],
    )


@patch("snowflake.connector.connect")
def test_test_connection_capability_all_success(mock_connect):
    def query_results(query):
        if query == 'show grants to role "TEST_ROLE"':
            return [
                {"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"},
                {"privilege": "USAGE", "granted_on": "SCHEMA", "name": "DB1.SCHEMA1"},
                {
                    "privilege": "SELECT",
                    "granted_on": "TABLE",
                    "name": "DB1.SCHEMA1.TABLE1",
                },
                {"privilege": "USAGE", "granted_on": "ROLE", "name": "TEST_USAGE_ROLE"},
            ]
        elif query == 'show grants to role "TEST_USAGE_ROLE"':
            return [
                {"privilege": "USAGE", "granted_on": "DATABASE", "name": "SNOWFLAKE"},
                {"privilege": "USAGE", "granted_on": "SCHEMA", "name": "ACCOUNT_USAGE"},
                {
                    "privilege": "USAGE",
                    "granted_on": "VIEW",
                    "name": "SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY",
                },
                {
                    "privilege": "USAGE",
                    "granted_on": "VIEW",
                    "name": "SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY",
                },
                {
                    "privilege": "USAGE",
                    "granted_on": "VIEW",
                    "name": "SNOWFLAKE.ACCOUNT_USAGE.OBJECT_DEPENDENCIES",
                },
            ]
        raise MissingQueryMock(f"Unexpected query: {query}")

    setup_mock_connect(mock_connect, query_results)

    report = test_connection_helpers.run_test_connection(
        SnowflakeV2Source, default_config_dict
    )
    test_connection_helpers.assert_basic_connectivity_success(report)

    test_connection_helpers.assert_capability_report(
        capability_report=report.capability_report,
        success_capabilities=[
            SourceCapability.CONTAINERS,
            SourceCapability.SCHEMA_METADATA,
            SourceCapability.DATA_PROFILING,
            SourceCapability.DESCRIPTIONS,
            SourceCapability.LINEAGE_COARSE,
        ],
    )
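

# SnowsightUrlBuilder maps Snowflake region ids like "aws_ca_central_1" to a
# (cloud provider, cloud region id) pair; the tests below cover each provider.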
def test_aws_cloud_region_from_snowflake_region_id():
    (
        cloud,
        cloud_region_id,
    ) = SnowsightUrlBuilder.get_cloud_region_from_snowflake_region_id(
        "aws_ca_central_1"
    )

    assert cloud == SnowflakeCloudProvider.AWS
    assert cloud_region_id == "ca-central-1"

    (
        cloud,
        cloud_region_id,
    ) = SnowsightUrlBuilder.get_cloud_region_from_snowflake_region_id(
        "aws_us_east_1_gov"
    )

    assert cloud == SnowflakeCloudProvider.AWS
    assert cloud_region_id == "us-east-1"


def test_google_cloud_region_from_snowflake_region_id():
    (
        cloud,
        cloud_region_id,
    ) = SnowsightUrlBuilder.get_cloud_region_from_snowflake_region_id(
        "gcp_europe_west2"
    )

    assert cloud == SnowflakeCloudProvider.GCP
    assert cloud_region_id == "europe-west2"


def test_azure_cloud_region_from_snowflake_region_id():
    (
        cloud,
        cloud_region_id,
    ) = SnowsightUrlBuilder.get_cloud_region_from_snowflake_region_id(
        "azure_switzerlandnorth"
    )

    assert cloud == SnowflakeCloudProvider.AZURE
    assert cloud_region_id == "switzerland-north"

    (
        cloud,
        cloud_region_id,
    ) = SnowsightUrlBuilder.get_cloud_region_from_snowflake_region_id(
        "azure_centralindia"
    )

    assert cloud == SnowflakeCloudProvider.AZURE
    assert cloud_region_id == "central-india"


def test_unknown_cloud_region_from_snowflake_region_id():
    with pytest.raises(Exception, match="Unknown snowflake region"):
        SnowsightUrlBuilder.get_cloud_region_from_snowflake_region_id(
            "somecloud_someregion"
        )


def test_snowflake_object_access_entry_missing_object_id():
    # Should parse without raising even though objectId is absent.
    SnowflakeObjectAccessEntry(
        **{
            "columns": [
                {"columnName": "A"},
                {"columnName": "B"},
            ],
            "objectDomain": "View",
            "objectName": "SOME.OBJECT.NAME",
        }
    )


def test_snowflake_query_create_deny_regex_sql():
    assert create_deny_regex_sql_filter([], ["col"]) == ""
    assert (
        create_deny_regex_sql_filter([".*tmp.*"], ["col"])
        == "NOT RLIKE(col,'.*tmp.*','i')"
    )

    assert (
        create_deny_regex_sql_filter([".*tmp.*", UUID_REGEX], ["col"])
        == "NOT RLIKE(col,'.*tmp.*','i') AND NOT RLIKE(col,'[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i')"
    )

    assert (
        create_deny_regex_sql_filter([".*tmp.*", UUID_REGEX], ["col1", "col2"])
        == "NOT RLIKE(col1,'.*tmp.*','i') AND NOT RLIKE(col1,'[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i') AND NOT RLIKE(col2,'.*tmp.*','i') AND NOT RLIKE(col2,'[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i')"
    )

    assert (
        create_deny_regex_sql_filter(
            DEFAULT_TEMP_TABLES_PATTERNS, ["upstream_table_name"]
        )
        == r"NOT RLIKE(upstream_table_name,'.*\.FIVETRAN_.*_STAGING\..*','i') AND NOT RLIKE(upstream_table_name,'.*__DBT_TMP$','i') AND NOT RLIKE(upstream_table_name,'.*\.SEGMENT_[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i') AND NOT RLIKE(upstream_table_name,'.*\.STAGING_.*_[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i') AND NOT RLIKE(upstream_table_name,'.*\.(GE_TMP_|GE_TEMP_|GX_TEMP_)[0-9A-F]{8}','i')"
    )


def test_snowflake_temporary_patterns_config_rename():
    conf = SnowflakeV2Config.parse_obj(
        {
            "account_id": "test",
            "username": "user",
            "password": "password",
            "upstreams_deny_pattern": [".*tmp.*"],
        }
    )
    assert conf.temporary_tables_pattern == [".*tmp.*"]
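

# gen_email_filter_query turns an AllowDenyPattern into the SQL predicate used
# to filter user_name in usage queries; 'i' and 'c' are Snowflake RLIKE
# parameters for case-insensitive vs. case-sensitive matching.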
def test_email_filter_query_generation_with_one_deny():
    email_filter = AllowDenyPattern(deny=[".*@example.com"])
    filter_query = SnowflakeQuery.gen_email_filter_query(email_filter)
    assert filter_query == " AND NOT (rlike(user_name, '.*@example.com','i'))"


def test_email_filter_query_generation_without_any_filter():
    email_filter = AllowDenyPattern()
    filter_query = SnowflakeQuery.gen_email_filter_query(email_filter)
    assert filter_query == ""


def test_email_filter_query_generation_one_allow():
    email_filter = AllowDenyPattern(allow=[".*@example.com"])
    filter_query = SnowflakeQuery.gen_email_filter_query(email_filter)
    assert filter_query == "AND (rlike(user_name, '.*@example.com','i'))"


def test_email_filter_query_generation_one_allow_and_deny():
    email_filter = AllowDenyPattern(
        allow=[".*@example.com", ".*@example2.com"],
        deny=[".*@example2.com", ".*@example4.com"],
    )
    filter_query = SnowflakeQuery.gen_email_filter_query(email_filter)
    assert (
        filter_query
        == "AND (rlike(user_name, '.*@example.com','i') OR rlike(user_name, '.*@example2.com','i')) AND NOT (rlike(user_name, '.*@example2.com','i') OR rlike(user_name, '.*@example4.com','i'))"
    )


def test_email_filter_query_generation_with_case_sensitive_filter():
    # ignoreCase=False should produce case-sensitive ('c') RLIKE filters.
    email_filter = AllowDenyPattern(
        allow=[".*@example.com"], deny=[".*@example2.com"], ignoreCase=False
    )
    filter_query = SnowflakeQuery.gen_email_filter_query(email_filter)
    assert (
        filter_query
        == "AND (rlike(user_name, '.*@example.com','c')) AND NOT (rlike(user_name, '.*@example2.com','c'))"
    )


def test_create_snowsight_base_url_us_west():
    result = SnowsightUrlBuilder(
        "account_locator", "aws_us_west_2", privatelink=False
    ).snowsight_base_url
    assert result == "https://app.snowflake.com/us-west-2/account_locator/"


def test_create_snowsight_base_url_ap_northeast_1():
    result = SnowsightUrlBuilder(
        "account_locator", "aws_ap_northeast_1", privatelink=False
    ).snowsight_base_url
    assert result == "https://app.snowflake.com/ap-northeast-1.aws/account_locator/"


def test_snowflake_utils() -> None:
    assert_doctest(datahub.ingestion.source.snowflake.snowflake_utils)


def test_using_removed_fields_causes_no_error() -> None:
    assert SnowflakeV2Config.parse_obj(
        {
            "account_id": "test",
            "username": "snowflake",
            "password": "snowflake",
            "include_view_lineage": "true",
            "include_view_column_lineage": "true",
        }
    )
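

# UpstreamLineageEdge is the pydantic model for rows returned by the Snowflake
# lineage query; a representative row should parse without validation errors.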
def test_snowflake_query_result_parsing():
    db_row = {
        "DOWNSTREAM_TABLE_NAME": "db.schema.downstream_table",
        "DOWNSTREAM_TABLE_DOMAIN": "Table",
        "UPSTREAM_TABLES": [
            {
                "query_id": "01b92f61-0611-c826-000d-0103cf9b5db7",
                "upstream_object_domain": "Table",
                "upstream_object_name": "db.schema.upstream_table",
            }
        ],
        "UPSTREAM_COLUMNS": [{}],
        "QUERIES": [
            {
                "query_id": "01b92f61-0611-c826-000d-0103cf9b5db7",
                "query_text": "Query test",
                "start_time": "2022-12-01 19:56:34",
            }
        ],
    }
    assert UpstreamLineageEdge.parse_obj(db_row)
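

# parse_ddl_query inspects OBJECT_MODIFIED_BY_DDL payloads from Snowflake's
# access history: table renames and swaps become TableRename/TableSwap events,
# while other ALTERs are dropped and counted in the report.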
class TestDDLProcessing:
    @pytest.fixture
    def session_id(self):
        return "14774700483022321"

    @pytest.fixture
    def timestamp(self):
        return datetime.datetime(
            year=2025, month=2, day=3, hour=15, minute=1, second=43
        ).astimezone(datetime.timezone.utc)

    @pytest.fixture
    def extractor(self) -> SnowflakeQueriesExtractor:
        connection = MagicMock()
        config = SnowflakeQueriesExtractorConfig()
        structured_report = MagicMock()
        filters = MagicMock()
        structured_report.num_ddl_queries_dropped = 0
        identifier_config = SnowflakeIdentifierConfig()
        identifiers = SnowflakeIdentifierBuilder(identifier_config, structured_report)
        return SnowflakeQueriesExtractor(
            connection, config, structured_report, filters, identifiers
        )

    def test_ddl_processing_alter_table_rename(self, extractor, session_id, timestamp):
        query = "ALTER TABLE person_info_loading RENAME TO person_info_final;"
        object_modified_by_ddl = {
            "objectDomain": "Table",
            "objectId": 1789034,
            "objectName": "DUMMY_DB.PUBLIC.PERSON_INFO_LOADING",
            "operationType": "ALTER",
            "properties": {
                "objectName": {"value": "DUMMY_DB.PUBLIC.PERSON_INFO_FINAL"}
            },
        }
        query_type = "RENAME_TABLE"

        ddl = extractor.parse_ddl_query(
            query, session_id, timestamp, object_modified_by_ddl, query_type
        )

        assert ddl == TableRename(
            original_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info_loading,PROD)",
            new_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info_final,PROD)",
            query=query,
            session_id=session_id,
            timestamp=timestamp,
        ), "Processing ALTER ... RENAME should result in a proper TableRename object"

    def test_ddl_processing_alter_table_add_column(
        self, extractor, session_id, timestamp
    ):
        query = "ALTER TABLE person_info ADD year BIGINT"
        object_modified_by_ddl = {
            "objectDomain": "Table",
            "objectId": 2612260,
            "objectName": "DUMMY_DB.PUBLIC.PERSON_INFO",
            "operationType": "ALTER",
            "properties": {
                "columns": {
                    "BIGINT": {
                        "objectId": {"value": 8763407},
                        "subOperationType": "ADD",
                    }
                }
            },
        }
        query_type = "ALTER_TABLE_ADD_COLUMN"

        ddl = extractor.parse_ddl_query(
            query, session_id, timestamp, object_modified_by_ddl, query_type
        )

        assert ddl is None, (
            "Parsing an ALTER ... ADD COLUMN statement should return None"
        )
        assert extractor.report.num_ddl_queries_dropped == 1, (
            "Dropped DDL queries should be counted"
        )

    def test_ddl_processing_alter_table_swap(self, extractor, session_id, timestamp):
        query = "ALTER TABLE person_info SWAP WITH person_info_swap;"
        object_modified_by_ddl = {
            "objectDomain": "Table",
            "objectId": 3776835,
            "objectName": "DUMMY_DB.PUBLIC.PERSON_INFO",
            "operationType": "ALTER",
            "properties": {
                "swapTargetDomain": {"value": "Table"},
                "swapTargetId": {"value": 3786260},
                "swapTargetName": {"value": "DUMMY_DB.PUBLIC.PERSON_INFO_SWAP"},
            },
        }
        query_type = "ALTER"

        ddl = extractor.parse_ddl_query(
            query, session_id, timestamp, object_modified_by_ddl, query_type
        )

        assert ddl == TableSwap(
            urn1="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info,PROD)",
            urn2="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info_swap,PROD)",
            query=query,
            session_id=session_id,
            timestamp=timestamp,
        ), "Processing ALTER ... SWAP DDL should result in a proper TableSwap object"