Add Database & Schema Description: Snowflake (#10276)

This commit is contained in:
Mayur Singal 2023-02-23 15:40:48 +05:30 committed by GitHub
parent 50af4990e2
commit 9dbfea2fca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 61 additions and 0 deletions

View File

@ -139,6 +139,18 @@ class CommonDbSourceService(
self.inspector = inspect(self.engine)
yield database_name
def get_database_description(self, database_name: str) -> Optional[str]:
    """
    Hook returning the description of a database.

    This default implementation has nothing to look up and therefore
    always answers with no description.
    """
    return None
def get_schema_description(self, schema_name: str) -> Optional[str]:
    """
    Hook returning the description of a schema.

    This default implementation has nothing to look up and therefore
    always answers with no description.
    """
    return None
def yield_database(self, database_name: str) -> Iterable[CreateDatabaseRequest]:
"""
From topology.
@ -148,6 +160,7 @@ class CommonDbSourceService(
yield CreateDatabaseRequest(
name=database_name,
service=self.context.database_service.fullyQualifiedName,
description=self.get_database_description(database_name),
)
def get_raw_database_schema_names(self) -> Iterable[str]:
@ -174,6 +187,7 @@ class CommonDbSourceService(
yield CreateDatabaseSchemaRequest(
name=schema_name,
database=self.context.database.fullyQualifiedName,
description=self.get_schema_description(schema_name),
)
@staticmethod

View File

@ -45,6 +45,8 @@ from metadata.ingestion.source.database.snowflake.queries import (
SNOWFLAKE_FETCH_ALL_TAGS,
SNOWFLAKE_GET_CLUSTER_KEY,
SNOWFLAKE_GET_COMMENTS,
SNOWFLAKE_GET_DATABASE_COMMENTS,
SNOWFLAKE_GET_SCHEMA_COMMENTS,
SNOWFLAKE_GET_TABLE_NAMES,
SNOWFLAKE_GET_VIEW_NAMES,
SNOWFLAKE_SESSION_TAG_QUERY,
@ -148,6 +150,8 @@ class SnowflakeSource(CommonDbSourceService):
def __init__(self, config, metadata_config):
    """
    Initialise the source: create the lookup dictionaries used by this
    class and delegate the rest of the set-up to the parent constructor.
    """
    # Created before the parent constructor is invoked; keep this ordering.
    # NOTE(review): presumably maps "schema.table" to its clustering key - confirm
    self.partition_details = {}
    super().__init__(config, metadata_config)
    # (database name, schema name) -> schema comment,
    # filled by set_schema_description_map
    self.schema_desc_map = {}
    # database name -> database comment,
    # filled by set_database_description_map
    self.database_desc_map = {}
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
@ -179,12 +183,39 @@ class SnowflakeSource(CommonDbSourceService):
f"{row.TABLE_SCHEMA}.{row.TABLE_NAME}"
] = row.CLUSTERING_KEY
def set_schema_description_map(self) -> None:
    """
    Run the schema comments query and store every returned comment in
    schema_desc_map under its (database name, schema name) pair.
    """
    schema_rows = self.engine.execute(SNOWFLAKE_GET_SCHEMA_COMMENTS).all()
    self.schema_desc_map.update(
        {
            (schema_row.DATABASE_NAME, schema_row.SCHEMA_NAME): schema_row.COMMENT
            for schema_row in schema_rows
        }
    )
def set_database_description_map(self) -> None:
    """
    Fill database_desc_map with one comment per database name.

    The database comments query is only issued while the map is still
    empty; once it holds entries this method does nothing.
    """
    if self.database_desc_map:
        return

    db_rows = self.engine.execute(SNOWFLAKE_GET_DATABASE_COMMENTS).all()
    for db_row in db_rows:
        self.database_desc_map[db_row.DATABASE_NAME] = db_row.COMMENT
def get_schema_description(self, schema_name: str) -> Optional[str]:
    """
    Look up the comment stored for a schema of the database currently
    held in the context. Returns None when no entry exists for it.
    """
    current_database = self.context.database.name.__root__
    lookup_key = (current_database, schema_name)
    return self.schema_desc_map.get(lookup_key)
def get_database_description(self, database_name: str) -> Optional[str]:
    """
    Return the comment recorded for the given database, or None when
    database_desc_map holds no entry for that name.
    """
    known_descriptions = self.database_desc_map
    return known_descriptions.get(database_name)
def get_database_names(self) -> Iterable[str]:
configured_db = self.config.serviceConnection.__root__.config.database
if configured_db:
self.set_inspector(configured_db)
self.set_session_query_tag()
self.set_partition_details()
self.set_schema_description_map()
self.set_database_description_map()
yield configured_db
else:
results = self.connection.execute("SHOW DATABASES")
@ -211,6 +242,8 @@ class SnowflakeSource(CommonDbSourceService):
self.set_inspector(database_name=new_database)
self.set_session_query_tag()
self.set_partition_details()
self.set_schema_description_map()
self.set_database_description_map()
yield new_database
except Exception as exc:
logger.debug(traceback.format_exc())

View File

@ -73,3 +73,17 @@ SNOWFLAKE_GET_CLUSTER_KEY = """
where TABLE_TYPE = 'BASE TABLE'
and CLUSTERING_KEY is not null
"""
# For each row of information_schema.schemata, selects the catalog name
# (aliased to DATABASE_NAME), the schema name and the schema comment.
SNOWFLAKE_GET_SCHEMA_COMMENTS = """
SELECT
catalog_name DATABASE_NAME,
SCHEMA_NAME,
COMMENT
FROM information_schema.schemata
"""
# For each row of information_schema.databases, selects the database name
# and the database comment.
SNOWFLAKE_GET_DATABASE_COMMENTS = """
select DATABASE_NAME,COMMENT from information_schema.databases
"""