Fix #19635: Snowflake allow custom account_usage schema (#19678)

This commit is contained in:
Mayur Singal 2025-02-11 14:58:44 +05:30 committed by GitHub
parent fba92df04c
commit 29f067c162
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 63 additions and 12 deletions

View File

@ -193,10 +193,18 @@ def test_connection(
engine_wrapper=engine_wrapper, engine_wrapper=engine_wrapper,
), ),
"GetQueries": partial( "GetQueries": partial(
test_query, statement=SNOWFLAKE_TEST_GET_QUERIES, engine=engine test_query,
statement=SNOWFLAKE_TEST_GET_QUERIES.format(
account_usage=service_connection.accountUsageSchema
),
engine=engine,
), ),
"GetTags": partial( "GetTags": partial(
test_query, statement=SNOWFLAKE_TEST_FETCH_TAG, engine=engine test_query,
statement=SNOWFLAKE_TEST_FETCH_TAG.format(
account_usage=service_connection.accountUsageSchema
),
engine=engine,
), ),
} }

View File

@ -62,6 +62,7 @@ class SnowflakeLineageSource(
start, _ = get_start_and_end(self.source_config.queryLogDuration) start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = self.stored_procedure_query.format( query = self.stored_procedure_query.format(
start_date=start, start_date=start,
account_usage=self.service_connection.accountUsageSchema,
) )
queries_dict = self.procedure_queries_dict( queries_dict = self.procedure_queries_dict(
query=query, query=query,

View File

@ -418,6 +418,7 @@ class SnowflakeSource(
SNOWFLAKE_FETCH_ALL_TAGS.format( SNOWFLAKE_FETCH_ALL_TAGS.format(
database_name=self.context.get().database, database_name=self.context.get().database,
schema_name=schema_name, schema_name=schema_name,
account_usage=self.service_connection.accountUsageSchema,
) )
) )
@ -431,6 +432,7 @@ class SnowflakeSource(
SNOWFLAKE_FETCH_ALL_TAGS.format( SNOWFLAKE_FETCH_ALL_TAGS.format(
database_name=f'"{self.context.get().database}"', database_name=f'"{self.context.get().database}"',
schema_name=f'"{self.context.get().database_schema}"', schema_name=f'"{self.context.get().database_schema}"',
account_usage=self.service_connection.accountUsageSchema,
) )
) )
except Exception as inner_exc: except Exception as inner_exc:
@ -635,6 +637,7 @@ class SnowflakeSource(
query.format( query.format(
database_name=self.context.get().database, database_name=self.context.get().database,
schema_name=self.context.get().database_schema, schema_name=self.context.get().database_schema,
account_usage=self.service_connection.accountUsageSchema,
) )
).all() ).all()
for row in results: for row in results:

View File

