mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-01 11:09:14 +00:00
Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
parent
d9d30b2cba
commit
8ac83509a1
@ -63,6 +63,12 @@
|
||||
"description": "Snowflake warehouse.",
|
||||
"type": "string"
|
||||
},
|
||||
"queryTag": {
|
||||
"title": "Query Tag",
|
||||
"description": "Session query tag used to monitor usage on snoflake",
|
||||
"type": "string",
|
||||
"default": "OpenMetadata"
|
||||
},
|
||||
"privateKey": {
|
||||
"title": "Private Key",
|
||||
"description": "Connection to Snowflake instance via Private Key",
|
||||
|
||||
@ -48,6 +48,7 @@ from metadata.utils.logger import ingestion_logger
|
||||
from metadata.utils.sql_queries import (
|
||||
FETCH_SNOWFLAKE_ALL_TAGS,
|
||||
FETCH_SNOWFLAKE_METADATA,
|
||||
SNOWFLAKE_SESSION_TAG_QUERY,
|
||||
)
|
||||
|
||||
GEOGRAPHY = create_sqlalchemy_type("GEOGRAPHY")
|
||||
@ -108,6 +109,7 @@ class SnowflakeSource(CommonDbSourceService):
|
||||
|
||||
def get_databases(self) -> Iterable[Inspector]:
|
||||
if self.config.serviceConnection.__root__.config.database:
|
||||
self.set_session_query_tag()
|
||||
yield from super().get_databases()
|
||||
else:
|
||||
query = "SHOW DATABASES"
|
||||
@ -124,6 +126,7 @@ class SnowflakeSource(CommonDbSourceService):
|
||||
logger.info(f"Ingesting from database: {row[1]}")
|
||||
self.config.serviceConnection.__root__.config.database = row[1]
|
||||
self.engine = get_connection(self.service_connection)
|
||||
self.set_session_query_tag()
|
||||
yield inspect(self.engine)
|
||||
|
||||
def add_tags_to_table(self, schema: str, table_name: str, table_entity):
|
||||
@ -193,6 +196,16 @@ class SnowflakeSource(CommonDbSourceService):
|
||||
)
|
||||
return cls(config, metadata_config)
|
||||
|
||||
def set_session_query_tag(self) -> None:
|
||||
"""
|
||||
Method to set query tag for current session
|
||||
"""
|
||||
self.engine.execute(
|
||||
SNOWFLAKE_SESSION_TAG_QUERY.format(
|
||||
query_tag=self.service_connection.queryTag
|
||||
)
|
||||
)
|
||||
|
||||
def next_record(self) -> Iterable[Entity]:
|
||||
yield from self.get_all_table_tags() or []
|
||||
for inspector in self.get_databases():
|
||||
|
||||
@ -142,6 +142,7 @@ SNOWFLAKE_SQL_STATEMENT = """
|
||||
end_time_range_end=>to_timestamp_ltz('{end_date}'),RESULT_LIMIT=>{result_limit}))
|
||||
WHERE QUERY_TYPE NOT IN ('ROLLBACK','CREATE_USER','CREATE_ROLE','CREATE_NETWORK_POLICY','ALTER_ROLE','ALTER_NETWORK_POLICY','ALTER_ACCOUNT','DROP_SEQUENCE','DROP_USER','DROP_ROLE','DROP_NETWORK_POLICY','REVOKE','UNLOAD','USE','DELETE','DROP','TRUNCATE_TABLE','ALTER_SESSION','COPY','UPDATE','COMMIT','SHOW','ALTER','DESCRIBE','CREATE_TABLE','PUT_FILES','GET_FILES');
|
||||
"""
|
||||
SNOWFLAKE_SESSION_TAG_QUERY = 'ALTER SESSION SET QUERY_TAG="{query_tag}"'
|
||||
|
||||
NEO4J_AMUNDSEN_TABLE_QUERY = textwrap.dedent(
|
||||
"""
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user