feat: Added logic to handle MERGE statement for bigquery (#10522)

This commit is contained in:
Teddy 2023-03-13 11:34:40 +01:00 committed by GitHub
parent db292eaa0b
commit d03b06daf6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 25 additions and 5 deletions

View File

@ -86,6 +86,11 @@ def _(
List[Dict]:
"""
logger.info(f"Fetching system metrics for {dialect}")
dml_stat_to_dml_statement_mapping = {
"inserted_row_count": "INSERT",
"deleted_row_count": "DELETE",
"updated_row_count": "UPDATE",
}
region = (
f"region-{conn_config.usageLocation}"
@ -103,8 +108,8 @@ def _(
FROM
`{region}`.INFORMATION_SCHEMA.JOBS
WHERE
DATE(creation_time) = CURRENT_DATE() - 1 AND
statement_type IN ('INSERT', 'UPDATE', 'INSERT')
DATE(creation_time) >= CURRENT_DATE() - 1 AND
statement_type IN ('INSERT', 'UPDATE', 'INSERT', 'MERGE')
ORDER BY creation_time DESC;
"""
)
@ -140,6 +145,21 @@ def _(
rows_affected = row_jobs.dml_statistics.get("deleted_row_count")
if row_jobs.query_type == "UPDATE":
rows_affected = row_jobs.dml_statistics.get("updated_row_count")
if row_jobs.query_type == "MERGE":
for i, key in enumerate(row_jobs.dml_statistics):
if row_jobs.dml_statistics[key] != 0:
metric_results.append(
{
# Merge statement can include multiple DML operations
# We are padding timestamps by 0,1,2 millisesond to avoid
# duplicate timestamps
"timestamp": int(row_jobs.timestamp.timestamp() * 1000)
+ i,
"operation": dml_stat_to_dml_statement_mapping.get(key),
"rowsAffected": row_jobs.dml_statistics[key],
}
)
continue
metric_results.append(
{

View File

@ -40,7 +40,7 @@ def _(elements, compiler, **kwargs):
col, _, percentile = [
compiler.process(element, **kwargs) for element in elements.clauses
]
return "percentile_cont(%s , %.1f) OVER()" % (col, percentile)
return "percentile_cont(%s , %s) OVER()" % (col, percentile)
@compiles(MedianFn, Dialects.ClickHouse)
@ -48,7 +48,7 @@ def _(elements, compiler, **kwargs):
col, _, percentile = [
compiler.process(element, **kwargs) for element in elements.clauses
]
return "quantile(%.1f)(%s)" % (percentile, col)
return "quantile(%s)(%s)" % (percentile, col)
# pylint: disable=unused-argument
@ -79,7 +79,7 @@ def _(elements, compiler, **kwargs):
col, _, percentile = [
compiler.process(element, **kwargs) for element in elements.clauses
]
return "percentile(cast(%s as BIGINT), %.1f)" % (col, percentile)
return "percentile(cast(%s as BIGINT), %s)" % (col, percentile)
@compiles(MedianFn, Dialects.MySQL)