diff --git a/ingestion/src/metadata/ingestion/source/database/trino/metadata.py b/ingestion/src/metadata/ingestion/source/database/trino/metadata.py index f2da86fbd38..151ef775ae1 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/metadata.py @@ -35,7 +35,10 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from metadata.ingestion.source.database.common_db_source import CommonDbSourceService -from metadata.ingestion.source.database.trino.queries import TRINO_TABLE_COMMENTS +from metadata.ingestion.source.database.trino.queries import ( + TRINO_TABLE_COMMENTS, + TRINO_VIEW_DEFINITION, +) from metadata.utils import fqn from metadata.utils.filters import filter_by_database from metadata.utils.logger import ingestion_logger @@ -168,9 +171,43 @@ def get_table_comment( # pylint: disable=unused-argument raise +def get_view_definition( + self, connection: Connection, view_name: str, schema: str = None, **kw +) -> Optional[str]: + """ + Get the view definition using SHOW CREATE VIEW for Trino + + Default implementation from sqlalchemy uses view_definition column value + from "information_schema"."views" view, which does not return the full definition + of view. Hence, we should use SHOW CREATE VIEW statement to get full view definition. + """ + catalog_name = self._get_default_catalog_name( # pylint: disable=protected-access + connection + ) + schema = schema or self._get_default_schema_name( + connection + ) # pylint: disable=protected-access + if schema is None: + raise exc.NoSuchTableError("schema is required") + + if catalog_name: + full_view_name = f"{catalog_name}.{schema}.{view_name}" + else: + full_view_name = f"{schema}.{view_name}" + + try: + # Use SHOW CREATE VIEW to get the full DDL + query = TRINO_VIEW_DEFINITION.format(view_name=full_view_name) + res = connection.execute(sql.text(query)) + return res.scalar() + except Exception: + logger.warning(f"Could not get view definition for view [{full_view_name}]") + + TrinoDialect._get_columns = _get_columns # pylint: disable=protected-access TrinoDialect.get_all_table_comments = get_all_table_comments TrinoDialect.get_table_comment = get_table_comment +TrinoDialect.get_view_definition = get_view_definition class TrinoSource(CommonDbSourceService): @@ -226,9 +263,11 @@ class TrinoSource(CommonDbSourceService): ) if filter_by_database( self.source_config.databaseFilterPattern, - database_fqn - if self.source_config.useFqnForFiltering - else new_catalog, + ( + database_fqn + if self.source_config.useFqnForFiltering + else new_catalog + ), ): self.status.filter(database_fqn, "Database Filtered Out") continue diff --git a/ingestion/src/metadata/ingestion/source/database/trino/queries.py b/ingestion/src/metadata/ingestion/source/database/trino/queries.py index 1bb9db5e919..5114a571986 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/queries.py @@ -46,3 +46,7 @@ TRINO_TABLE_COMMENTS = textwrap.dedent( TRINO_GET_DATABASE = """ SHOW CATALOGS """ + +TRINO_VIEW_DEFINITION = """ +SHOW CREATE VIEW {view_name} +"""