Trino Optimize table comments (#10367)

This commit is contained in:
Mayur Singal 2023-03-01 10:39:28 +05:30 committed by GitHub
parent c0f06191b0
commit 55a28ffd40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 75 additions and 4 deletions

View File

@ -18,10 +18,10 @@ import traceback
from copy import deepcopy
from typing import Any, Dict, Iterable, List, Optional, Tuple
from sqlalchemy import inspect, sql, util
from sqlalchemy import exc, inspect, sql, util
from sqlalchemy.engine.base import Connection
from sqlalchemy.sql import sqltypes
from trino.sqlalchemy import datatype
from trino.sqlalchemy import datatype, error
from trino.sqlalchemy.dialect import TrinoDialect
from metadata.generated.schema.entity.data.database import Database
@ -37,9 +37,11 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.trino.queries import TRINO_TABLE_COMMENTS
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database
from metadata.utils.logger import ANSI, ingestion_logger, log_ansi_encoded_string
from metadata.utils.sqlalchemy_utils import get_all_table_comments
logger = ingestion_logger()
ROW_DATA_TYPE = "row"
@ -125,7 +127,49 @@ def _get_columns(
return columns
def get_table_comment( # pylint: disable=unused-argument
self, connection: Connection, table_name: str, schema: str = None, **kw
) -> Dict[str, Any]:
"""
Override get table comment method to batch process comments
"""
catalog_name = self._get_default_catalog_name( # pylint: disable=protected-access
connection
)
if catalog_name is None:
raise exc.NoSuchTableError("catalog is required in connection")
schema_name = (
self._get_default_schema_name(connection) # pylint: disable=protected-access
or schema
)
if schema_name is None:
raise exc.NoSuchTableError("schema is required")
self.processed_schema = (
self.processed_schema if hasattr(self, "processed_schema") else set()
)
try:
if (
not hasattr(self, "all_table_comments")
or self.current_db != connection.engine.url.database
or schema not in self.processed_schema
):
self.processed_schema.add(schema)
self.get_all_table_comments(
connection,
TRINO_TABLE_COMMENTS.format(
catalog_name=catalog_name, schema_name=schema
),
)
return {"text": self.all_table_comments.get((table_name, schema))}
except error.TrinoQueryError as exe:
if exe.error_name in (error.PERMISSION_DENIED,):
return {"text": None}
raise
TrinoDialect._get_columns = _get_columns # pylint: disable=protected-access
TrinoDialect.get_all_table_comments = get_all_table_comments
TrinoDialect.get_table_comment = get_table_comment
class TrinoSource(CommonDbSourceService):
@ -204,8 +248,8 @@ class TrinoSource(CommonDbSourceService):
try:
self.set_inspector(database_name=new_catalog)
yield new_catalog
except Exception as exc:
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Error trying to connect to database {new_catalog}: {exc}"
f"Error trying to connect to database {new_catalog}: {err}"
)

View File

@ -0,0 +1,27 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
SQL Queries used during ingestion
"""
import textwrap
TRINO_TABLE_COMMENTS = textwrap.dedent(
"""
SELECT "comment" table_comment,
"schema_name" schema,
"table_name"
FROM "system"."metadata"."table_comments"
WHERE "catalog_name" = '{catalog_name}'
and "schema_name" = '{schema_name}'
and "comment" is not null
"""
)