@ -25,7 +25,7 @@ SNOWFLAKE_SQL_STATEMENT = textwrap.dedent(
start_time "start_time", start_time "start_time",
end_time "end_time", end_time "end_time",
total_elapsed_time "duration" total_elapsed_time "duration"
from snowflake.account_usage.query_history from {account_usage}.query_history
WHERE query_text NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%' WHERE query_text NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%'
AND query_text NOT LIKE '/* {{"app": "dbt", %%}} */%%' AND query_text NOT LIKE '/* {{"app": "dbt", %%}} */%%'
AND start_time between to_timestamp_ltz('{start_time}') and to_timestamp_ltz('{end_time}') AND start_time between to_timestamp_ltz('{start_time}') and to_timestamp_ltz('{end_time}')
@ -39,7 +39,7 @@ SNOWFLAKE_SESSION_TAG_QUERY = 'ALTER SESSION SET QUERY_TAG="{query_tag}"'
SNOWFLAKE_FETCH_ALL_TAGS = textwrap.dedent( SNOWFLAKE_FETCH_ALL_TAGS = textwrap.dedent(
""" """
select TAG_NAME, TAG_VALUE, OBJECT_DATABASE, OBJECT_SCHEMA, OBJECT_NAME, COLUMN_NAME select TAG_NAME, TAG_VALUE, OBJECT_DATABASE, OBJECT_SCHEMA, OBJECT_NAME, COLUMN_NAME
from snowflake.account_usage.tag_references from {account_usage}.tag_references
where OBJECT_DATABASE = '{database_name}' where OBJECT_DATABASE = '{database_name}'
and OBJECT_SCHEMA = '{schema_name}' and OBJECT_SCHEMA = '{schema_name}'
""" """
@ -234,11 +234,11 @@ SHOW EXTERNAL TABLES IN DATABASE "{database_name}"
""" """
SNOWFLAKE_TEST_FETCH_TAG = """ SNOWFLAKE_TEST_FETCH_TAG = """
select TAG_NAME from snowflake.account_usage.tag_references limit 1 select TAG_NAME from {account_usage}.tag_references limit 1
""" """
SNOWFLAKE_TEST_GET_QUERIES = """ SNOWFLAKE_TEST_GET_QUERIES = """
SELECT query_text from snowflake.account_usage.query_history limit 1 SELECT query_text from {account_usage}.query_history limit 1
""" """
SNOWFLAKE_TEST_GET_TABLES = """ SNOWFLAKE_TEST_GET_TABLES = """
@ -296,10 +296,10 @@ SELECT
ARGUMENT_SIGNATURE AS signature, ARGUMENT_SIGNATURE AS signature,
COMMENT as comment, COMMENT as comment,
'StoredProcedure' as procedure_type 'StoredProcedure' as procedure_type
FROM SNOWFLAKE.ACCOUNT_USAGE.PROCEDURES FROM {account_usage}.PROCEDURES
WHERE PROCEDURE_CATALOG = '{database_name}' WHERE PROCEDURE_CATALOG = '{database_name}'
AND PROCEDURE_SCHEMA = '{schema_name}' AND PROCEDURE_SCHEMA = '{schema_name}'
AND DELETED IS NOT NULL AND DELETED IS NULL
""" """
) )
@ -313,10 +313,10 @@ SELECT
ARGUMENT_SIGNATURE AS signature, ARGUMENT_SIGNATURE AS signature,
COMMENT as comment, COMMENT as comment,
'UDF' as procedure_type 'UDF' as procedure_type
FROM SNOWFLAKE.ACCOUNT_USAGE.FUNCTIONS FROM {account_usage}.FUNCTIONS
WHERE FUNCTION_CATALOG = '{database_name}' WHERE FUNCTION_CATALOG = '{database_name}'
AND FUNCTION_SCHEMA = '{schema_name}' AND FUNCTION_SCHEMA = '{schema_name}'
AND DELETED IS NOT NULL AND DELETED IS NULL
""" """
) )
@ -336,7 +336,7 @@ WITH SP_HISTORY AS (
SESSION_ID, SESSION_ID,
START_TIME, START_TIME,
END_TIME END_TIME
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY SP FROM {account_usage}.QUERY_HISTORY SP
WHERE QUERY_TYPE = 'CALL' WHERE QUERY_TYPE = 'CALL'
AND START_TIME >= '{start_date}' AND START_TIME >= '{start_date}'
AND QUERY_TEXT <> '' AND QUERY_TEXT <> ''
@ -353,7 +353,7 @@ Q_HISTORY AS (
USER_NAME, USER_NAME,
SCHEMA_NAME, SCHEMA_NAME,
DATABASE_NAME DATABASE_NAME
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY SP FROM {account_usage}.QUERY_HISTORY SP
WHERE QUERY_TYPE <> 'CALL' WHERE QUERY_TYPE <> 'CALL'
AND QUERY_TEXT NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%' AND QUERY_TEXT NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%'
AND QUERY_TEXT NOT LIKE '/* {{"app": "dbt", %%}} */%%' AND QUERY_TEXT NOT LIKE '/* {{"app": "dbt", %%}} */%%'

View File

@ -60,6 +60,7 @@ class SnowflakeQueryParserSource(QueryParserSource, ABC):
end_time=end_time, end_time=end_time,
result_limit=self.config.sourceConfig.config.resultLimit, result_limit=self.config.sourceConfig.config.resultLimit,
filters=self.get_filters(), filters=self.get_filters(),
account_usage=self.service_connection.accountUsageSchema,
) )
def check_life_cycle_query( def check_life_cycle_query(

View File

@ -114,6 +114,9 @@ You can find more information about the `account_usage` schema [here](https://do
- **Include Temporary and Transient Tables**: - **Include Temporary and Transient Tables**:
Optional configuration for ingestion of `TRANSIENT` and `TEMPORARY` tables, By default, it will skip the `TRANSIENT` and `TEMPORARY` tables. Optional configuration for ingestion of `TRANSIENT` and `TEMPORARY` tables, By default, it will skip the `TRANSIENT` and `TEMPORARY` tables.
- **Client Session Keep Alive**: Optional Configuration to keep the session active in case the ingestion job runs for longer duration. - **Client Session Keep Alive**: Optional Configuration to keep the session active in case the ingestion job runs for longer duration.
- **Account Usage Schema Name**: Full name of account usage schema, used in case your used do not have direct access to `SNOWFLAKE.ACCOUNT_USAGE` schema. In such case you can replicate tables `QUERY_HISTORY`, `TAG_REFERENCES`, `PROCEDURES`, `FUNCTIONS` to a custom schema let's say `CUSTOM_DB.CUSTOM_SCHEMA` and provide the same name in this field.
When using this field make sure you have all these tables available within your custom schema `QUERY_HISTORY`, `TAG_REFERENCES`, `PROCEDURES`, `FUNCTIONS`.
{% partial file="/v1.6/connectors/database/advanced-configuration.md" /%} {% partial file="/v1.6/connectors/database/advanced-configuration.md" /%}

View File

@ -150,6 +150,14 @@ This is a sample config for Snowflake:
{% /codeInfo %} {% /codeInfo %}
{% codeInfo srNumber=40 %}
**accountUsageSchema**: Full name of account usage schema, used in case your used do not have direct access to `SNOWFLAKE.ACCOUNT_USAGE` schema. In such case you can replicate tables `QUERY_HISTORY`, `TAG_REFERENCES`, `PROCEDURES`, `FUNCTIONS` to a custom schema let's say `CUSTOM_DB.CUSTOM_SCHEMA` and provide the same name in this field.
When using this field make sure you have all these tables available within your custom schema `QUERY_HISTORY`, `TAG_REFERENCES`, `PROCEDURES`, `FUNCTIONS`.
{% /codeInfo %}
{% codeInfo srNumber=6 %} {% codeInfo srNumber=6 %}
**includeTransientTables**: Optional configuration for ingestion of TRANSIENT and TEMPORARY tables, By default, it will skip the TRANSIENT and TEMPORARY tables. **includeTransientTables**: Optional configuration for ingestion of TRANSIENT and TEMPORARY tables, By default, it will skip the TRANSIENT and TEMPORARY tables.
@ -231,6 +239,9 @@ source:
```yaml {% srNumber=5 %} ```yaml {% srNumber=5 %}
# database: <database> # database: <database>
``` ```
```yaml {% srNumber=40 %}
# accountUsageSchema: SNOWFLAKE.ACCOUNT_USAGE
```
```yaml {% srNumber=6 %} ```yaml {% srNumber=6 %}
includeTransientTables: false includeTransientTables: false
``` ```

View File

@ -115,6 +115,9 @@ You can find more information about the `account_usage` schema [here](https://do
- **Include Temporary and Transient Tables**: - **Include Temporary and Transient Tables**:
Optional configuration for ingestion of `TRANSIENT` and `TEMPORARY` tables, By default, it will skip the `TRANSIENT` and `TEMPORARY` tables. Optional configuration for ingestion of `TRANSIENT` and `TEMPORARY` tables, By default, it will skip the `TRANSIENT` and `TEMPORARY` tables.
- **Client Session Keep Alive**: Optional Configuration to keep the session active in case the ingestion job runs for longer duration. - **Client Session Keep Alive**: Optional Configuration to keep the session active in case the ingestion job runs for longer duration.
- **Account Usage Schema Name**: Full name of account usage schema, used in case your used do not have direct access to `SNOWFLAKE.ACCOUNT_USAGE` schema. In such case you can replicate tables `QUERY_HISTORY`, `TAG_REFERENCES`, `PROCEDURES`, `FUNCTIONS` to a custom schema let's say `CUSTOM_DB.CUSTOM_SCHEMA` and provide the same name in this field.
When using this field make sure you have all these tables available within your custom schema `QUERY_HISTORY`, `TAG_REFERENCES`, `PROCEDURES`, `FUNCTIONS`.
{% partial file="/v1.7/connectors/database/advanced-configuration.md" /%} {% partial file="/v1.7/connectors/database/advanced-configuration.md" /%}

View File

@ -150,6 +150,14 @@ This is a sample config for Snowflake:
{% /codeInfo %} {% /codeInfo %}
{% codeInfo srNumber=40 %}
**accountUsageSchema**: Full name of account usage schema, used in case your used do not have direct access to `SNOWFLAKE.ACCOUNT_USAGE` schema. In such case you can replicate tables `QUERY_HISTORY`, `TAG_REFERENCES`, `PROCEDURES`, `FUNCTIONS` to a custom schema let's say `CUSTOM_DB.CUSTOM_SCHEMA` and provide the same name in this field.
When using this field make sure you have all these tables available within your custom schema `QUERY_HISTORY`, `TAG_REFERENCES`, `PROCEDURES`, `FUNCTIONS`.
{% /codeInfo %}
{% codeInfo srNumber=6 %} {% codeInfo srNumber=6 %}
**includeTransientTables**: Optional configuration for ingestion of TRANSIENT and TEMPORARY tables, By default, it will skip the TRANSIENT and TEMPORARY tables. **includeTransientTables**: Optional configuration for ingestion of TRANSIENT and TEMPORARY tables, By default, it will skip the TRANSIENT and TEMPORARY tables.
@ -231,6 +239,9 @@ source:
```yaml {% srNumber=5 %} ```yaml {% srNumber=5 %}
# database: <database> # database: <database>
``` ```
```yaml {% srNumber=40 %}
# accountUsageSchema: SNOWFLAKE.ACCOUNT_USAGE
```
```yaml {% srNumber=6 %} ```yaml {% srNumber=6 %}
includeTransientTables: false includeTransientTables: false
``` ```

View File

@ -68,6 +68,12 @@
"description": "Session query tag used to monitor usage on snowflake. To use a query tag snowflake user should have enough privileges to alter the session.", "description": "Session query tag used to monitor usage on snowflake. To use a query tag snowflake user should have enough privileges to alter the session.",
"type": "string" "type": "string"
}, },
"accountUsageSchema":{
"title": "Account Usage Schema Name",
"description": "Full name of the schema where the account usage data is stored.",
"type": "string",
"default": "SNOWFLAKE.ACCOUNT_USAGE"
},
"privateKey": { "privateKey": {
"title": "Private Key", "title": "Private Key",
"description": "Connection to Snowflake instance via Private Key", "description": "Connection to Snowflake instance via Private Key",

View File

@ -19,6 +19,10 @@ export interface SnowflakeConnection {
* account is xyz1234.us-east-1.gcp * account is xyz1234.us-east-1.gcp
*/ */
account: string; account: string;
/**
* Full name of the schema where the account usage data is stored.
*/
accountUsageSchema?: string;
/** /**
* Optional configuration for ingestion to keep the client session active in case the * Optional configuration for ingestion to keep the client session active in case the
* ingestion process runs for longer durations. * ingestion process runs for longer durations.