Fix #5492: Fix Trino Comments (#8113)

This commit is contained in:
Mayur Singal 2022-10-14 10:09:31 +05:30 committed by GitHub
parent 1d3234ffe8
commit b55a46f4de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 6 additions and 29 deletions

View File

@ -12,11 +12,9 @@ source:
connectionArguments: {} connectionArguments: {}
sourceConfig: sourceConfig:
config: config:
enableDataProfiler: true
generateSampleData: false
tableFilterPattern: tableFilterPattern:
includes: includes:
- customer.* - customer.*
sink: sink:
type: metadata-rest type: metadata-rest
config: {} config: {}

View File

@ -14,7 +14,6 @@ Trino source implementation.
import logging import logging
import re import re
import sys import sys
from textwrap import dedent
from typing import Any, Dict, Iterable, List, Optional, Tuple from typing import Any, Dict, Iterable, List, Optional, Tuple
import click import click
@ -36,7 +35,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
from metadata.utils.sql_queries import TRINO_GET_COLUMNS
logger = ingestion_logger() logger = ingestion_logger()
ROW_DATA_TYPE = "row" ROW_DATA_TYPE = "row"
@ -100,19 +98,16 @@ def _get_columns(
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
# pylint: disable=protected-access # pylint: disable=protected-access
schema = schema or self._get_default_schema_name(connection) schema = schema or self._get_default_schema_name(connection)
query = dedent(TRINO_GET_COLUMNS).strip() query = f"SHOW COLUMNS FROM {schema}.{table_name}"
res = connection.execute(sql.text(query), schema=schema, table=table_name) res = connection.execute(sql.text(query), schema=schema, table=table_name)
columns = [] columns = []
for record in res: for record in res:
col_type = datatype.parse_sqltype(record.data_type) col_type = datatype.parse_sqltype(record.Type)
column = dict( column = dict(
name=record.column_name, name=record.Column, type=col_type, nullable=True, comment=record.Comment
type=col_type,
nullable=record.is_nullable == "YES",
default=record.column_default,
) )
type_str = record.data_type.strip().lower() type_str = record.Type.strip().lower()
type_name, type_opts = get_type_name_and_opts(type_str) type_name, type_opts = get_type_name_and_opts(type_str)
if type_opts and type_name == ROW_DATA_TYPE: if type_opts and type_name == ROW_DATA_TYPE:
column["raw_data_type"] = parse_row_data_type(type_str) column["raw_data_type"] = parse_row_data_type(type_str)

View File

@ -447,22 +447,6 @@ WHERE creation_time BETWEEN "{start_time}" AND "{end_time}"
""" """
) )
TRINO_GET_COLUMNS = textwrap.dedent(
"""
SELECT
"column_name",
"data_type",
"column_default",
UPPER("is_nullable") AS "is_nullable"
FROM "information_schema"."columns"
WHERE "table_schema" = :schema
AND "table_name" = :table
ORDER BY "ordinal_position" ASC
"""
)
POSTGRES_SQL_STATEMENT = textwrap.dedent( POSTGRES_SQL_STATEMENT = textwrap.dedent(
""" """
SELECT SELECT