mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-08 06:53:11 +00:00
Fix incomplete trino view definition extraction (#23349)
(cherry picked from commit f9e866cd50a9fcd7fee45180956f91b1313bd98e)
This commit is contained in:
parent
a49f1788b4
commit
b618871b87
@ -35,7 +35,10 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
|||||||
from metadata.ingestion.source.connections import get_connection
|
from metadata.ingestion.source.connections import get_connection
|
||||||
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
|
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
|
||||||
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
|
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
|
||||||
from metadata.ingestion.source.database.trino.queries import TRINO_TABLE_COMMENTS
|
from metadata.ingestion.source.database.trino.queries import (
|
||||||
|
TRINO_TABLE_COMMENTS,
|
||||||
|
TRINO_VIEW_DEFINITION,
|
||||||
|
)
|
||||||
from metadata.utils import fqn
|
from metadata.utils import fqn
|
||||||
from metadata.utils.filters import filter_by_database
|
from metadata.utils.filters import filter_by_database
|
||||||
from metadata.utils.logger import ingestion_logger
|
from metadata.utils.logger import ingestion_logger
|
||||||
@ -168,9 +171,43 @@ def get_table_comment( # pylint: disable=unused-argument
|
|||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def get_view_definition(
|
||||||
|
self, connection: Connection, view_name: str, schema: str = None, **kw
|
||||||
|
) -> Optional[str]:
|
||||||
|
"""
|
||||||
|
Get the view definition using SHOW CREATE VIEW for Trino
|
||||||
|
|
||||||
|
Default implementation from sqlalchemy uses view_definition column value
|
||||||
|
from "information_schema"."views" view, which does not return the full definition
|
||||||
|
of view. Hence, we should use SHOW CREATE VIEW statement to get full view definition.
|
||||||
|
"""
|
||||||
|
catalog_name = self._get_default_catalog_name( # pylint: disable=protected-access
|
||||||
|
connection
|
||||||
|
)
|
||||||
|
schema = schema or self._get_default_schema_name(
|
||||||
|
connection
|
||||||
|
) # pylint: disable=protected-access
|
||||||
|
if schema is None:
|
||||||
|
raise exc.NoSuchTableError("schema is required")
|
||||||
|
|
||||||
|
if catalog_name:
|
||||||
|
full_view_name = f"{catalog_name}.{schema}.{view_name}"
|
||||||
|
else:
|
||||||
|
full_view_name = f"{schema}.{view_name}"
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Use SHOW CREATE VIEW to get the full DDL
|
||||||
|
query = TRINO_VIEW_DEFINITION.format(view_name=full_view_name)
|
||||||
|
res = connection.execute(sql.text(query))
|
||||||
|
return res.scalar()
|
||||||
|
except Exception:
|
||||||
|
logger.warning(f"Could not get view definition for view [{full_view_name}]")
|
||||||
|
|
||||||
|
|
||||||
TrinoDialect._get_columns = _get_columns # pylint: disable=protected-access
|
TrinoDialect._get_columns = _get_columns # pylint: disable=protected-access
|
||||||
TrinoDialect.get_all_table_comments = get_all_table_comments
|
TrinoDialect.get_all_table_comments = get_all_table_comments
|
||||||
TrinoDialect.get_table_comment = get_table_comment
|
TrinoDialect.get_table_comment = get_table_comment
|
||||||
|
TrinoDialect.get_view_definition = get_view_definition
|
||||||
|
|
||||||
|
|
||||||
class TrinoSource(CommonDbSourceService):
|
class TrinoSource(CommonDbSourceService):
|
||||||
@ -226,9 +263,11 @@ class TrinoSource(CommonDbSourceService):
|
|||||||
)
|
)
|
||||||
if filter_by_database(
|
if filter_by_database(
|
||||||
self.source_config.databaseFilterPattern,
|
self.source_config.databaseFilterPattern,
|
||||||
|
(
|
||||||
database_fqn
|
database_fqn
|
||||||
if self.source_config.useFqnForFiltering
|
if self.source_config.useFqnForFiltering
|
||||||
else new_catalog,
|
else new_catalog
|
||||||
|
),
|
||||||
):
|
):
|
||||||
self.status.filter(database_fqn, "Database Filtered Out")
|
self.status.filter(database_fqn, "Database Filtered Out")
|
||||||
continue
|
continue
|
||||||
|
|||||||
@ -46,3 +46,7 @@ TRINO_TABLE_COMMENTS = textwrap.dedent(
|
|||||||
TRINO_GET_DATABASE = """
|
TRINO_GET_DATABASE = """
|
||||||
SHOW CATALOGS
|
SHOW CATALOGS
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
TRINO_VIEW_DEFINITION = """
|
||||||
|
SHOW CREATE VIEW {view_name}
|
||||||
|
"""
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user