MINOR: Improve Unity Catalog Usage (#22721)

This commit is contained in:
Mayur Singal 2025-08-04 11:04:10 +05:30 committed by GitHub
parent 0a298631eb
commit b74e181d52
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 84 additions and 41 deletions

View File

@ -17,6 +17,7 @@ from typing import Optional
from databricks.sdk import WorkspaceClient from databricks.sdk import WorkspaceClient
from sqlalchemy.engine import Engine from sqlalchemy.engine import Engine
from sqlalchemy.exc import DatabaseError
from metadata.generated.schema.entity.automations.workflow import ( from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow, Workflow as AutomationWorkflow,
@ -34,13 +35,13 @@ from metadata.ingestion.connections.builders import (
) )
from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient
from metadata.ingestion.source.database.unitycatalog.models import DatabricksTable from metadata.ingestion.source.database.unitycatalog.models import DatabricksTable
from metadata.ingestion.source.database.unitycatalog.queries import ( from metadata.ingestion.source.database.unitycatalog.queries import (
UNITY_CATALOG_GET_ALL_SCHEMA_TAGS, UNITY_CATALOG_GET_ALL_SCHEMA_TAGS,
UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS, UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS,
UNITY_CATALOG_GET_ALL_TABLE_TAGS, UNITY_CATALOG_GET_ALL_TABLE_TAGS,
UNITY_CATALOG_GET_CATALOGS_TAGS, UNITY_CATALOG_GET_CATALOGS_TAGS,
UNITY_CATALOG_SQL_STATEMENT_TEST,
) )
from metadata.utils.constants import THREE_MIN from metadata.utils.constants import THREE_MIN
from metadata.utils.db_utils import get_host_from_host_port from metadata.utils.db_utils import get_host_from_host_port
@ -93,8 +94,20 @@ def test_connection(
Test connection. This can be executed either as part Test connection. This can be executed either as part
of a metadata workflow or during an Automation Workflow of a metadata workflow or during an Automation Workflow
""" """
client = UnityCatalogClient(service_connection)
table_obj = DatabricksTable() table_obj = DatabricksTable()
engine = get_sqlalchemy_connection(service_connection)
def test_database_query(engine: Engine, statement: str):
"""
Method used to execute the given query and fetch a result
to test if user has access to the tables specified
in the sql statement
"""
try:
connection = engine.connect()
connection.execute(statement).fetchone()
except DatabaseError as soe:
logger.debug(f"Failed to fetch catalogs due to: {soe}")
def get_catalogs(connection: WorkspaceClient, table_obj: DatabricksTable): def get_catalogs(connection: WorkspaceClient, table_obj: DatabricksTable):
for catalog in connection.catalogs.list(): for catalog in connection.catalogs.list():
@ -149,7 +162,11 @@ def test_connection(
"GetSchemas": partial(get_schemas, connection, table_obj), "GetSchemas": partial(get_schemas, connection, table_obj),
"GetTables": partial(get_tables, connection, table_obj), "GetTables": partial(get_tables, connection, table_obj),
"GetViews": partial(get_tables, connection, table_obj), "GetViews": partial(get_tables, connection, table_obj),
"GetQueries": client.test_query_api_access, "GetQueries": partial(
test_database_query,
engine=engine,
statement=UNITY_CATALOG_SQL_STATEMENT_TEST,
),
"GetTags": partial(get_tags, service_connection, table_obj), "GetTags": partial(get_tags, service_connection, table_obj),
} }

View File

@ -1,7 +1,19 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" """
SQL queries for Unity Catalog SQL Queries used during ingestion
""" """
import textwrap
UNITY_CATALOG_GET_CATALOGS_TAGS = """ UNITY_CATALOG_GET_CATALOGS_TAGS = """
SELECT * FROM `{database}`.information_schema.catalog_tags; SELECT * FROM `{database}`.information_schema.catalog_tags;
""" """
@ -17,3 +29,27 @@ SELECT * FROM `{database}`.information_schema.table_tags WHERE schema_name = '{s
UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS = """ UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS = """
SELECT * FROM `{database}`.information_schema.column_tags WHERE schema_name = '{schema}'; SELECT * FROM `{database}`.information_schema.column_tags WHERE schema_name = '{schema}';
""" """
UNITY_CATALOG_SQL_STATEMENT = textwrap.dedent(
"""
SELECT
statement_type AS query_type,
statement_text AS query_text,
executed_by AS user_name,
start_time AS start_time,
null AS database_name,
null AS schema_name,
end_time AS end_time,
total_duration_ms/1000 AS duration
from system.query.history
WHERE statement_text NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%'
AND statement_text NOT LIKE '/* {{"app": "dbt", %%}} */%%'
AND start_time between to_timestamp('{start_time}') and to_timestamp('{end_time}')
{filters}
LIMIT {result_limit}
"""
)
UNITY_CATALOG_SQL_STATEMENT_TEST = """
SELECT statement_text from system.query.history LIMIT 1
"""

View File

@ -27,6 +27,9 @@ from metadata.ingestion.source.database.databricks.query_parser import (
) )
from metadata.ingestion.source.database.query_parser_source import QueryParserSource from metadata.ingestion.source.database.query_parser_source import QueryParserSource
from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient
from metadata.ingestion.source.database.unitycatalog.connection import (
get_sqlalchemy_connection,
)
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
logger = ingestion_logger() logger = ingestion_logger()
@ -55,6 +58,7 @@ class UnityCatalogQueryParserSource(
def __init__(self, config: WorkflowSource, metadata: OpenMetadata): def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
self._init_super(config=config, metadata=metadata) self._init_super(config=config, metadata=metadata)
self.client = UnityCatalogClient(self.service_connection) self.client = UnityCatalogClient(self.service_connection)
self.sql_client = get_sqlalchemy_connection(self.service_connection)
@classmethod @classmethod
def create( def create(

View File

@ -11,12 +11,10 @@
""" """
unity catalog usage module unity catalog usage module
""" """
import traceback
from datetime import datetime
from typing import Iterable
from metadata.generated.schema.type.basic import DateTime from metadata.ingestion.source.database.unitycatalog.queries import (
from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery UNITY_CATALOG_SQL_STATEMENT,
)
from metadata.ingestion.source.database.unitycatalog.query_parser import ( from metadata.ingestion.source.database.unitycatalog.query_parser import (
UnityCatalogQueryParserSource, UnityCatalogQueryParserSource,
) )
@ -35,36 +33,24 @@ class UnitycatalogUsageSource(UnityCatalogQueryParserSource, UsageSource):
the same API for fetching Usage Queries the same API for fetching Usage Queries
""" """
def yield_table_queries(self) -> Iterable[TableQuery]: sql_stmt = UNITY_CATALOG_SQL_STATEMENT
"""
Method to yield TableQueries
"""
queries = []
data = self.client.list_query_history(
start_date=self.start,
end_date=self.end,
)
for row in data or []:
try:
if self.client.is_query_valid(row):
queries.append(
TableQuery(
dialect=self.dialect.value,
query=row.get("query_text"),
userName=row.get("user_name"),
startTime=str(row.get("query_start_time_ms")),
endTime=str(row.get("execution_end_time_ms")),
analysisDate=DateTime(datetime.now()),
serviceName=self.config.serviceName,
duration=row.get("duration")
if row.get("duration")
else None,
)
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to process query {row.get('query_text')} due to: {err}"
)
yield TableQueries(queries=queries) filters = """
AND statement_type NOT IN ('SHOW', 'DESCRIBE', 'USE')
"""
def get_engine(self):
yield self.sql_client
def get_sql_statement(self, start_time, end_time):
"""
returns sql statement to fetch query logs.
Override if we have specific parameters
"""
return self.sql_stmt.format(
start_time=start_time,
end_time=end_time,
filters=self.get_filters(),
result_limit=self.source_config.resultLimit,
)