diff --git a/ingestion/src/metadata/profiler/metrics/system/system.py b/ingestion/src/metadata/profiler/metrics/system/system.py index f3beef856df..e0425a7ac2f 100644 --- a/ingestion/src/metadata/profiler/metrics/system/system.py +++ b/ingestion/src/metadata/profiler/metrics/system/system.py @@ -379,87 +379,87 @@ def _( return metric_results -@get_system_metrics_for_dialect.register(Dialects.Snowflake) -def _( - dialect: str, - session: Session, - table: DeclarativeMeta, - *args, - **kwargs, -) -> Optional[List[Dict]]: - """Fetch system metrics for Snowflake. query_history will return maximum 10K rows in one request. - We'll be fetching all the queries ran for the past 24 hours and filtered on specific query types - (INSERTS, MERGE, DELETE, UPDATE). +# @get_system_metrics_for_dialect.register(Dialects.Snowflake) +# def _( +# dialect: str, +# session: Session, +# table: DeclarativeMeta, +# *args, +# **kwargs, +# ) -> Optional[List[Dict]]: +# """Fetch system metrics for Snowflake. query_history will return maximum 10K rows in one request. +# We'll be fetching all the queries ran for the past 24 hours and filtered on specific query types +# (INSERTS, MERGE, DELETE, UPDATE). - To get the number of rows affected we'll use the specific query ID. +# To get the number of rows affected we'll use the specific query ID. - Args: - dialect (str): dialect - session (Session): session object +# Args: +# dialect (str): dialect +# session (Session): session object - Returns: - Dict: system metric - """ - logger.debug(f"Fetching system metrics for {dialect}") - database = session.get_bind().url.database - schema = table.__table_args__["schema"] +# Returns: +# Dict: system metric +# """ +# logger.debug(f"Fetching system metrics for {dialect}") +# database = session.get_bind().url.database +# schema = table.__table_args__["schema"] - metric_results: List[Dict] = [] +# metric_results: List[Dict] = [] - information_schema_query_history = f""" - SELECT * FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY" - WHERE - start_time>= DATEADD('DAY', -1, CURRENT_TIMESTAMP) - AND QUERY_TYPE IN ( - '{DatabaseDMLOperations.INSERT.value}', - '{DatabaseDMLOperations.UPDATE.value}', - '{DatabaseDMLOperations.DELETE.value}', - '{DatabaseDMLOperations.MERGE.value}' - ) - AND EXECUTION_STATUS = 'SUCCESS'; - """ - result_scan = """ - SELECT * - FROM TABLE(RESULT_SCAN('{query_id}')); - """ +# information_schema_query_history = f""" +# SELECT * FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY" +# WHERE +# start_time>= DATEADD('DAY', -1, CURRENT_TIMESTAMP) +# AND QUERY_TYPE IN ( +# '{DatabaseDMLOperations.INSERT.value}', +# '{DatabaseDMLOperations.UPDATE.value}', +# '{DatabaseDMLOperations.DELETE.value}', +# '{DatabaseDMLOperations.MERGE.value}' +# ) +# AND EXECUTION_STATUS = 'SUCCESS'; +# """ +# result_scan = """ +# SELECT * +# FROM TABLE(RESULT_SCAN('{query_id}')); +# """ - if ( - "query_results" - in SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema] - ): - # we'll try to get the cached data first - query_results = SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema][ - "query_results" - ] - else: - rows = session.execute(text(information_schema_query_history)) - query_results = [] - for row in rows: - result = get_snowflake_system_queries(row, database, schema) - if result: - query_results.append(result) - SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema][ - "query_results" - ] = query_results +# if ( +# "query_results" +# in SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema] +# ): +# # we'll try to get the cached data first +# query_results = SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema][ +# "query_results" +# ] +# else: +# rows = session.execute(text(information_schema_query_history)) +# query_results = [] +# for row in rows: +# result = get_snowflake_system_queries(row, database, schema) +# if result: +# query_results.append(result) +# SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema][ +# "query_results" +# ] = query_results - for query_result in query_results: - if table.__tablename__.lower() == query_result.table_name: - cursor_for_result_scan = session.execute( - text(dedent(result_scan.format(query_id=query_result.query_id))) - ) - row_for_result_scan = cursor_for_result_scan.first() +# for query_result in query_results: +# if table.__tablename__.lower() == query_result.table_name: +# cursor_for_result_scan = session.execute( +# text(dedent(result_scan.format(query_id=query_result.query_id))) +# ) +# row_for_result_scan = cursor_for_result_scan.first() - metric_results.append( - { - "timestamp": int(query_result.timestamp.timestamp() * 1000), - "operation": DML_OPERATION_MAP.get(query_result.query_type), - "rowsAffected": row_for_result_scan[0] - if row_for_result_scan - else None, - } - ) +# metric_results.append( +# { +# "timestamp": int(query_result.timestamp.timestamp() * 1000), +# "operation": DML_OPERATION_MAP.get(query_result.query_type), +# "rowsAffected": row_for_result_scan[0] +# if row_for_result_scan +# else None, +# } +# ) - return metric_results +# return metric_results class System(SystemMetric):