fix(ingest/snowflake): fix test connection (#10927)

This commit is contained in:
Harshal Sheth 2024-07-17 11:57:58 -07:00 committed by GitHub
parent 5f796219d3
commit 7f3da47e90
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 70 additions and 70 deletions

View File

@ -294,10 +294,12 @@ class SnowflakeV2Source(
capabilities: List[SourceCapability] = [c.capability for c in SnowflakeV2Source.get_capabilities() if c.capability not in (SourceCapability.PLATFORM_INSTANCE, SourceCapability.DOMAINS, SourceCapability.DELETION_DETECTION)] # type: ignore capabilities: List[SourceCapability] = [c.capability for c in SnowflakeV2Source.get_capabilities() if c.capability not in (SourceCapability.PLATFORM_INSTANCE, SourceCapability.DOMAINS, SourceCapability.DELETION_DETECTION)] # type: ignore
cur = conn.query("select current_role()") cur = conn.query("select current_role()")
current_role = [row[0] for row in cur][0] current_role = [row["CURRENT_ROLE()"] for row in cur][0]
cur = conn.query("select current_secondary_roles()") cur = conn.query("select current_secondary_roles()")
secondary_roles_str = json.loads([row[0] for row in cur][0])["roles"] secondary_roles_str = json.loads(
[row["CURRENT_SECONDARY_ROLES()"] for row in cur][0]
)["roles"]
secondary_roles = ( secondary_roles = (
[] if secondary_roles_str == "" else secondary_roles_str.split(",") [] if secondary_roles_str == "" else secondary_roles_str.split(",")
) )
@ -316,7 +318,9 @@ class SnowflakeV2Source(
cur = conn.query(f'show grants to role "{role}"') cur = conn.query(f'show grants to role "{role}"')
for row in cur: for row in cur:
privilege = SnowflakePrivilege( privilege = SnowflakePrivilege(
privilege=row[1], object_type=row[2], object_name=row[3] privilege=row["privilege"],
object_type=row["granted_on"],
object_name=row["name"],
) )
privileges.append(privilege) privileges.append(privilege)
@ -379,7 +383,7 @@ class SnowflakeV2Source(
roles.append(privilege.object_name) roles.append(privilege.object_name)
cur = conn.query("select current_warehouse()") cur = conn.query("select current_warehouse()")
current_warehouse = [row[0] for row in cur][0] current_warehouse = [row["CURRENT_WAREHOUSE()"] for row in cur][0]
default_failure_messages = { default_failure_messages = {
SourceCapability.SCHEMA_METADATA: "Either no tables exist or current role does not have permissions to access them", SourceCapability.SCHEMA_METADATA: "Either no tables exist or current role does not have permissions to access them",

View File

@ -274,21 +274,31 @@ def test_test_connection_basic_success(mock_connect):
test_connection_helpers.assert_basic_connectivity_success(report) test_connection_helpers.assert_basic_connectivity_success(report)
def setup_mock_connect(mock_connect, query_results=None): class MissingQueryMock(Exception):
def default_query_results(query): pass
def setup_mock_connect(mock_connect, extra_query_results=None):
def query_results(query):
if extra_query_results is not None:
try:
return extra_query_results(query)
except MissingQueryMock:
pass
if query == "select current_role()": if query == "select current_role()":
return [("TEST_ROLE",)] return [{"CURRENT_ROLE()": "TEST_ROLE"}]
elif query == "select current_secondary_roles()": elif query == "select current_secondary_roles()":
return [('{"roles":"","value":""}',)] return [{"CURRENT_SECONDARY_ROLES()": '{"roles":"","value":""}'}]
elif query == "select current_warehouse()": elif query == "select current_warehouse()":
return [("TEST_WAREHOUSE")] return [{"CURRENT_WAREHOUSE()": "TEST_WAREHOUSE"}]
raise ValueError(f"Unexpected query: {query}") elif query == 'show grants to role "PUBLIC"':
return []
raise MissingQueryMock(f"Unexpected query: {query}")
connection_mock = MagicMock() connection_mock = MagicMock()
cursor_mock = MagicMock() cursor_mock = MagicMock()
cursor_mock.execute.side_effect = ( cursor_mock.execute.side_effect = query_results
query_results if query_results is not None else default_query_results
)
connection_mock.cursor.return_value = cursor_mock connection_mock.cursor.return_value = cursor_mock
mock_connect.return_value = connection_mock mock_connect.return_value = connection_mock
@ -296,21 +306,11 @@ def setup_mock_connect(mock_connect, query_results=None):
@patch("snowflake.connector.connect") @patch("snowflake.connector.connect")
def test_test_connection_no_warehouse(mock_connect): def test_test_connection_no_warehouse(mock_connect):
def query_results(query): def query_results(query):
if query == "select current_role()": if query == "select current_warehouse()":
return [("TEST_ROLE",)] return [{"CURRENT_WAREHOUSE()": None}]
elif query == "select current_secondary_roles()":
return [('{"roles":"","value":""}',)]
elif query == "select current_warehouse()":
return [(None,)]
elif query == 'show grants to role "TEST_ROLE"': elif query == 'show grants to role "TEST_ROLE"':
return [ return [{"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"}]
("", "USAGE", "DATABASE", "DB1"), raise MissingQueryMock(f"Unexpected query: {query}")
("", "USAGE", "SCHEMA", "DB1.SCHEMA1"),
("", "REFERENCES", "TABLE", "DB1.SCHEMA1.TABLE1"),
]
elif query == 'show grants to role "PUBLIC"':
return []
raise ValueError(f"Unexpected query: {query}")
setup_mock_connect(mock_connect, query_results) setup_mock_connect(mock_connect, query_results)
report = test_connection_helpers.run_test_connection( report = test_connection_helpers.run_test_connection(
@ -330,17 +330,9 @@ def test_test_connection_no_warehouse(mock_connect):
@patch("snowflake.connector.connect") @patch("snowflake.connector.connect")
def test_test_connection_capability_schema_failure(mock_connect): def test_test_connection_capability_schema_failure(mock_connect):
def query_results(query): def query_results(query):
if query == "select current_role()": if query == 'show grants to role "TEST_ROLE"':
return [("TEST_ROLE",)] return [{"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"}]
elif query == "select current_secondary_roles()": raise MissingQueryMock(f"Unexpected query: {query}")
return [('{"roles":"","value":""}',)]
elif query == "select current_warehouse()":
return [("TEST_WAREHOUSE",)]
elif query == 'show grants to role "TEST_ROLE"':
return [("", "USAGE", "DATABASE", "DB1")]
elif query == 'show grants to role "PUBLIC"':
return []
raise ValueError(f"Unexpected query: {query}")
setup_mock_connect(mock_connect, query_results) setup_mock_connect(mock_connect, query_results)
@ -361,21 +353,17 @@ def test_test_connection_capability_schema_failure(mock_connect):
@patch("snowflake.connector.connect") @patch("snowflake.connector.connect")
def test_test_connection_capability_schema_success(mock_connect): def test_test_connection_capability_schema_success(mock_connect):
def query_results(query): def query_results(query):
if query == "select current_role()": if query == 'show grants to role "TEST_ROLE"':
return [("TEST_ROLE",)]
elif query == "select current_secondary_roles()":
return [('{"roles":"","value":""}',)]
elif query == "select current_warehouse()":
return [("TEST_WAREHOUSE")]
elif query == 'show grants to role "TEST_ROLE"':
return [ return [
["", "USAGE", "DATABASE", "DB1"], {"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"},
["", "USAGE", "SCHEMA", "DB1.SCHEMA1"], {"privilege": "USAGE", "granted_on": "SCHEMA", "name": "DB1.SCHEMA1"},
["", "REFERENCES", "TABLE", "DB1.SCHEMA1.TABLE1"], {
"privilege": "REFERENCES",
"granted_on": "TABLE",
"name": "DB1.SCHEMA1.TABLE1",
},
] ]
elif query == 'show grants to role "PUBLIC"': raise MissingQueryMock(f"Unexpected query: {query}")
return []
raise ValueError(f"Unexpected query: {query}")
setup_mock_connect(mock_connect, query_results) setup_mock_connect(mock_connect, query_results)
@ -397,30 +385,38 @@ def test_test_connection_capability_schema_success(mock_connect):
@patch("snowflake.connector.connect") @patch("snowflake.connector.connect")
def test_test_connection_capability_all_success(mock_connect): def test_test_connection_capability_all_success(mock_connect):
def query_results(query): def query_results(query):
if query == "select current_role()": if query == 'show grants to role "TEST_ROLE"':
return [("TEST_ROLE",)]
elif query == "select current_secondary_roles()":
return [('{"roles":"","value":""}',)]
elif query == "select current_warehouse()":
return [("TEST_WAREHOUSE")]
elif query == 'show grants to role "TEST_ROLE"':
return [ return [
("", "USAGE", "DATABASE", "DB1"), {"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"},
("", "USAGE", "SCHEMA", "DB1.SCHEMA1"), {"privilege": "USAGE", "granted_on": "SCHEMA", "name": "DB1.SCHEMA1"},
("", "SELECT", "TABLE", "DB1.SCHEMA1.TABLE1"), {
("", "USAGE", "ROLE", "TEST_USAGE_ROLE"), "privilege": "SELECT",
"granted_on": "TABLE",
"name": "DB1.SCHEMA1.TABLE1",
},
{"privilege": "USAGE", "granted_on": "ROLE", "name": "TEST_USAGE_ROLE"},
] ]
elif query == 'show grants to role "PUBLIC"':
return []
elif query == 'show grants to role "TEST_USAGE_ROLE"': elif query == 'show grants to role "TEST_USAGE_ROLE"':
return [ return [
["", "USAGE", "DATABASE", "SNOWFLAKE"], {"privilege": "USAGE", "granted_on": "DATABASE", "name": "SNOWFLAKE"},
["", "USAGE", "SCHEMA", "ACCOUNT_USAGE"], {"privilege": "USAGE", "granted_on": "SCHEMA", "name": "ACCOUNT_USAGE"},
["", "USAGE", "VIEW", "SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY"], {
["", "USAGE", "VIEW", "SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY"], "privilege": "USAGE",
["", "USAGE", "VIEW", "SNOWFLAKE.ACCOUNT_USAGE.OBJECT_DEPENDENCIES"], "granted_on": "VIEW",
"name": "SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY",
},
{
"privilege": "USAGE",
"granted_on": "VIEW",
"name": "SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY",
},
{
"privilege": "USAGE",
"granted_on": "VIEW",
"name": "SNOWFLAKE.ACCOUNT_USAGE.OBJECT_DEPENDENCIES",
},
] ]
raise ValueError(f"Unexpected query: {query}") raise MissingQueryMock(f"Unexpected query: {query}")
setup_mock_connect(mock_connect, query_results) setup_mock_connect(mock_connect, query_results)