mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-02 11:39:12 +00:00
Fix: Databricks Schema Description (#21367)
(cherry picked from commit 7e3c7329191708fc023e7d3c6083ed8aa2e5a9de)
This commit is contained in:
parent
9791504a68
commit
dd05175c25
@ -50,6 +50,7 @@ from metadata.ingestion.source.database.databricks.queries import (
|
||||
DATABRICKS_GET_CATALOGS,
|
||||
DATABRICKS_GET_CATALOGS_TAGS,
|
||||
DATABRICKS_GET_COLUMN_TAGS,
|
||||
DATABRICKS_GET_SCHEMA_COMMENTS,
|
||||
DATABRICKS_GET_SCHEMA_TAGS,
|
||||
DATABRICKS_GET_TABLE_COMMENTS,
|
||||
DATABRICKS_GET_TABLE_TAGS,
|
||||
@ -65,6 +66,8 @@ from metadata.utils.filters import filter_by_database
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
from metadata.utils.sqlalchemy_utils import (
|
||||
get_all_view_definitions,
|
||||
get_schema_comment_result_wrapper,
|
||||
get_schema_comment_results,
|
||||
get_table_comment_result_wrapper,
|
||||
get_table_comment_results,
|
||||
get_view_definition_wrapper,
|
||||
@ -325,6 +328,24 @@ def get_table_comment_result(
|
||||
)
|
||||
|
||||
|
||||
@reflection.cache
def get_schema_comment_result(
    self,
    connection,
    query,
    database,
    schema,
    **kw,  # pylint: disable=unused-argument
):
    """
    Dialect-level entry point for reading the comment rows of one schema.

    The work is delegated to `get_schema_comment_result_wrapper`; this
    function only forwards its arguments and hands back whatever the
    wrapper returns. Extra keyword arguments are accepted and ignored.
    """
    schema_comment_rows = get_schema_comment_result_wrapper(
        self, connection, query=query, database=database, schema=schema
    )
    return schema_comment_rows
|
||||
|
||||
|
||||
@reflection.cache
|
||||
def get_table_ddl(
|
||||
self, connection, table_name, schema=None, **kw
|
||||
@ -413,6 +434,8 @@ DatabricksDialect.get_view_definition = get_view_definition
|
||||
# Dialect-level overrides: table / view listing helpers.
DatabricksDialect.get_table_names = get_table_names
DatabricksDialect.get_all_view_definitions = get_all_view_definitions

# Dialect-level overrides: comment lookups for tables and schemas.
DatabricksDialect.get_table_comment_results = get_table_comment_results
DatabricksDialect.get_table_comment_result = get_table_comment_result
DatabricksDialect.get_schema_comment_results = get_schema_comment_results
DatabricksDialect.get_schema_comment_result = get_schema_comment_result

# Inspector-level overrides.
reflection.Inspector.get_schema_names = get_schema_names_reflection
reflection.Inspector.get_table_ddl = get_table_ddl
|
||||
@ -756,6 +779,33 @@ class DatabricksSource(ExternalTableLineageMixin, CommonDbSourceService, MultiDB
|
||||
)
|
||||
)
|
||||
|
||||
def get_schema_description(self, schema_name: str) -> str:
    """
    Return the comment stored for `schema_name` in the current database.

    Formats DATABRICKS_GET_SCHEMA_COMMENTS with the database taken from the
    source context, runs it through the dialect's
    `get_schema_comment_result`, and scans the returned rows for the one
    whose first column is "Comment"; that row's second column is the
    description.

    Returns None when no such row exists, when its value is empty, or when
    the lookup fails. Failures are logged and never propagated.
    """
    description = None
    try:
        comment_query = DATABRICKS_GET_SCHEMA_COMMENTS.format(
            database_name=self.context.get().database,
            schema_name=schema_name,
        )
        rows = self.inspector.dialect.get_schema_comment_result(
            connection=self.connection,
            query=comment_query,
            database=self.context.get().database,
            schema=schema_name,
        )
        for row in list(rows):
            fields = row.values()
            label = fields[0]
            # Only the row labelled "Comment" carries the description.
            if not label or label.strip() != "Comment":
                continue
            description = fields[1] or None

    # Catch any exception without breaking the ingestion
    except Exception as exep:  # pylint: disable=broad-except
        logger.debug(traceback.format_exc())
        logger.warning(
            f"Schema description error for schema [{schema_name}]: {exep}"
        )
    return description
|
||||
|
||||
def get_table_description(
|
||||
self, schema_name: str, table_name: str, inspector: Inspector
|
||||
) -> str:
|
||||
|
||||
@ -52,6 +52,10 @@ DATABRICKS_GET_TABLE_COMMENTS = (
|
||||
"DESCRIBE TABLE EXTENDED `{database_name}`.`{schema_name}`.`{table_name}`"
|
||||
)
|
||||
|
||||
# Template for describing a single schema; the caller fills in
# `database_name` and `schema_name` with str.format.
DATABRICKS_GET_SCHEMA_COMMENTS = (
    "DESCRIBE SCHEMA EXTENDED "
    "`{database_name}`.`{schema_name}`"
)

# Statement used to list catalogs.
DATABRICKS_GET_CATALOGS = "SHOW CATALOGS"
|
||||
|
||||
DATABRICKS_GET_CATALOGS_TAGS = textwrap.dedent(
|
||||
|
||||
@ -174,6 +174,17 @@ def get_table_ddl(
|
||||
)
|
||||
|
||||
|
||||
@reflection.cache
def get_schema_comment_results(self, connection, query, database, schema=None):
    """
    Run `query` and remember its rows as the comment result for `schema`.

    Every call starts from an empty `schema_comment_result` mapping and
    records `database` as `current_db`, so after the call the mapping holds
    exactly one entry: `schema` -> all rows fetched for `query`.
    Nothing is returned; callers read the result from the attributes.
    """
    # Reset the stored state first, before the query is executed.
    self.schema_comment_result: Dict[str, str] = {}
    self.current_db: str = database
    self.schema_comment_result[schema] = connection.execute(query).fetchall()
|
||||
|
||||
|
||||
@reflection.cache
|
||||
def get_table_comment_results(
|
||||
self, connection, query, database, table_name, schema=None
|
||||
@ -197,3 +208,13 @@ def get_table_comment_result_wrapper(
|
||||
):
|
||||
self.get_table_comment_results(connection, query, database, table_name, schema)
|
||||
return self.table_comment_result.get((table_name, schema))
|
||||
|
||||
|
||||
def get_schema_comment_result_wrapper(self, connection, query, database, schema=None):
    """
    Return the stored comment rows for `schema`, refreshing them if needed.

    `self.get_schema_comment_results` is invoked when nothing has been
    stored yet, when there is no stored entry for `schema`, or when the
    stored entry was fetched for a different database than `database`.
    The value then found under `schema` is returned.
    """
    stored_result_usable = (
        hasattr(self, "schema_comment_result")
        and self.schema_comment_result.get(schema) is not None
        and self.current_db == database
    )
    if not stored_result_usable:
        self.get_schema_comment_results(connection, query, database, schema)
    return self.schema_comment_result.get(schema)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user