fix: remove RESULT_SCAN dep. (#13904)

Teddy, 2023-11-09 08:13:59 +01:00 (committed by GitHub)
parent ef2a27a217
commit 7f151ca5f7
3 changed files with 54 additions and 11 deletions

View File

@@ -20,12 +20,23 @@ from typing import Optional
 from sqlalchemy.engine.row import Row

 from metadata.utils.logger import profiler_logger
-from metadata.utils.profiler_utils import QueryResult, get_identifiers_from_string
+from metadata.utils.profiler_utils import (
+    SnowflakeQueryResult,
+    get_identifiers_from_string,
+)

 logger = profiler_logger()

 INFORMATION_SCHEMA_QUERY = """
-    SELECT * FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY"
+    SELECT
+        QUERY_ID,
+        QUERY_TEXT,
+        QUERY_TYPE,
+        START_TIME,
+        ROWS_INSERTED,
+        ROWS_UPDATED,
+        ROWS_DELETED
+    FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY"
     WHERE
     start_time>= DATEADD('DAY', -1, CURRENT_TIMESTAMP)
     AND QUERY_TEXT ILIKE '%{tablename}%'
@ -46,7 +57,7 @@ RESULT_SCAN = """
def get_snowflake_system_queries( def get_snowflake_system_queries(
row: Row, database: str, schema: str row: Row, database: str, schema: str
) -> Optional[QueryResult]: ) -> Optional[SnowflakeQueryResult]:
"""get snowflake system queries for a specific database and schema. Parsing the query """get snowflake system queries for a specific database and schema. Parsing the query
is the only reliable way to get the DDL operation as fields in the table are not. If parsing is the only reliable way to get the DDL operation as fields in the table are not. If parsing
fails we'll fall back to regex lookup fails we'll fall back to regex lookup
@ -87,7 +98,7 @@ def get_snowflake_system_queries(
database.lower() == database_name.lower() database.lower() == database_name.lower()
and schema.lower() == schema_name.lower() and schema.lower() == schema_name.lower()
): ):
return QueryResult( return SnowflakeQueryResult(
query_id=dict_row.get("QUERY_ID", dict_row.get("query_id")), query_id=dict_row.get("QUERY_ID", dict_row.get("query_id")),
database_name=database_name.lower(), database_name=database_name.lower(),
schema_name=schema_name.lower(), schema_name=schema_name.lower(),
@ -95,6 +106,11 @@ def get_snowflake_system_queries(
query_text=query_text, query_text=query_text,
query_type=dict_row.get("QUERY_TYPE", dict_row.get("query_type")), query_type=dict_row.get("QUERY_TYPE", dict_row.get("query_type")),
timestamp=dict_row.get("START_TIME", dict_row.get("start_time")), timestamp=dict_row.get("START_TIME", dict_row.get("start_time")),
rows_inserted=dict_row.get(
"ROWS_INSERTED", dict_row.get("rows_inserted")
),
rows_updated=dict_row.get("ROWS_UPDATED", dict_row.get("rows_updated")),
rows_deleted=dict_row.get("ROWS_DELETED", dict_row.get("rows_deleted")),
) )
except Exception: except Exception:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
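
For orientation, the reworked path can be exercised roughly as follows. This is a minimal sketch rather than code from the commit: it assumes a SQLAlchemy Session already bound to a Snowflake engine, and the table, database, and schema names ("CUSTOMERS", "RAW", "PUBLIC") are placeholders.

from sqlalchemy import text
from sqlalchemy.orm import Session

from metadata.profiler.metrics.system.queries.snowflake import (
    INFORMATION_SCHEMA_QUERY,
    get_snowflake_system_queries,
)


def collect_snowflake_dml(session: Session) -> None:
    """Fetch the last 24h of QUERY_HISTORY rows for a placeholder table and parse them."""
    rows = session.execute(
        text(INFORMATION_SCHEMA_QUERY.format(tablename="CUSTOMERS"))
    )
    for row in rows:
        result = get_snowflake_system_queries(row, database="RAW", schema="PUBLIC")
        if result:
            # The counters now come straight from QUERY_HISTORY, so no
            # follow-up RESULT_SCAN round trip is needed.
            print(
                result.query_type,
                result.rows_inserted,
                result.rows_updated,
                result.rows_deleted,
            )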

View File

@@ -15,7 +15,6 @@ System Metric

 import traceback
 from collections import defaultdict
-from textwrap import dedent
 from typing import Dict, List, Optional

 from sqlalchemy import text
@@ -41,7 +40,6 @@ from metadata.profiler.metrics.system.queries.redshift import (
 )
 from metadata.profiler.metrics.system.queries.snowflake import (
     INFORMATION_SCHEMA_QUERY,
-    RESULT_SCAN,
     get_snowflake_system_queries,
 )
 from metadata.profiler.orm.registry import Dialects
@@ -335,16 +333,37 @@ def _(
             query_results.append(result)

     for query_result in query_results:
-        cursor_for_result_scan = session.execute(
-            text(dedent(RESULT_SCAN.format(query_id=query_result.query_id)))
-        )
-        row_for_result_scan = cursor_for_result_scan.first()
+        rows_affected = None
+        if query_result.query_type == DatabaseDMLOperations.INSERT.value:
+            rows_affected = query_result.rows_inserted
+        if query_result.query_type == DatabaseDMLOperations.DELETE.value:
+            rows_affected = query_result.rows_deleted
+        if query_result.query_type == DatabaseDMLOperations.UPDATE.value:
+            rows_affected = query_result.rows_updated
+        if query_result.query_type == DatabaseDMLOperations.MERGE.value:
+            if query_result.rows_inserted:
+                metric_results.append(
+                    {
+                        "timestamp": int(query_result.timestamp.timestamp() * 1000),
+                        "operation": DatabaseDMLOperations.INSERT.value,
+                        "rowsAffected": query_result.rows_inserted,
+                    }
+                )
+            if query_result.rows_updated:
+                metric_results.append(
+                    {
+                        "timestamp": int(query_result.timestamp.timestamp() * 1000),
+                        "operation": DatabaseDMLOperations.UPDATE.value,
+                        "rowsAffected": query_result.rows_updated,
+                    }
+                )
+            continue

         metric_results.append(
             {
                 "timestamp": int(query_result.timestamp.timestamp() * 1000),
                 "operation": DML_OPERATION_MAP.get(query_result.query_type),
-                "rowsAffected": row_for_result_scan[0] if row_for_result_scan else None,
+                "rowsAffected": rows_affected,
             }
         )
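
The new branching above amounts to a mapping from a parsed QUERY_HISTORY row to profiler datapoints, with MERGE split into separate INSERT and UPDATE entries. The sketch below restates that mapping for illustration only: the Operation enum stands in for the project's DatabaseDMLOperations, rows_affected_datapoints is not a function from the commit, and the raw query type is used as the operation value instead of going through DML_OPERATION_MAP.

from enum import Enum
from typing import Dict, List

from metadata.utils.profiler_utils import SnowflakeQueryResult


class Operation(Enum):
    """Stand-in for DatabaseDMLOperations; values mirror Snowflake QUERY_TYPE."""

    INSERT = "INSERT"
    UPDATE = "UPDATE"
    DELETE = "DELETE"
    MERGE = "MERGE"


def rows_affected_datapoints(result: SnowflakeQueryResult) -> List[Dict]:
    """Turn one parsed QUERY_HISTORY row into system-metric datapoints."""
    ts = int(result.timestamp.timestamp() * 1000)
    if result.query_type == Operation.MERGE.value:
        # A MERGE can both insert and update, so it may yield two datapoints.
        points = []
        if result.rows_inserted:
            points.append(
                {"timestamp": ts, "operation": "INSERT", "rowsAffected": result.rows_inserted}
            )
        if result.rows_updated:
            points.append(
                {"timestamp": ts, "operation": "UPDATE", "rowsAffected": result.rows_updated}
            )
        return points
    rows_affected = {
        Operation.INSERT.value: result.rows_inserted,
        Operation.UPDATE.value: result.rows_updated,
        Operation.DELETE.value: result.rows_deleted,
    }.get(result.query_type)
    # The real code maps query_type through DML_OPERATION_MAP; the raw type is
    # kept here only to stay self-contained.
    return [{"timestamp": ts, "operation": result.query_type, "rowsAffected": rows_affected}]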

View File

@@ -40,6 +40,14 @@ class QueryResult(BaseModel):
     rows: Optional[int] = None


+class SnowflakeQueryResult(QueryResult):
+    """Snowflake system metric query result"""
+
+    rows_inserted: Optional[int] = None
+    rows_updated: Optional[int] = None
+    rows_deleted: Optional[int] = None
+
+
 def clean_up_query(query: str) -> str:
     """remove comments and newlines from query"""
     return sqlparse.format(query, strip_comments=True).replace("\\n", "")
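
QueryResult is a pydantic BaseModel, so the new subclass simply layers three optional counters on top of it; they stay None whenever QUERY_HISTORY does not report them. A self-contained toy version of the same pattern, illustrative only (BaseResult and SnowflakeResult are stand-ins, not the project's classes):

from typing import Optional

from pydantic import BaseModel


class BaseResult(BaseModel):
    """Stand-in for the shared QueryResult fields."""

    query_id: Optional[str] = None
    query_type: Optional[str] = None
    rows: Optional[int] = None


class SnowflakeResult(BaseResult):
    """Adds the Snowflake-specific DML counters, all optional."""

    rows_inserted: Optional[int] = None
    rows_updated: Optional[int] = None
    rows_deleted: Optional[int] = None


# Counters not supplied by the row simply default to None.
print(SnowflakeResult(query_id="abc123", query_type="MERGE", rows_inserted=10))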