Fix Validators (#24310)

This commit is contained in:
IceS2 2025-11-13 11:18:15 +01:00 committed by GitHub
parent 274db6dd7c
commit a3238edb66
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 51 additions and 7 deletions

View File

@ -110,7 +110,7 @@ class ColumnValueLengthsToBeBetweenValidator(
dimension_aggregates[dimension_value][
Metrics.MIN_LENGTH.name
] = max_impl.update_accumulator(
] = min_impl.update_accumulator(
dimension_aggregates[dimension_value][Metrics.MIN_LENGTH.name],
group_df,
)

View File

@ -14,7 +14,7 @@ Validator for column values to be unique test case
"""
import logging
from collections import defaultdict
from collections import Counter, defaultdict
from typing import List, Optional, cast
import pandas as pd
@ -33,7 +33,7 @@ from metadata.data_quality.validations.impact_score import (
)
from metadata.data_quality.validations.mixins.pandas_validator_mixin import (
PandasValidatorMixin,
aggregate_others_pandas,
aggregate_others_statistical_pandas,
)
from metadata.generated.schema.tests.dimensionResult import DimensionResult
from metadata.profiler.metrics.registry import Metrics
@ -41,6 +41,8 @@ from metadata.utils.sqa_like_column import SQALikeColumn
logger = logging.getLogger(__name__)
COUNTER_ACCUMULATOR_KEY = "counter_accumulator"
class ColumnValuesToBeUniqueValidator(
BaseColumnValuesToBeUniqueValidator, PandasValidatorMixin
@ -128,18 +130,20 @@ class ColumnValuesToBeUniqueValidator(
for dimension_value, agg in dimension_aggregates.items():
total_count = agg[Metrics.COUNT.name]
total_rows = agg[DIMENSION_TOTAL_COUNT_KEY]
counter_accumulator = agg[Metrics.UNIQUE_COUNT.name]
unique_count = unique_count_impl.aggregate_accumulator(
agg[Metrics.UNIQUE_COUNT.name]
counter_accumulator
)
duplicate_count = total_count - unique_count
failed_count = total_count - unique_count
results_data.append(
{
DIMENSION_VALUE_KEY: dimension_value,
COUNTER_ACCUMULATOR_KEY: counter_accumulator,
Metrics.COUNT.name: total_count,
Metrics.UNIQUE_COUNT.name: unique_count,
DIMENSION_TOTAL_COUNT_KEY: total_rows,
DIMENSION_FAILED_COUNT_KEY: duplicate_count,
DIMENSION_FAILED_COUNT_KEY: failed_count,
}
)
@ -152,9 +156,49 @@ class ColumnValuesToBeUniqueValidator(
total_column=DIMENSION_TOTAL_COUNT_KEY,
)
results_df = aggregate_others_pandas(
def calculate_unique_count_from_counter(
df_aggregated, others_mask, metric_column
):
result = df_aggregated[metric_column].copy()
if others_mask.any():
merged_counter = df_aggregated.loc[
others_mask, COUNTER_ACCUMULATOR_KEY
].iloc[0]
unique_count = sum(1 for v in merged_counter.values() if v == 1)
result.loc[others_mask] = unique_count
return result
def calculate_failed_count_from_metrics(
df_aggregated, others_mask, metric_column
):
result = df_aggregated[metric_column].copy()
if others_mask.any():
count = df_aggregated.loc[others_mask, Metrics.COUNT.name].iloc[
0
]
unique_count = df_aggregated.loc[
others_mask, Metrics.UNIQUE_COUNT.name
].iloc[0]
failed_count = count - unique_count
result.loc[others_mask] = failed_count
return result
results_df = aggregate_others_statistical_pandas(
results_df,
dimension_column=DIMENSION_VALUE_KEY,
agg_functions={
COUNTER_ACCUMULATOR_KEY: lambda counters: sum(
counters, Counter()
),
Metrics.COUNT.name: "sum",
DIMENSION_TOTAL_COUNT_KEY: "sum",
DIMENSION_FAILED_COUNT_KEY: "sum",
},
final_metric_calculators={
Metrics.UNIQUE_COUNT.name: calculate_unique_count_from_counter,
DIMENSION_FAILED_COUNT_KEY: calculate_failed_count_from_metrics,
},
exclude_from_final=[COUNTER_ACCUMULATOR_KEY],
top_n=DEFAULT_TOP_DIMENSIONS,
)