Fix #21093 : Update test connection improvements (#23516)

* Update test connection improvements

* Update queries

* checkstyle

* fix test failure

---------

Co-authored-by: Akash Verma <akashverma@Akashs-MacBook-Pro-2.local>
This commit is contained in:
Akash Verma 2025-10-03 13:50:46 +05:30 committed by GitHub
parent 7b5834e044
commit 06453a925d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 256 additions and 8 deletions

View File

@ -41,6 +41,13 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.databricks.queries import (
DATABRICKS_GET_CATALOGS,
DATABRICKS_SQL_STATEMENT_TEST,
TEST_CATALOG_TAGS,
TEST_COLUMN_LINEAGE,
TEST_COLUMN_TAGS,
TEST_SCHEMA_TAGS,
TEST_TABLE_LINEAGE,
TEST_TABLE_TAGS,
TEST_VIEW_DEFINITIONS,
)
from metadata.utils.constants import THREE_MIN
from metadata.utils.logger import ingestion_logger
@ -170,6 +177,11 @@ def test_connection(
# Create wrapper to avoid multiple schema calls
engine_wrapper = DatabricksEngineWrapper(connection)
# Helper that resolves the catalog used to qualify the tag test queries.
# It is invoked once per tag query while the step mapping is being built,
# so the result is memoized to issue the catalog lookup a single time.
from functools import lru_cache  # pylint: disable=import-outside-toplevel

@lru_cache(maxsize=None)
def get_first_catalog():
    """Return the catalog name to substitute into the tag test queries.

    Resolution order:
      1. the first entry returned by ``engine_wrapper.get_catalogs`` for the
         configured catalog,
      2. the configured ``service_connection.catalog``,
      3. the literal ``"main"``.

    An exception raised by ``get_catalogs`` propagates to the caller and is
    not cached, so a later call retries the lookup.
    """
    catalogs = engine_wrapper.get_catalogs(catalog_name=service_connection.catalog)
    if catalogs:
        return catalogs[0]
    return service_connection.catalog or "main"
test_fn = {
"CheckAccess": partial(test_connection_engine_step, connection),
"GetSchemas": partial(
@ -187,6 +199,41 @@ def test_connection(
query_history=service_connection.queryHistoryTable
),
),
"GetViewDefinitions": partial(
test_database_query,
engine=connection,
statement=TEST_VIEW_DEFINITIONS,
),
"GetCatalogTags": partial(
test_database_query,
engine=connection,
statement=TEST_CATALOG_TAGS.format(database_name=get_first_catalog()),
),
"GetSchemaTags": partial(
test_database_query,
engine=connection,
statement=TEST_SCHEMA_TAGS.format(database_name=get_first_catalog()),
),
"GetTableTags": partial(
test_database_query,
engine=connection,
statement=TEST_TABLE_TAGS.format(database_name=get_first_catalog()),
),
"GetColumnTags": partial(
test_database_query,
engine=connection,
statement=TEST_COLUMN_TAGS.format(database_name=get_first_catalog()),
),
"GetTableLineage": partial(
test_database_query,
engine=connection,
statement=TEST_TABLE_LINEAGE,
),
"GetColumnLineage": partial(
test_database_query,
engine=connection,
statement=TEST_COLUMN_LINEAGE,
),
}
return test_connection_steps(

View File

@ -114,3 +114,64 @@ DATABRICKS_GET_COLUMN_LINEAGE_FOR_JOB = textwrap.dedent(
AND event_time >= current_date() - INTERVAL 90 DAYS
"""
)
# Test connection queries
# Each statement below is passed to a test-connection step as its probe query.
# Probe for view definitions: reads at most one row with a non-null
# VIEW_DEFINITION from INFORMATION_SCHEMA.VIEWS.
TEST_VIEW_DEFINITIONS = textwrap.dedent(
"""
SELECT
TABLE_NAME,
TABLE_SCHEMA,
VIEW_DEFINITION
FROM INFORMATION_SCHEMA.VIEWS
WHERE VIEW_DEFINITION IS NOT NULL
LIMIT 1
"""
)
# Tag probes: `{database_name}` is a str.format placeholder for the catalog
# name (backtick-quoted in the statement). The `WHERE 1=0` predicate matches no
# rows, so each probe yields a zero count and only exercises access to the view.
# Catalog-level tags.
TEST_CATALOG_TAGS = textwrap.dedent(
"""
SELECT COUNT(*) as count
FROM `{database_name}`.information_schema.catalog_tags
WHERE 1=0
"""
)
# Schema-level tags.
TEST_SCHEMA_TAGS = textwrap.dedent(
"""
SELECT COUNT(*) as count
FROM `{database_name}`.information_schema.schema_tags
WHERE 1=0
"""
)
# Table-level tags.
TEST_TABLE_TAGS = textwrap.dedent(
"""
SELECT COUNT(*) as count
FROM `{database_name}`.information_schema.table_tags
WHERE 1=0
"""
)
# Column-level tags.
TEST_COLUMN_TAGS = textwrap.dedent(
"""
SELECT COUNT(*) as count
FROM `{database_name}`.information_schema.column_tags
WHERE 1=0
"""
)
# Lineage probes: fixed, fully-qualified system tables (no placeholder to
# format). As above, `WHERE 1=0` matches no rows.
# Table-level lineage.
TEST_TABLE_LINEAGE = textwrap.dedent(
"""
SELECT COUNT(*) as count
FROM system.access.table_lineage
WHERE 1=0
"""
)
# Column-level lineage.
TEST_COLUMN_LINEAGE = textwrap.dedent(
"""
SELECT COUNT(*) as count
FROM system.access.column_lineage
WHERE 1=0
"""
)

View File

@ -48,7 +48,9 @@ from metadata.ingestion.connections.test_connections import test_connection_db_c
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.oracle.queries import (
CHECK_ACCESS_TO_ALL,
TEST_MATERIALIZED_VIEWS,
TEST_ORACLE_GET_STORED_PACKAGES,
TEST_QUERY_HISTORY,
)
from metadata.utils.constants import THREE_MIN
from metadata.utils.logger import ingestion_logger
@ -99,6 +101,8 @@ class OracleConnection(BaseConnection[OracleConnectionConfig, Engine]):
test_conn_queries = {
"CheckAccess": CHECK_ACCESS_TO_ALL,
"PackageAccess": TEST_ORACLE_GET_STORED_PACKAGES,
"GetMaterializedViews": TEST_MATERIALIZED_VIEWS,
"GetQueryHistory": TEST_QUERY_HISTORY,
}
return test_connection_db_common(

View File

@ -135,6 +135,24 @@ WHERE
)
# Probe used by the "CheckAccess" step: selects table_name from DBA_TABLES
# with a `ROWNUM < 2` filter.
CHECK_ACCESS_TO_ALL = "SELECT table_name FROM DBA_TABLES where ROWNUM < 2"
# Probe used by the "GetMaterializedViews" step: counts rows of DBA_MVIEWS
# under a `ROWNUM = 1` filter.
TEST_MATERIALIZED_VIEWS = textwrap.dedent(
"""
SELECT COUNT(*) as count
FROM DBA_MVIEWS
WHERE ROWNUM = 1
"""
)
# Probe used by the "GetQueryHistory" step: counts rows of gv$sql under a
# `ROWNUM = 1` filter.
TEST_QUERY_HISTORY = textwrap.dedent(
"""
SELECT COUNT(*) as count
FROM gv$sql
WHERE ROWNUM = 1
"""
)
ORACLE_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent(
"""
WITH SP_HISTORY AS (SELECT

View File

@ -43,6 +43,9 @@ from metadata.ingestion.source.database.postgres.queries import (
POSTGRES_GET_DATABASE,
POSTGRES_TEST_GET_QUERIES,
POSTGRES_TEST_GET_TAGS,
TEST_COLUMN_METADATA,
TEST_INFORMATION_SCHEMA_COLUMNS,
TEST_TABLE_COMMENTS,
)
from metadata.ingestion.source.database.postgres.utils import (
get_postgres_time_column_name,
@ -91,6 +94,9 @@ class PostgresConnection(BaseConnection[PostgresConnectionConfig, Engine]):
),
"GetDatabases": POSTGRES_GET_DATABASE,
"GetTags": POSTGRES_TEST_GET_TAGS,
"GetColumnMetadata": TEST_COLUMN_METADATA,
"GetTableComments": TEST_TABLE_COMMENTS,
"GetInformationSchemaColumns": TEST_INFORMATION_SCHEMA_COLUMNS,
}
return test_connection_db_common(
metadata=metadata,

View File

@ -249,3 +249,25 @@ WHERE
prokind = 'f'
and pg_namespace.nspname = '{schema_name}';
"""
# Test-connection probes. Each uses `WHERE 1=0`, which matches no rows, so the
# statement yields a zero count and only exercises access to the relations
# it names.
# "GetColumnMetadata" step: pg_attribute left-joined to pg_description on
# (attrelid, attnum).
TEST_COLUMN_METADATA = """
SELECT COUNT(*) as count
FROM pg_catalog.pg_attribute a
LEFT JOIN pg_catalog.pg_description pgd ON (
pgd.objoid = a.attrelid AND pgd.objsubid = a.attnum)
WHERE 1=0
"""
# "GetTableComments" step: pg_class left-joined to pg_namespace and to the
# pg_description rows with objsubid = 0.
TEST_TABLE_COMMENTS = """
SELECT COUNT(*) as count
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
LEFT JOIN pg_catalog.pg_description pgd ON pgd.objsubid = 0 AND pgd.objoid = c.oid
WHERE 1=0
"""
# "GetInformationSchemaColumns" step: information_schema.columns.
TEST_INFORMATION_SCHEMA_COLUMNS = """
SELECT COUNT(*) as count
FROM information_schema.columns
WHERE 1=0
"""

View File

@ -717,10 +717,15 @@ class DatabricksConnectionTest(TestCase):
mock_inspector.get_schema_names.assert_called_once()
# pylint: disable=too-many-locals
@patch(
"metadata.ingestion.source.database.databricks.connection.DatabricksEngineWrapper"
)
@patch(
"metadata.ingestion.source.database.databricks.connection.test_connection_steps"
)
def test_test_connection_function(self, mock_test_connection_steps):
def test_test_connection_function(
self, mock_test_connection_steps, mock_engine_wrapper_class
):
"""Test the test_connection function"""
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
@ -769,7 +774,13 @@ class DatabricksConnectionTest(TestCase):
queryHistoryTable="test_table",
)
# Mock the DatabricksEngineWrapper instance to avoid context manager error
mock_wrapper_instance = Mock()
mock_wrapper_instance.get_catalogs.return_value = ["main"]
mock_engine_wrapper_class.return_value = mock_wrapper_instance
mock_engine = Mock()
mock_engine.connect.return_value = Mock()
mock_metadata = Mock()
# Test the function
@ -795,6 +806,13 @@ class DatabricksConnectionTest(TestCase):
"GetViews",
"GetDatabases",
"GetQueries",
"GetViewDefinitions",
"GetCatalogTags",
"GetSchemaTags",
"GetTableTags",
"GetColumnTags",
"GetTableLineage",
"GetColumnLineage",
]
for key in expected_keys:
self.assertIn(key, test_fn)

View File

@ -1,7 +1,7 @@
{
"name": "Databricks",
"displayName": "Databricks Test Connection",
"description": "This Test Connection validates the access against the database and basic metadata extraction of schemas and tables.",
"description": "This Test Connection validates the access against the database and basic metadata extraction of schemas, tables, views, tags, and lineage information.",
"steps": [
{
"name": "CheckAccess",
@ -30,7 +30,7 @@
},
{
"name": "GetViews",
"description": "From a given schema, list the views belonging to that schema. If no schema is specified, we'll list the tables of a random schema.",
"description": "From a given schema, list the views belonging to that schema. If no schema is specified, we'll list the views of a random schema.",
"errorMessage": "Failed to fetch views, please validate if the user has enough privilege to fetch views.",
"mandatory": false
},
@ -39,6 +39,48 @@
"description": "Check if we can access the queries from `https://{your_host}/api/2.0/sql/history/queries` API. NOTE: To access this api you must have a premium subscription to databricks.",
"errorMessage": "Failed to fetch queries, please validate if the user has access to `https://{your_host}/api/2.0/sql/history/queries` API.",
"mandatory": false
},
{
"name": "GetViewDefinitions",
"description": "Validate access to INFORMATION_SCHEMA.VIEWS for fetching view definitions.",
"errorMessage": "Failed to fetch view definitions from INFORMATION_SCHEMA.VIEWS. Please ensure the user has SELECT privilege on INFORMATION_SCHEMA.VIEWS. This is required for extracting view metadata.",
"mandatory": false
},
{
"name": "GetCatalogTags",
"description": "Validate access to information_schema.catalog_tags for catalog tag extraction.",
"errorMessage": "Failed to access information_schema.catalog_tags. Please ensure the user has SELECT privilege on information_schema.catalog_tags. This is required for extracting catalog-level tags.",
"mandatory": false
},
{
"name": "GetSchemaTags",
"description": "Validate access to information_schema.schema_tags for schema tag extraction.",
"errorMessage": "Failed to access information_schema.schema_tags. Please ensure the user has SELECT privilege on information_schema.schema_tags. This is required for extracting schema-level tags.",
"mandatory": false
},
{
"name": "GetTableTags",
"description": "Validate access to information_schema.table_tags for table tag extraction.",
"errorMessage": "Failed to access information_schema.table_tags. Please ensure the user has SELECT privilege on information_schema.table_tags. This is required for extracting table-level tags.",
"mandatory": false
},
{
"name": "GetColumnTags",
"description": "Validate access to information_schema.column_tags for column tag extraction.",
"errorMessage": "Failed to access information_schema.column_tags. Please ensure the user has SELECT privilege on information_schema.column_tags. This is required for extracting column-level tags.",
"mandatory": false
},
{
"name": "GetTableLineage",
"description": "Validate access to system.access.table_lineage for table lineage tracking.",
"errorMessage": "Failed to access system.access.table_lineage. Please ensure the user has SELECT privilege on system.access.table_lineage. This is required for table-level lineage extraction.",
"mandatory": false
},
{
"name": "GetColumnLineage",
"description": "Validate access to system.access.column_lineage for column lineage tracking.",
"errorMessage": "Failed to access system.access.column_lineage. Please ensure the user has SELECT privilege on system.access.column_lineage. This is required for column-level lineage extraction.",
"mandatory": false
}
]
}

View File

@ -1,10 +1,10 @@
{
"name": "Oracle",
"displayName": "Oracle Test Connection",
"description": "This Test Connection validates the access against the database and basic metadata extraction of schemas and tables.",
"description": "This Test Connection validates the access against the database and basic metadata extraction of schemas, tables, views, and query history.",
"steps": [
{
"name": "CheckAccess",
"name": "CheckAccess",
"description": "Validate that we can properly reach the database and authenticate with the given credentials.",
"errorMessage": "Failed to connect to oracle, please validate if the user has relevant permissions, if not, please provide the necessary permissions. For more details, please refer https://docs.open-metadata.org/connectors/database/oracle.",
"shortCircuit": true,
@ -23,16 +23,28 @@
"mandatory": true
},
{
"name": "GetTables",
"name": "GetTables",
"description": "From a given schema, list the tables belonging to that schema. If no schema is specified, we'll list the tables of a random schema.",
"errorMessage": "Failed to fetch tables, please validate if the user has enough privilege to fetch tables.",
"mandatory": true
},
{
"name": "GetViews",
"description": "From a given schema, list the views belonging to that schema. If no schema is specified, we'll list the tables of a random schema.",
"description": "From a given schema, list the views belonging to that schema. If no schema is specified, we'll list the views of a random schema.",
"errorMessage": "Failed to fetch views, please validate if the user has enough privilege to fetch views.",
"mandatory": false
},
{
"name": "GetMaterializedViews",
"description": "Validate that we can access materialized view definitions from DBA_MVIEWS.",
"errorMessage": "Failed to fetch materialized views from DBA_MVIEWS. Please ensure the user has SELECT privilege on DBA_MVIEWS. This is required for extracting materialized view metadata.",
"mandatory": false
},
{
"name": "GetQueryHistory",
"description": "Validate access to query history from gv$sql for usage and lineage extraction.",
"errorMessage": "Failed to access query history from gv$sql. Please ensure the user has SELECT privilege on gv$sql or v$sql. This is required for usage tracking and lineage extraction features.",
"mandatory": false
}
]
}

View File

@ -1,7 +1,7 @@
{
"name": "Postgres",
"displayName": "Postgres Test Connection",
"description": "This Test Connection validates the access against the database and basic metadata extraction of schemas and tables.",
"description": "This Test Connection validates the access against the database and basic metadata extraction of schemas and tables, including autoclassification support.",
"steps": [
{
"name": "CheckAccess",
@ -44,6 +44,24 @@
"description":"Check if we can access the pg_stat_statements table to get query logs. These queries are analyzed in the usage & lineage workflow.",
"errorMessage": "Failed to fetch queries, please validate if postgres instance has pg_stat_statements extension installed and the user has at least select privileges for pg_stat_statements table.",
"mandatory": false
},
{
"name": "GetColumnMetadata",
"description": "Validate access to column metadata from pg_catalog.pg_attribute for PII pattern detection in column names and types.",
"errorMessage": "Failed to access column metadata. Please ensure the user has SELECT privileges on pg_catalog.pg_attribute and pg_catalog.pg_description. This is required for autoclassification to analyze column names and types for PII detection.",
"mandatory": false
},
{
"name": "GetTableComments",
"description": "Validate access to table and column comments that may contain data classification hints.",
"errorMessage": "Failed to access table comments. Please ensure the user has SELECT privileges on pg_catalog.pg_class, pg_catalog.pg_namespace, and pg_catalog.pg_description. Comments often contain classification hints like 'PII' or 'sensitive'.",
"mandatory": false
},
{
"name": "GetInformationSchemaColumns",
"description": "Validate access to information_schema.columns for standardized column information used in data type pattern matching.",
"errorMessage": "Failed to access information_schema.columns. Please ensure the user has SELECT privileges on information_schema.columns. This is required for analyzing data type patterns in autoclassification.",
"mandatory": false
}
]
}