MINOR: Optimise Snowflake Test Connection (#17779)

Mayur Singal 2024-09-13 12:55:55 +05:30 committed by GitHub
parent 0e75a9cceb
commit f2c75be78d
6 changed files with 53 additions and 9 deletions

View File

@@ -17,6 +17,7 @@ from datetime import datetime
 from functools import partial
 from typing import Optional

+from google.api_core.exceptions import NotFound
 from google.cloud.datacatalog_v1 import PolicyTagManagerClient
 from sqlalchemy.engine import Engine
@@ -149,8 +150,8 @@ def test_connection(
     test_fn = {
         "CheckAccess": partial(test_connection_engine_step, engine),
         "GetSchemas": partial(execute_inspector_func, engine, "get_schema_names"),
-        "GetTables": partial(execute_inspector_func, engine, "get_table_names"),
-        "GetViews": partial(execute_inspector_func, engine, "get_view_names"),
+        "GetTables": partial(get_table_view_names, engine),
+        "GetViews": partial(get_table_view_names, engine),
         "GetTags": test_tags,
         "GetQueries": partial(
             test_query,
@@ -170,3 +171,28 @@ def test_connection(
     )

     test_connection_inner(engine)
+
+
+def get_table_view_names(connection, schema=None):
+    with connection.connect() as conn:
+        current_schema = schema
+        client = conn.connection._client
+        item_types = ["TABLE", "EXTERNAL", "VIEW", "MATERIALIZED_VIEW"]
+        datasets = client.list_datasets()
+        result = []
+        for dataset in datasets:
+            if current_schema is not None and current_schema != dataset.dataset_id:
+                continue
+
+            try:
+                tables = client.list_tables(dataset.reference, page_size=1)
+                for table in tables:
+                    if table.table_type in item_types:
+                        break
+            except NotFound:
+                # It's possible that the dataset was deleted between when we
+                # fetched the list of datasets and when we try to list the
+                # tables from it. See:
+                # https://github.com/googleapis/python-bigquery-sqlalchemy/issues/105
+                pass
+        return result
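
In plain terms: the BigQuery `GetTables`/`GetViews` steps no longer reflect every table name through the SQLAlchemy inspector. `get_table_view_names` walks the datasets and asks for at most one table per dataset (`page_size=1`), which is enough to prove list access; it returns an empty list by design, since the test harness only needs the calls to succeed. A minimal usage sketch, assuming a configured BigQuery engine (the project name below is a placeholder, not from the commit):

```python
# Hypothetical probe with the new helper; the create_engine URL is illustrative.
from sqlalchemy import create_engine

engine = create_engine("bigquery://my-gcp-project")  # placeholder project
# Old path: inspector.get_table_names() enumerated every table in the project.
# New path: one page of one table per dataset is enough to verify permissions.
get_table_view_names(engine)  # returns [] on success, raises on access errors
```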

View File

@@ -16,6 +16,7 @@ from functools import partial
 from typing import Optional

 from sqlalchemy.engine import Engine
+from sqlalchemy.sql import text

 from metadata.generated.schema.entity.automations.workflow import (
     Workflow as AutomationWorkflow,
@@ -38,6 +39,7 @@ from metadata.ingestion.connections.test_connections import (
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.source.connections import kill_active_connections
 from metadata.ingestion.source.database.redshift.queries import (
+    REDSHIFT_GET_ALL_RELATIONS,
     REDSHIFT_GET_DATABASE_NAMES,
     REDSHIFT_TEST_GET_QUERIES,
     REDSHIFT_TEST_PARTITION_DETAILS,
@@ -65,6 +67,11 @@ def test_connection(
     Test connection. This can be executed either as part
     of a metadata workflow or during an Automation Workflow
     """
+    table_and_view_query = text(
+        REDSHIFT_GET_ALL_RELATIONS.format(
+            schema_clause="", table_clause="", limit_clause="LIMIT 1"
+        )
+    )

     def test_get_queries_permissions(engine_: Engine):
         """Check if we have the right permissions to list queries"""
@@ -78,8 +85,8 @@ def test_connection(
     test_fn = {
         "CheckAccess": partial(test_connection_engine_step, engine),
         "GetSchemas": partial(execute_inspector_func, engine, "get_schema_names"),
-        "GetTables": partial(execute_inspector_func, engine, "get_table_names"),
-        "GetViews": partial(execute_inspector_func, engine, "get_view_names"),
+        "GetTables": partial(test_query, statement=table_and_view_query, engine=engine),
+        "GetViews": partial(test_query, statement=table_and_view_query, engine=engine),
         "GetQueries": partial(test_get_queries_permissions, engine),
         "GetDatabases": partial(
             test_query, statement=REDSHIFT_GET_DATABASE_NAMES, engine=engine
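
Both Redshift steps now reuse one pre-rendered `LIMIT 1` statement instead of inspector reflection. The shared `test_query` helper is imported from `metadata.ingestion.connections.test_connections` and its body is not part of this diff; a rough stand-in for what it presumably does, for orientation only:

```python
# Hypothetical stand-in; the real test_query lives in
# metadata.ingestion.connections.test_connections and may differ.
from sqlalchemy.engine import Engine

def test_query(statement, engine: Engine):
    """Execute the probe statement; any exception fails the connection step."""
    with engine.connect() as conn:
        conn.execute(statement)
```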

View File

@@ -236,7 +236,7 @@ REDSHIFT_TEST_PARTITION_DETAILS = "select * from SVV_TABLE_INFO limit 1"
 # hence we are appending "create view <schema>.<table> as " to select query
 # to generate the column level lineage
 REDSHIFT_GET_ALL_RELATIONS = """
-    SELECT
+    (SELECT
         c.relkind,
         n.oid as "schema_oid",
         n.nspname as "schema",
@@ -255,8 +255,9 @@ REDSHIFT_GET_ALL_RELATIONS = """
         JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner
         WHERE c.relkind IN ('r', 'v', 'm', 'S', 'f')
         AND n.nspname !~ '^pg_' {schema_clause} {table_clause}
+        {limit_clause})
     UNION
-    SELECT
+    (SELECT
         'r' AS "relkind",
         s.esoid AS "schema_oid",
         s.schemaname AS "schema",
@@ -272,7 +273,8 @@ REDSHIFT_GET_ALL_RELATIONS = """
         JOIN svv_external_schemas s ON s.schemaname = t.schemaname
         JOIN pg_catalog.pg_user u ON u.usesysid = s.esowner
         where 1 {schema_clause} {table_clause}
-    ORDER BY "relkind", "schema_oid", "schema";
+    ORDER BY "relkind", "schema_oid", "schema"
+    {limit_clause});
 """

View File

@@ -386,7 +386,7 @@ def _get_all_relation_info(self, connection, **kw):  # pylint: disable=unused-argument
     result = connection.execute(
         sa.text(
             REDSHIFT_GET_ALL_RELATIONS.format(
-                schema_clause=schema_clause, table_clause=table_clause
+                schema_clause=schema_clause, table_clause=table_clause, limit_clause=""
             )
         )
     )

View File

@@ -45,6 +45,7 @@ from metadata.ingestion.source.database.snowflake.queries import (
     SNOWFLAKE_TEST_FETCH_TAG,
     SNOWFLAKE_TEST_GET_QUERIES,
     SNOWFLAKE_TEST_GET_TABLES,
+    SNOWFLAKE_TEST_GET_VIEWS,
 )
 from metadata.utils.logger import ingestion_logger
@@ -176,7 +177,11 @@ def test_connection(
             statement=SNOWFLAKE_TEST_GET_TABLES,
             engine_wrapper=engine_wrapper,
         ),
-        "GetViews": partial(execute_inspector_func, engine_wrapper, "get_view_names"),
+        "GetViews": partial(
+            test_table_query,
+            statement=SNOWFLAKE_TEST_GET_VIEWS,
+            engine_wrapper=engine_wrapper,
+        ),
         "GetQueries": partial(
             test_query, statement=SNOWFLAKE_TEST_GET_QUERIES, engine=engine
         ),

View File

@@ -245,6 +245,10 @@ SNOWFLAKE_TEST_GET_TABLES = """
 SELECT TABLE_NAME FROM "{database_name}".information_schema.tables LIMIT 1
 """

+SNOWFLAKE_TEST_GET_VIEWS = """
+SELECT TABLE_NAME FROM "{database_name}".information_schema.views LIMIT 1
+"""
+
 SNOWFLAKE_GET_DATABASES = "SHOW DATABASES"
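
The new view probe mirrors the existing table probe: fetching a single row from `information_schema.views` proves read access without the inspector walking every view. A rendering sketch (the database name is a placeholder, not from the commit):

```python
# Hypothetical rendering before execution; "DEMO_DB" is illustrative only.
query = SNOWFLAKE_TEST_GET_VIEWS.format(database_name="DEMO_DB")
# -> SELECT TABLE_NAME FROM "DEMO_DB".information_schema.views LIMIT 1
# A single row (or none) returned quickly confirms view metadata is readable.
```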