diff --git a/ingestion/src/metadata/ingestion/source/database/trino/metadata.py b/ingestion/src/metadata/ingestion/source/database/trino/metadata.py
index 593ddc4b838..0d740d60d1b 100644
--- a/ingestion/src/metadata/ingestion/source/database/trino/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/database/trino/metadata.py
@@ -18,10 +18,10 @@ import traceback
 from copy import deepcopy
 from typing import Any, Dict, Iterable, List, Optional, Tuple
 
-from sqlalchemy import inspect, sql, util
+from sqlalchemy import exc, inspect, sql, util
 from sqlalchemy.engine.base import Connection
 from sqlalchemy.sql import sqltypes
-from trino.sqlalchemy import datatype
+from trino.sqlalchemy import datatype, error
 from trino.sqlalchemy.dialect import TrinoDialect
 
 from metadata.generated.schema.entity.data.database import Database
@@ -37,9 +37,11 @@ from metadata.generated.schema.metadataIngestion.workflow import (
 from metadata.ingestion.api.source import InvalidSourceException
 from metadata.ingestion.source.connections import get_connection
 from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
+from metadata.ingestion.source.database.trino.queries import TRINO_TABLE_COMMENTS
 from metadata.utils import fqn
 from metadata.utils.filters import filter_by_database
 from metadata.utils.logger import ANSI, ingestion_logger, log_ansi_encoded_string
+from metadata.utils.sqlalchemy_utils import get_all_table_comments
 
 logger = ingestion_logger()
 ROW_DATA_TYPE = "row"
@@ -125,7 +127,66 @@ def _get_columns(
     return columns
 
 
+def get_table_comment(  # pylint: disable=unused-argument
+    self, connection: Connection, table_name: str, schema: str = None, **kw
+) -> Dict[str, Any]:
+    """
+    Override get table comment method to batch process comments.
+
+    Comments are loaded one schema at a time through
+    ``get_all_table_comments`` and then read from ``all_table_comments``.
+
+    Args:
+        connection: connection used to resolve the defaults and run the query
+        table_name: name of the table whose comment is requested
+        schema: schema of the table; when not given, the default schema
+            resolved from the connection is used
+
+    Returns:
+        ``{"text": comment}``, where the text is None if the table has no
+        entry or Trino answers with PERMISSION_DENIED
+
+    Raises:
+        NoSuchTableError: if no catalog or no schema can be resolved
+    """
+    catalog_name = self._get_default_catalog_name(  # pylint: disable=protected-access
+        connection
+    )
+    if catalog_name is None:
+        raise exc.NoSuchTableError("catalog is required in connection")
+    # The requested schema wins and the connection default is only a fallback;
+    # the resolved name is then used for the cache check, query and lookup.
+    schema_name = (
+        schema
+        or self._get_default_schema_name(connection)  # pylint: disable=protected-access
+    )
+    if schema_name is None:
+        raise exc.NoSuchTableError("schema is required")
+    if not hasattr(self, "processed_schema"):
+        self.processed_schema = set()
+    try:
+        if (
+            not hasattr(self, "all_table_comments")
+            or self.current_db != connection.engine.url.database
+            or schema_name not in self.processed_schema
+        ):
+            self.processed_schema.add(schema_name)
+            self.get_all_table_comments(
+                connection,
+                TRINO_TABLE_COMMENTS.format(
+                    catalog_name=catalog_name, schema_name=schema_name
+                ),
+            )
+        return {"text": self.all_table_comments.get((table_name, schema_name))}
+    except error.TrinoQueryError as exe:
+        if exe.error_name in (error.PERMISSION_DENIED,):
+            return {"text": None}
+        raise
+
+
 TrinoDialect._get_columns = _get_columns  # pylint: disable=protected-access
+TrinoDialect.get_all_table_comments = get_all_table_comments
+TrinoDialect.get_table_comment = get_table_comment
 
 
 class TrinoSource(CommonDbSourceService):
@@ -204,8 +265,8 @@ class TrinoSource(CommonDbSourceService):
                 try:
                     self.set_inspector(database_name=new_catalog)
                     yield new_catalog
-                except Exception as exc:
+                except Exception as err:
                     logger.debug(traceback.format_exc())
                     logger.warning(
-                        f"Error trying to connect to database {new_catalog}: {exc}"
+                        f"Error trying to connect to database {new_catalog}: {err}"
                     )
diff --git a/ingestion/src/metadata/ingestion/source/database/trino/queries.py b/ingestion/src/metadata/ingestion/source/database/trino/queries.py
new file mode 100644
index 00000000000..104dfcb62c1
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/database/trino/queries.py
@@ -0,0 +1,27 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+SQL queries used during Trino ingestion
+"""
+
+import textwrap
+
+TRINO_TABLE_COMMENTS = textwrap.dedent(  # placeholders: catalog_name, schema_name
+    """
+    SELECT "comment" table_comment,
+           "schema_name" schema,
+           "table_name"
+    FROM "system"."metadata"."table_comments"
+    WHERE "catalog_name" = '{catalog_name}'
+    and "schema_name" = '{schema_name}'
+    and "comment" is not null
+    """
+)