diff --git a/ingestion/src/metadata/profiler/metrics/system/queries/snowflake.py b/ingestion/src/metadata/profiler/metrics/system/queries/snowflake.py index 66c7096f171..90e2d46fb65 100644 --- a/ingestion/src/metadata/profiler/metrics/system/queries/snowflake.py +++ b/ingestion/src/metadata/profiler/metrics/system/queries/snowflake.py @@ -20,12 +20,23 @@ from typing import Optional from sqlalchemy.engine.row import Row from metadata.utils.logger import profiler_logger -from metadata.utils.profiler_utils import QueryResult, get_identifiers_from_string +from metadata.utils.profiler_utils import ( + SnowflakeQueryResult, + get_identifiers_from_string, +) logger = profiler_logger() INFORMATION_SCHEMA_QUERY = """ - SELECT * FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY" + SELECT + QUERY_ID, + QUERY_TEXT, + QUERY_TYPE, + START_TIME, + ROWS_INSERTED, + ROWS_UPDATED, + ROWS_DELETED + FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY" WHERE start_time>= DATEADD('DAY', -1, CURRENT_TIMESTAMP) AND QUERY_TEXT ILIKE '%{tablename}%' @@ -46,7 +57,7 @@ RESULT_SCAN = """ def get_snowflake_system_queries( row: Row, database: str, schema: str -) -> Optional[QueryResult]: +) -> Optional[SnowflakeQueryResult]: """get snowflake system queries for a specific database and schema. Parsing the query is the only reliable way to get the DDL operation as fields in the table are not. If parsing fails we'll fall back to regex lookup @@ -87,7 +98,7 @@ def get_snowflake_system_queries( database.lower() == database_name.lower() and schema.lower() == schema_name.lower() ): - return QueryResult( + return SnowflakeQueryResult( query_id=dict_row.get("QUERY_ID", dict_row.get("query_id")), database_name=database_name.lower(), schema_name=schema_name.lower(), @@ -95,6 +106,11 @@ def get_snowflake_system_queries( query_text=query_text, query_type=dict_row.get("QUERY_TYPE", dict_row.get("query_type")), timestamp=dict_row.get("START_TIME", dict_row.get("start_time")), + rows_inserted=dict_row.get( + "ROWS_INSERTED", dict_row.get("rows_inserted") + ), + rows_updated=dict_row.get("ROWS_UPDATED", dict_row.get("rows_updated")), + rows_deleted=dict_row.get("ROWS_DELETED", dict_row.get("rows_deleted")), ) except Exception: logger.debug(traceback.format_exc()) diff --git a/ingestion/src/metadata/profiler/metrics/system/system.py b/ingestion/src/metadata/profiler/metrics/system/system.py index edbda7b42af..1d9826c9222 100644 --- a/ingestion/src/metadata/profiler/metrics/system/system.py +++ b/ingestion/src/metadata/profiler/metrics/system/system.py @@ -15,7 +15,6 @@ System Metric import traceback from collections import defaultdict -from textwrap import dedent from typing import Dict, List, Optional from sqlalchemy import text @@ -41,7 +40,6 @@ from metadata.profiler.metrics.system.queries.redshift import ( ) from metadata.profiler.metrics.system.queries.snowflake import ( INFORMATION_SCHEMA_QUERY, - RESULT_SCAN, get_snowflake_system_queries, ) from metadata.profiler.orm.registry import Dialects @@ -335,16 +333,37 @@ def _( query_results.append(result) for query_result in query_results: - cursor_for_result_scan = session.execute( - text(dedent(RESULT_SCAN.format(query_id=query_result.query_id))) - ) - row_for_result_scan = cursor_for_result_scan.first() + rows_affected = None + if query_result.query_type == DatabaseDMLOperations.INSERT.value: + rows_affected = query_result.rows_inserted + if query_result.query_type == DatabaseDMLOperations.DELETE.value: + rows_affected = query_result.rows_deleted + if query_result.query_type == DatabaseDMLOperations.UPDATE.value: + rows_affected = query_result.rows_updated + if query_result.query_type == DatabaseDMLOperations.MERGE.value: + if query_result.rows_inserted: + metric_results.append( + { + "timestamp": int(query_result.timestamp.timestamp() * 1000), + "operation": DatabaseDMLOperations.INSERT.value, + "rowsAffected": query_result.rows_inserted, + } + ) + if query_result.rows_updated: + metric_results.append( + { + "timestamp": int(query_result.timestamp.timestamp() * 1000), + "operation": DatabaseDMLOperations.UPDATE.value, + "rowsAffected": query_result.rows_updated, + } + ) + continue metric_results.append( { "timestamp": int(query_result.timestamp.timestamp() * 1000), "operation": DML_OPERATION_MAP.get(query_result.query_type), - "rowsAffected": row_for_result_scan[0] if row_for_result_scan else None, + "rowsAffected": rows_affected, } ) diff --git a/ingestion/src/metadata/utils/profiler_utils.py b/ingestion/src/metadata/utils/profiler_utils.py index 788c647278e..061960b5337 100644 --- a/ingestion/src/metadata/utils/profiler_utils.py +++ b/ingestion/src/metadata/utils/profiler_utils.py @@ -40,6 +40,14 @@ class QueryResult(BaseModel): rows: Optional[int] = None +class SnowflakeQueryResult(QueryResult): + """Snowflake system metric query result""" + + rows_inserted: Optional[int] = None + rows_updated: Optional[int] = None + rows_deleted: Optional[int] = None + + def clean_up_query(query: str) -> str: """remove comments and newlines from query""" return sqlparse.format(query, strip_comments=True).replace("\\n", "")