diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/connection.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/connection.py index f21df24ce40..594bebb5fe0 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/connection.py @@ -17,6 +17,7 @@ from typing import Optional from databricks.sdk import WorkspaceClient from sqlalchemy.engine import Engine +from sqlalchemy.exc import DatabaseError from metadata.generated.schema.entity.automations.workflow import ( Workflow as AutomationWorkflow, @@ -34,13 +35,13 @@ from metadata.ingestion.connections.builders import ( ) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient from metadata.ingestion.source.database.unitycatalog.models import DatabricksTable from metadata.ingestion.source.database.unitycatalog.queries import ( UNITY_CATALOG_GET_ALL_SCHEMA_TAGS, UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS, UNITY_CATALOG_GET_ALL_TABLE_TAGS, UNITY_CATALOG_GET_CATALOGS_TAGS, + UNITY_CATALOG_SQL_STATEMENT_TEST, ) from metadata.utils.constants import THREE_MIN from metadata.utils.db_utils import get_host_from_host_port @@ -93,8 +94,20 @@ def test_connection( Test connection. 
def test_database_query(engine: Engine, statement: str):
    """
    Run ``statement`` against ``engine`` and fetch a single row.

    Used as the "GetQueries" test-connection step to probe whether the
    configured user can read the tables referenced by the statement.

    Args:
        engine: SQLAlchemy engine to open the connection from.
        statement: SQL text to execute.

    Returns:
        None. The fetched row is discarded; only the ability to run the
        statement matters.
    """
    try:
        # The context manager releases the connection when the block exits,
        # including when ``execute`` raises, so the probe never leaks one.
        with engine.connect() as connection:
            connection.execute(statement).fetchone()
    except DatabaseError as soe:
        # NOTE(review): the error is logged at debug level and not re-raised,
        # so this step reports success even when the query fails. Kept as-is
        # to preserve existing behaviour -- confirm whether that is intended.
        logger.debug(f"Failed to execute test query due to: {soe}")
# Query-history template used to collect executed statements.
#
# This is a ``str.format`` template: ``{start_time}``, ``{end_time}``,
# ``{filters}`` and ``{result_limit}`` are filled in by
# ``UnitycatalogUsageSource.get_sql_statement``. Because of that, the literal
# braces of the JSON-style comment markers are doubled (``{{`` / ``}}``).
# The ``%%`` sequences are presumably there so a literal ``%`` survives the
# driver's parameter interpolation -- TODO confirm against the DBAPI in use.
#
# The two ``NOT LIKE`` predicates drop statements that start with an
# ``{"app": "OpenMetadata", ...}`` or ``{"app": "dbt", ...}`` SQL comment.
# ``database_name`` and ``schema_name`` are always returned as NULL.
UNITY_CATALOG_SQL_STATEMENT = textwrap.dedent(
    """
    SELECT
      statement_type AS query_type,
      statement_text AS query_text,
      executed_by AS user_name,
      start_time AS start_time,
      null AS database_name,
      null AS schema_name,
      end_time AS end_time,
      total_duration_ms/1000 AS duration
    from system.query.history
    WHERE statement_text NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%'
      AND statement_text NOT LIKE '/* {{"app": "dbt", %%}} */%%'
      AND start_time between to_timestamp('{start_time}') and to_timestamp('{end_time}')
      {filters}
    LIMIT {result_limit}
    """
)

# Minimal probe against the same table the usage query reads; executed by the
# "GetQueries" test-connection step to check access to the query history.
UNITY_CATALOG_SQL_STATEMENT_TEST = """
    SELECT statement_text from system.query.history LIMIT 1
"""
def __init__(self, config: WorkflowSource, metadata: OpenMetadata) -> None:
    """
    Set up the query parser source and its two service handles.

    Args:
        config: workflow source configuration, forwarded to ``_init_super``.
        metadata: OpenMetadata client, forwarded to ``_init_super``.
    """
    # Shared initialisation is delegated; presumably this is what populates
    # ``self.service_connection`` used below -- verify in ``_init_super``.
    self._init_super(config=config, metadata=metadata)
    # Unity Catalog client built from the service connection.
    self.client = UnityCatalogClient(self.service_connection)
    # SQLAlchemy handle built from the same service connection; the usage
    # source yields it from ``get_engine`` to run the query-history SQL.
    self.sql_client = get_sqlalchemy_connection(self.service_connection)
# Template rendered by ``get_sql_statement`` to read the query history.
sql_stmt = UNITY_CATALOG_SQL_STATEMENT

# Extra predicate excluding statement types that carry no usage information.
filters = """
        AND statement_type NOT IN ('SHOW', 'DESCRIBE', 'USE')
    """


def get_engine(self):
    """
    Yield the SQLAlchemy client created in ``__init__`` as the only engine
    to run the usage query against.
    """
    yield self.sql_client


def get_sql_statement(self, start_time, end_time):
    """
    Render the query-log SQL for the given time window.

    The window bounds, the filters returned by ``get_filters()`` and the
    configured ``resultLimit`` are substituted into ``sql_stmt``.

    Override if the source needs different parameters.
    """
    # Resolve the bound ``format`` first so attribute access and argument
    # evaluation happen in the same order as a direct ``.format(...)`` call.
    render = self.sql_stmt.format
    template_values = {
        "start_time": start_time,
        "end_time": end_time,
        "filters": self.get_filters(),
        "result_limit": self.source_config.resultLimit,
    }
    return render(**template_values)