From a3238edb667abc3e7437bf8aff6e98e945152ec0 Mon Sep 17 00:00:00 2001 From: IceS2 Date: Thu, 13 Nov 2025 11:18:15 +0100 Subject: [PATCH] Fix Validators (#24310) --- .../pandas/columnValueLengthsToBeBetween.py | 2 +- .../column/pandas/columnValuesToBeUnique.py | 56 +++++++++++++++++-- 2 files changed, 51 insertions(+), 7 deletions(-) diff --git a/ingestion/src/metadata/data_quality/validations/column/pandas/columnValueLengthsToBeBetween.py b/ingestion/src/metadata/data_quality/validations/column/pandas/columnValueLengthsToBeBetween.py index 8c1662e747c..53305588ff3 100644 --- a/ingestion/src/metadata/data_quality/validations/column/pandas/columnValueLengthsToBeBetween.py +++ b/ingestion/src/metadata/data_quality/validations/column/pandas/columnValueLengthsToBeBetween.py @@ -110,7 +110,7 @@ class ColumnValueLengthsToBeBetweenValidator( dimension_aggregates[dimension_value][ Metrics.MIN_LENGTH.name - ] = max_impl.update_accumulator( + ] = min_impl.update_accumulator( dimension_aggregates[dimension_value][Metrics.MIN_LENGTH.name], group_df, ) diff --git a/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesToBeUnique.py b/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesToBeUnique.py index 4400feb2e2a..850a1928813 100644 --- a/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesToBeUnique.py +++ b/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesToBeUnique.py @@ -14,7 +14,7 @@ Validator for column values to be unique test case """ import logging -from collections import defaultdict +from collections import Counter, defaultdict from typing import List, Optional, cast import pandas as pd @@ -33,7 +33,7 @@ from metadata.data_quality.validations.impact_score import ( ) from metadata.data_quality.validations.mixins.pandas_validator_mixin import ( PandasValidatorMixin, - aggregate_others_pandas, + aggregate_others_statistical_pandas, ) from metadata.generated.schema.tests.dimensionResult import DimensionResult from metadata.profiler.metrics.registry import Metrics @@ -41,6 +41,8 @@ from metadata.utils.sqa_like_column import SQALikeColumn logger = logging.getLogger(__name__) +COUNTER_ACCUMULATOR_KEY = "counter_accumulator" + class ColumnValuesToBeUniqueValidator( BaseColumnValuesToBeUniqueValidator, PandasValidatorMixin @@ -128,18 +130,20 @@ class ColumnValuesToBeUniqueValidator( for dimension_value, agg in dimension_aggregates.items(): total_count = agg[Metrics.COUNT.name] total_rows = agg[DIMENSION_TOTAL_COUNT_KEY] + counter_accumulator = agg[Metrics.UNIQUE_COUNT.name] unique_count = unique_count_impl.aggregate_accumulator( - agg[Metrics.UNIQUE_COUNT.name] + counter_accumulator ) - duplicate_count = total_count - unique_count + failed_count = total_count - unique_count results_data.append( { DIMENSION_VALUE_KEY: dimension_value, + COUNTER_ACCUMULATOR_KEY: counter_accumulator, Metrics.COUNT.name: total_count, Metrics.UNIQUE_COUNT.name: unique_count, DIMENSION_TOTAL_COUNT_KEY: total_rows, - DIMENSION_FAILED_COUNT_KEY: duplicate_count, + DIMENSION_FAILED_COUNT_KEY: failed_count, } ) @@ -152,9 +156,49 @@ class ColumnValuesToBeUniqueValidator( total_column=DIMENSION_TOTAL_COUNT_KEY, ) - results_df = aggregate_others_pandas( + def calculate_unique_count_from_counter( + df_aggregated, others_mask, metric_column + ): + result = df_aggregated[metric_column].copy() + if others_mask.any(): + merged_counter = df_aggregated.loc[ + others_mask, COUNTER_ACCUMULATOR_KEY + ].iloc[0] + unique_count = sum(1 for v in merged_counter.values() if v == 1) + result.loc[others_mask] = unique_count + return result + + def calculate_failed_count_from_metrics( + df_aggregated, others_mask, metric_column + ): + result = df_aggregated[metric_column].copy() + if others_mask.any(): + count = df_aggregated.loc[others_mask, Metrics.COUNT.name].iloc[ + 0 + ] + unique_count = df_aggregated.loc[ + others_mask, Metrics.UNIQUE_COUNT.name + ].iloc[0] + failed_count = count - unique_count + result.loc[others_mask] = failed_count + return result + + results_df = aggregate_others_statistical_pandas( results_df, dimension_column=DIMENSION_VALUE_KEY, + agg_functions={ + COUNTER_ACCUMULATOR_KEY: lambda counters: sum( + counters, Counter() + ), + Metrics.COUNT.name: "sum", + DIMENSION_TOTAL_COUNT_KEY: "sum", + DIMENSION_FAILED_COUNT_KEY: "sum", + }, + final_metric_calculators={ + Metrics.UNIQUE_COUNT.name: calculate_unique_count_from_counter, + DIMENSION_FAILED_COUNT_KEY: calculate_failed_count_from_metrics, + }, + exclude_from_final=[COUNTER_ACCUMULATOR_KEY], top_n=DEFAULT_TOP_DIMENSIONS, )