diff --git a/ingestion/src/metadata/profiler/metrics/system/system.py b/ingestion/src/metadata/profiler/metrics/system/system.py index ac3f8bf5199..b5b83b9a5ed 100644 --- a/ingestion/src/metadata/profiler/metrics/system/system.py +++ b/ingestion/src/metadata/profiler/metrics/system/system.py @@ -86,6 +86,11 @@ def _( List[Dict]: """ logger.info(f"Fetching system metrics for {dialect}") + dml_stat_to_dml_statement_mapping = { + "inserted_row_count": "INSERT", + "deleted_row_count": "DELETE", + "updated_row_count": "UPDATE", + } region = ( f"region-{conn_config.usageLocation}" @@ -103,8 +108,8 @@ def _( FROM `{region}`.INFORMATION_SCHEMA.JOBS WHERE - DATE(creation_time) = CURRENT_DATE() - 1 AND - statement_type IN ('INSERT', 'UPDATE', 'INSERT') + DATE(creation_time) >= CURRENT_DATE() - 1 AND + statement_type IN ('INSERT', 'UPDATE', 'INSERT', 'MERGE') ORDER BY creation_time DESC; """ ) @@ -140,6 +145,21 @@ def _( rows_affected = row_jobs.dml_statistics.get("deleted_row_count") if row_jobs.query_type == "UPDATE": rows_affected = row_jobs.dml_statistics.get("updated_row_count") + if row_jobs.query_type == "MERGE": + for i, key in enumerate(row_jobs.dml_statistics): + if row_jobs.dml_statistics[key] != 0: + metric_results.append( + { + # Merge statement can include multiple DML operations + # We are padding timestamps by 0,1,2 millisesond to avoid + # duplicate timestamps + "timestamp": int(row_jobs.timestamp.timestamp() * 1000) + + i, + "operation": dml_stat_to_dml_statement_mapping.get(key), + "rowsAffected": row_jobs.dml_statistics[key], + } + ) + continue metric_results.append( { diff --git a/ingestion/src/metadata/profiler/orm/functions/median.py b/ingestion/src/metadata/profiler/orm/functions/median.py index d46f8d1ca9b..2eeadfd37fa 100644 --- a/ingestion/src/metadata/profiler/orm/functions/median.py +++ b/ingestion/src/metadata/profiler/orm/functions/median.py @@ -40,7 +40,7 @@ def _(elements, compiler, **kwargs): col, _, percentile = [ compiler.process(element, **kwargs) for element in elements.clauses ] - return "percentile_cont(%s , %.1f) OVER()" % (col, percentile) + return "percentile_cont(%s , %s) OVER()" % (col, percentile) @compiles(MedianFn, Dialects.ClickHouse) @@ -48,7 +48,7 @@ def _(elements, compiler, **kwargs): col, _, percentile = [ compiler.process(element, **kwargs) for element in elements.clauses ] - return "quantile(%.1f)(%s)" % (percentile, col) + return "quantile(%s)(%s)" % (percentile, col) # pylint: disable=unused-argument @@ -79,7 +79,7 @@ def _(elements, compiler, **kwargs): col, _, percentile = [ compiler.process(element, **kwargs) for element in elements.clauses ] - return "percentile(cast(%s as BIGINT), %.1f)" % (col, percentile) + return "percentile(cast(%s as BIGINT), %s)" % (col, percentile) @compiles(MedianFn, Dialects.MySQL)