Fix #10263: Fetch Vertica Schema Comments (#10363)

This commit is contained in:
Mayur Singal 2023-03-01 11:35:01 +05:30 committed by GitHub
parent a175198c7d
commit f6ba024b5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 2 deletions

View File

@ -14,7 +14,7 @@ Vertica source implementation.
import re import re
import traceback import traceback
from textwrap import dedent from textwrap import dedent
from typing import Iterable from typing import Iterable, Optional
from sqlalchemy import sql, util from sqlalchemy import sql, util
from sqlalchemy.engine import reflection from sqlalchemy.engine import reflection
@ -38,6 +38,7 @@ from metadata.ingestion.source.database.vertica.queries import (
VERTICA_GET_COLUMNS, VERTICA_GET_COLUMNS,
VERTICA_GET_PRIMARY_KEYS, VERTICA_GET_PRIMARY_KEYS,
VERTICA_LIST_DATABASES, VERTICA_LIST_DATABASES,
VERTICA_SCHEMA_COMMENTS,
VERTICA_TABLE_COMMENTS, VERTICA_TABLE_COMMENTS,
VERTICA_VIEW_DEFINITION, VERTICA_VIEW_DEFINITION,
) )
@ -46,6 +47,7 @@ from metadata.utils.filters import filter_by_database
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import ( from metadata.utils.sqlalchemy_utils import (
get_all_table_comments, get_all_table_comments,
get_schema_descriptions,
get_table_comment_wrapper, get_table_comment_wrapper,
) )
@ -272,6 +274,10 @@ class VerticaSource(CommonDbSourceService):
Database metadata from Vertica Source Database metadata from Vertica Source
""" """
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
self.schema_desc_map = {}
@classmethod @classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection): def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict) config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
@ -282,10 +288,22 @@ class VerticaSource(CommonDbSourceService):
) )
return cls(config, metadata_config) return cls(config, metadata_config)
def get_schema_description(self, schema_name: str) -> Optional[str]:
"""
Method to fetch the schema description
"""
return self.schema_desc_map.get(schema_name)
def set_schema_description_map(self) -> None:
self.schema_desc_map = get_schema_descriptions(
self.engine, VERTICA_SCHEMA_COMMENTS
)
def get_database_names(self) -> Iterable[str]: def get_database_names(self) -> Iterable[str]:
configured_db = self.config.serviceConnection.__root__.config.database configured_db = self.config.serviceConnection.__root__.config.database
if configured_db: if configured_db:
self.set_inspector(database_name=configured_db) self.set_inspector(database_name=configured_db)
self.set_schema_description_map()
yield configured_db yield configured_db
else: else:
results = self.connection.execute(VERTICA_LIST_DATABASES) results = self.connection.execute(VERTICA_LIST_DATABASES)
@ -310,6 +328,7 @@ class VerticaSource(CommonDbSourceService):
try: try:
self.set_inspector(database_name=new_database) self.set_inspector(database_name=new_database)
self.set_schema_description_map()
yield new_database yield new_database
except Exception as exc: except Exception as exc:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())

View File

@ -83,6 +83,17 @@ VERTICA_TABLE_COMMENTS = textwrap.dedent(
""" """
) )
VERTICA_SCHEMA_COMMENTS = textwrap.dedent(
"""
SELECT
object_name as schema_name,
comment
FROM v_catalog.comments
WHERE object_type = 'SCHEMA';
"""
)
VERTICA_SQL_STATEMENT = textwrap.dedent( VERTICA_SQL_STATEMENT = textwrap.dedent(
""" """
SELECT SELECT

View File

@ -15,7 +15,7 @@ Module for sqlalchmey dialect utils
from typing import Dict, Tuple from typing import Dict, Tuple
from sqlalchemy.engine import reflection from sqlalchemy.engine import Engine, reflection
@reflection.cache @reflection.cache
@ -58,3 +58,11 @@ def get_view_definition_wrapper(self, connection, query, table_name, schema=None
): ):
self.get_all_view_definitions(connection, query) self.get_all_view_definitions(connection, query)
return self.all_view_definitions.get((table_name, schema), "") return self.all_view_definitions.get((table_name, schema), "")
def get_schema_descriptions(engine: Engine, query: str):
results = engine.execute(query).all()
schema_desc_map = {}
for row in results:
schema_desc_map[row.schema_name] = row.comment
return schema_desc_map