diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json index 73ef2646e87..4240c4f7cce 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json @@ -63,6 +63,12 @@ "description": "Snowflake warehouse.", "type": "string" }, + "queryTag": { + "title": "Query Tag", + "description": "Session query tag used to monitor usage on snoflake", + "type": "string", + "default": "OpenMetadata" + }, "privateKey": { "title": "Private Key", "description": "Connection to Snowflake instance via Private Key", diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake.py b/ingestion/src/metadata/ingestion/source/database/snowflake.py index 2ac517a6044..884bc0b6cec 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake.py @@ -48,6 +48,7 @@ from metadata.utils.logger import ingestion_logger from metadata.utils.sql_queries import ( FETCH_SNOWFLAKE_ALL_TAGS, FETCH_SNOWFLAKE_METADATA, + SNOWFLAKE_SESSION_TAG_QUERY, ) GEOGRAPHY = create_sqlalchemy_type("GEOGRAPHY") @@ -108,6 +109,7 @@ class SnowflakeSource(CommonDbSourceService): def get_databases(self) -> Iterable[Inspector]: if self.config.serviceConnection.__root__.config.database: + self.set_session_query_tag() yield from super().get_databases() else: query = "SHOW DATABASES" @@ -124,6 +126,7 @@ class SnowflakeSource(CommonDbSourceService): logger.info(f"Ingesting from database: {row[1]}") self.config.serviceConnection.__root__.config.database = row[1] self.engine = get_connection(self.service_connection) + self.set_session_query_tag() yield inspect(self.engine) def add_tags_to_table(self, schema: str, table_name: str, table_entity): @@ -193,6 +196,16 @@ class SnowflakeSource(CommonDbSourceService): ) return cls(config, metadata_config) + def set_session_query_tag(self) -> None: + """ + Method to set query tag for current session + """ + self.engine.execute( + SNOWFLAKE_SESSION_TAG_QUERY.format( + query_tag=self.service_connection.queryTag + ) + ) + def next_record(self) -> Iterable[Entity]: yield from self.get_all_table_tags() or [] for inspector in self.get_databases(): diff --git a/ingestion/src/metadata/utils/sql_queries.py b/ingestion/src/metadata/utils/sql_queries.py index 40abe358d36..dfd125526bb 100644 --- a/ingestion/src/metadata/utils/sql_queries.py +++ b/ingestion/src/metadata/utils/sql_queries.py @@ -142,6 +142,7 @@ SNOWFLAKE_SQL_STATEMENT = """ end_time_range_end=>to_timestamp_ltz('{end_date}'),RESULT_LIMIT=>{result_limit})) WHERE QUERY_TYPE NOT IN ('ROLLBACK','CREATE_USER','CREATE_ROLE','CREATE_NETWORK_POLICY','ALTER_ROLE','ALTER_NETWORK_POLICY','ALTER_ACCOUNT','DROP_SEQUENCE','DROP_USER','DROP_ROLE','DROP_NETWORK_POLICY','REVOKE','UNLOAD','USE','DELETE','DROP','TRUNCATE_TABLE','ALTER_SESSION','COPY','UPDATE','COMMIT','SHOW','ALTER','DESCRIBE','CREATE_TABLE','PUT_FILES','GET_FILES'); """ +SNOWFLAKE_SESSION_TAG_QUERY = 'ALTER SESSION SET QUERY_TAG="{query_tag}"' NEO4J_AMUNDSEN_TABLE_QUERY = textwrap.dedent( """