diff --git a/ingestion/src/metadata/ingestion/source/database/vertica/metadata.py b/ingestion/src/metadata/ingestion/source/database/vertica/metadata.py
index bad1d766c5a..37594153b9f 100644
--- a/ingestion/src/metadata/ingestion/source/database/vertica/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/database/vertica/metadata.py
@@ -14,7 +14,7 @@ Vertica source implementation.
 import re
 import traceback
 from textwrap import dedent
-from typing import Iterable
+from typing import Iterable, Optional
 
 from sqlalchemy import sql, util
 from sqlalchemy.engine import reflection
@@ -38,6 +38,7 @@ from metadata.ingestion.source.database.vertica.queries import (
     VERTICA_GET_COLUMNS,
     VERTICA_GET_PRIMARY_KEYS,
     VERTICA_LIST_DATABASES,
+    VERTICA_SCHEMA_COMMENTS,
     VERTICA_TABLE_COMMENTS,
     VERTICA_VIEW_DEFINITION,
 )
@@ -46,6 +47,7 @@ from metadata.utils.filters import filter_by_database
 from metadata.utils.logger import ingestion_logger
 from metadata.utils.sqlalchemy_utils import (
     get_all_table_comments,
+    get_schema_descriptions,
     get_table_comment_wrapper,
 )
 
@@ -272,6 +274,11 @@ class VerticaSource(CommonDbSourceService):
     Database metadata from Vertica Source
     """
 
+    def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
+        super().__init__(config, metadata_config)
+        # schema name -> schema comment; rebuilt by set_schema_description_map()
+        self.schema_desc_map = {}
+
     @classmethod
     def create(cls, config_dict, metadata_config: OpenMetadataConnection):
         config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
@@ -282,10 +289,25 @@ class VerticaSource(CommonDbSourceService):
             )
         return cls(config, metadata_config)
 
+    def get_schema_description(self, schema_name: str) -> Optional[str]:
+        """
+        Method to fetch the schema description
+        """
+        return self.schema_desc_map.get(schema_name)
+
+    def set_schema_description_map(self) -> None:
+        """
+        Method to run the schema comments query and cache the result by schema name
+        """
+        self.schema_desc_map = get_schema_descriptions(
+            self.engine, VERTICA_SCHEMA_COMMENTS
+        )
+
     def get_database_names(self) -> Iterable[str]:
         configured_db = self.config.serviceConnection.__root__.config.database
         if configured_db:
             self.set_inspector(database_name=configured_db)
+            self.set_schema_description_map()
             yield configured_db
         else:
             results = self.connection.execute(VERTICA_LIST_DATABASES)
@@ -310,6 +332,7 @@ class VerticaSource(CommonDbSourceService):
 
                 try:
                     self.set_inspector(database_name=new_database)
+                    self.set_schema_description_map()
                     yield new_database
                 except Exception as exc:
                     logger.debug(traceback.format_exc())
diff --git a/ingestion/src/metadata/ingestion/source/database/vertica/queries.py b/ingestion/src/metadata/ingestion/source/database/vertica/queries.py
index d0c7f40b06d..9b814beda9f 100644
--- a/ingestion/src/metadata/ingestion/source/database/vertica/queries.py
+++ b/ingestion/src/metadata/ingestion/source/database/vertica/queries.py
@@ -83,6 +83,17 @@ VERTICA_TABLE_COMMENTS = textwrap.dedent(
     """
 )
 
+VERTICA_SCHEMA_COMMENTS = textwrap.dedent(
+    """
+    SELECT
+      object_name as schema_name,
+      comment
+    FROM v_catalog.comments
+    WHERE object_type = 'SCHEMA';
+    """
+)
+
+
 VERTICA_SQL_STATEMENT = textwrap.dedent(
     """
     SELECT
diff --git a/ingestion/src/metadata/utils/sqlalchemy_utils.py b/ingestion/src/metadata/utils/sqlalchemy_utils.py
index 96f3d24eb5a..31d6956ebfa 100644
--- a/ingestion/src/metadata/utils/sqlalchemy_utils.py
+++ b/ingestion/src/metadata/utils/sqlalchemy_utils.py
@@ -15,7 +15,7 @@ Module for sqlalchmey dialect utils
 
 from typing import Dict, Tuple
 
-from sqlalchemy.engine import reflection
+from sqlalchemy.engine import Engine, reflection
 
 
 @reflection.cache
@@ -58,3 +58,11 @@ def get_view_definition_wrapper(self, connection, query, table_name, schema=None
     ):
         self.get_all_view_definitions(connection, query)
     return self.all_view_definitions.get((table_name, schema), "")
+
+
+def get_schema_descriptions(engine: Engine, query: str) -> Dict[str, str]:
+    """
+    Run the query and return a map of schema name to schema comment
+    """
+    results = engine.execute(query).all()
+    return {row.schema_name: row.comment for row in results}