fix: disable sys. metrics for snowflake (#11900)

This commit is contained in:
Teddy 2023-06-06 22:58:12 +02:00 committed by GitHub
parent 57e44c22be
commit b0bfbad9da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -379,87 +379,87 @@ def _(
return metric_results
@get_system_metrics_for_dialect.register(Dialects.Snowflake)
def _(
dialect: str,
session: Session,
table: DeclarativeMeta,
*args,
**kwargs,
) -> Optional[List[Dict]]:
"""Fetch system metrics for Snowflake. query_history will return maximum 10K rows in one request.
We'll be fetching all the queries ran for the past 24 hours and filtered on specific query types
(INSERTS, MERGE, DELETE, UPDATE).
# @get_system_metrics_for_dialect.register(Dialects.Snowflake)
# def _(
# dialect: str,
# session: Session,
# table: DeclarativeMeta,
# *args,
# **kwargs,
# ) -> Optional[List[Dict]]:
# """Fetch system metrics for Snowflake. query_history will return maximum 10K rows in one request.
# We'll be fetching all the queries ran for the past 24 hours and filtered on specific query types
# (INSERTS, MERGE, DELETE, UPDATE).
To get the number of rows affected we'll use the specific query ID.
# To get the number of rows affected we'll use the specific query ID.
Args:
dialect (str): dialect
session (Session): session object
# Args:
# dialect (str): dialect
# session (Session): session object
Returns:
Dict: system metric
"""
logger.debug(f"Fetching system metrics for {dialect}")
database = session.get_bind().url.database
schema = table.__table_args__["schema"]
# Returns:
# Dict: system metric
# """
# logger.debug(f"Fetching system metrics for {dialect}")
# database = session.get_bind().url.database
# schema = table.__table_args__["schema"]
metric_results: List[Dict] = []
# metric_results: List[Dict] = []
information_schema_query_history = f"""
SELECT * FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY"
WHERE
start_time>= DATEADD('DAY', -1, CURRENT_TIMESTAMP)
AND QUERY_TYPE IN (
'{DatabaseDMLOperations.INSERT.value}',
'{DatabaseDMLOperations.UPDATE.value}',
'{DatabaseDMLOperations.DELETE.value}',
'{DatabaseDMLOperations.MERGE.value}'
)
AND EXECUTION_STATUS = 'SUCCESS';
"""
result_scan = """
SELECT *
FROM TABLE(RESULT_SCAN('{query_id}'));
"""
# information_schema_query_history = f"""
# SELECT * FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY"
# WHERE
# start_time>= DATEADD('DAY', -1, CURRENT_TIMESTAMP)
# AND QUERY_TYPE IN (
# '{DatabaseDMLOperations.INSERT.value}',
# '{DatabaseDMLOperations.UPDATE.value}',
# '{DatabaseDMLOperations.DELETE.value}',
# '{DatabaseDMLOperations.MERGE.value}'
# )
# AND EXECUTION_STATUS = 'SUCCESS';
# """
# result_scan = """
# SELECT *
# FROM TABLE(RESULT_SCAN('{query_id}'));
# """
if (
"query_results"
in SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema]
):
# we'll try to get the cached data first
query_results = SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema][
"query_results"
]
else:
rows = session.execute(text(information_schema_query_history))
query_results = []
for row in rows:
result = get_snowflake_system_queries(row, database, schema)
if result:
query_results.append(result)
SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema][
"query_results"
] = query_results
# if (
# "query_results"
# in SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema]
# ):
# # we'll try to get the cached data first
# query_results = SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema][
# "query_results"
# ]
# else:
# rows = session.execute(text(information_schema_query_history))
# query_results = []
# for row in rows:
# result = get_snowflake_system_queries(row, database, schema)
# if result:
# query_results.append(result)
# SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema][
# "query_results"
# ] = query_results
for query_result in query_results:
if table.__tablename__.lower() == query_result.table_name:
cursor_for_result_scan = session.execute(
text(dedent(result_scan.format(query_id=query_result.query_id)))
)
row_for_result_scan = cursor_for_result_scan.first()
# for query_result in query_results:
# if table.__tablename__.lower() == query_result.table_name:
# cursor_for_result_scan = session.execute(
# text(dedent(result_scan.format(query_id=query_result.query_id)))
# )
# row_for_result_scan = cursor_for_result_scan.first()
metric_results.append(
{
"timestamp": int(query_result.timestamp.timestamp() * 1000),
"operation": DML_OPERATION_MAP.get(query_result.query_type),
"rowsAffected": row_for_result_scan[0]
if row_for_result_scan
else None,
}
)
# metric_results.append(
# {
# "timestamp": int(query_result.timestamp.timestamp() * 1000),
# "operation": DML_OPERATION_MAP.get(query_result.query_type),
# "rowsAffected": row_for_result_scan[0]
# if row_for_result_scan
# else None,
# }
# )
return metric_results
# return metric_results
class System(SystemMetric):