From dd05175c2569ded217f72e4afcb4c569ccdb0120 Mon Sep 17 00:00:00 2001 From: Suman Maharana Date: Mon, 2 Jun 2025 11:34:07 +0530 Subject: [PATCH] Fix: Databricks Schema Description (#21367) (cherry picked from commit 7e3c7329191708fc023e7d3c6083ed8aa2e5a9de) --- .../source/database/databricks/metadata.py | 50 +++++++++++++++++++ .../source/database/databricks/queries.py | 4 ++ .../src/metadata/utils/sqlalchemy_utils.py | 21 ++++++++ 3 files changed, 75 insertions(+) diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py b/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py index 479f2ef521e..aa92be906ad 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py @@ -50,6 +50,7 @@ from metadata.ingestion.source.database.databricks.queries import ( DATABRICKS_GET_CATALOGS, DATABRICKS_GET_CATALOGS_TAGS, DATABRICKS_GET_COLUMN_TAGS, + DATABRICKS_GET_SCHEMA_COMMENTS, DATABRICKS_GET_SCHEMA_TAGS, DATABRICKS_GET_TABLE_COMMENTS, DATABRICKS_GET_TABLE_TAGS, @@ -65,6 +66,8 @@ from metadata.utils.filters import filter_by_database from metadata.utils.logger import ingestion_logger from metadata.utils.sqlalchemy_utils import ( get_all_view_definitions, + get_schema_comment_result_wrapper, + get_schema_comment_results, get_table_comment_result_wrapper, get_table_comment_results, get_view_definition_wrapper, @@ -325,6 +328,24 @@ def get_table_comment_result( ) +@reflection.cache +def get_schema_comment_result( + self, + connection, + query, + database, + schema, + **kw, # pylint: disable=unused-argument +): + return get_schema_comment_result_wrapper( + self, + connection, + query=query, + database=database, + schema=schema, + ) + + @reflection.cache def get_table_ddl( self, connection, table_name, schema=None, **kw @@ -413,6 +434,8 @@ DatabricksDialect.get_view_definition = get_view_definition DatabricksDialect.get_table_names = get_table_names DatabricksDialect.get_all_view_definitions = get_all_view_definitions DatabricksDialect.get_table_comment_results = get_table_comment_results +DatabricksDialect.get_schema_comment_results = get_schema_comment_results +DatabricksDialect.get_schema_comment_result = get_schema_comment_result DatabricksDialect.get_table_comment_result = get_table_comment_result reflection.Inspector.get_schema_names = get_schema_names_reflection reflection.Inspector.get_table_ddl = get_table_ddl @@ -756,6 +779,33 @@ class DatabricksSource(ExternalTableLineageMixin, CommonDbSourceService, MultiDB ) ) + def get_schema_description(self, schema_name: str) -> str: + description = None + try: + query = DATABRICKS_GET_SCHEMA_COMMENTS.format( + database_name=self.context.get().database, + schema_name=schema_name, + ) + cursor = self.inspector.dialect.get_schema_comment_result( + connection=self.connection, + query=query, + database=self.context.get().database, + schema=schema_name, + ) + for result in list(cursor): + data = result.values() + if data[0] and data[0].strip() == "Comment": + description = data[1] if data and data[1] else None + return description + + # Catch any exception without breaking the ingestion + except Exception as exep: # pylint: disable=broad-except + logger.debug(traceback.format_exc()) + logger.warning( + f"Schema description error for schema [{schema_name}]: {exep}" + ) + return description + def get_table_description( self, schema_name: str, table_name: str, inspector: Inspector ) -> str: diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/queries.py b/ingestion/src/metadata/ingestion/source/database/databricks/queries.py index 1db98fff713..c45afd47e2d 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/queries.py @@ -52,6 +52,10 @@ DATABRICKS_GET_TABLE_COMMENTS = ( "DESCRIBE TABLE EXTENDED `{database_name}`.`{schema_name}`.`{table_name}`" ) +DATABRICKS_GET_SCHEMA_COMMENTS = ( + "DESCRIBE SCHEMA EXTENDED `{database_name}`.`{schema_name}`" +) + DATABRICKS_GET_CATALOGS = "SHOW CATALOGS" DATABRICKS_GET_CATALOGS_TAGS = textwrap.dedent( diff --git a/ingestion/src/metadata/utils/sqlalchemy_utils.py b/ingestion/src/metadata/utils/sqlalchemy_utils.py index fd43feffad6..444217cd1e1 100644 --- a/ingestion/src/metadata/utils/sqlalchemy_utils.py +++ b/ingestion/src/metadata/utils/sqlalchemy_utils.py @@ -174,6 +174,17 @@ def get_table_ddl( ) +@reflection.cache +def get_schema_comment_results(self, connection, query, database, schema=None): + """ + Method to fetch comment of all available schemas + """ + self.schema_comment_result: Dict[str, str] = {} + self.current_db: str = database + result = connection.execute(query).fetchall() + self.schema_comment_result[schema] = result + + @reflection.cache def get_table_comment_results( self, connection, query, database, table_name, schema=None @@ -197,3 +208,13 @@ def get_table_comment_result_wrapper( ): self.get_table_comment_results(connection, query, database, table_name, schema) return self.table_comment_result.get((table_name, schema)) + + +def get_schema_comment_result_wrapper(self, connection, query, database, schema=None): + if ( + not hasattr(self, "schema_comment_result") + or self.schema_comment_result.get((schema)) is None + or self.current_db != database + ): + self.get_schema_comment_results(connection, query, database, schema) + return self.schema_comment_result.get((schema))