Fix Row Counts for LengthsToBeBetween and ValueToBeBetween (#24359)

IceS2 2025-11-17 15:19:41 +01:00 committed by GitHub
parent 615cb8ec50
commit 8c0215a353
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 242 additions and 103 deletions

View File

@@ -30,13 +30,48 @@ class BetweenBoundsChecker(BaseValidationChecker):
self.min_bound = min_bound
self.max_bound = max_bound
def _value_violates(self, value: Any) -> bool:
"""Check violation of one value"""
def _check_violations(self, values):
"""Core violation check logic - works for both scalar and Series.
Checks if values are outside [min_bound, max_bound].
This method is polymorphic: works identically for scalar values and pandas Series.
Args:
values: Single value or pandas Series
Returns:
Boolean or Series of booleans indicating violations (True = violates)
"""
import pandas as pd
if value is None or pd.isna(value):
return False
return not (self.min_bound <= value <= self.max_bound)
return ~pd.isna(values) & (
(values < self.min_bound) | (values > self.max_bound)
)
def _value_violates(self, value: Any) -> bool:
"""Check violation of one value (scalar).
Args:
value: Single value to check
Returns:
bool: True if value violates bounds
"""
return bool(self._check_violations(value))
def get_violations_mask(self, series):
"""Get boolean mask of violations for pandas Series (vectorized).
Returns a boolean Series where True indicates a violation.
Implements the same logic as _value_violates() but vectorized for performance.
Args:
series: pandas Series of values to check
Returns:
pandas Series of booleans indicating violations
"""
return self._check_violations(series)
def violates_pandas(self, metrics: Mapping[str, Any]) -> bool:
"""Check if any value is outside [min_bound, max_bound]. Used on Pandas Data Quality."""
@@ -66,3 +101,37 @@ class BetweenBoundsChecker(BaseValidationChecker):
if not conditions:
return literal(False)
return or_(*conditions) if len(conditions) > 1 else conditions[0]
def build_row_level_violations_sqa(
self, column: "ClauseElement"
) -> "ClauseElement":
"""Build SQL expression to count row-level violations.
Returns a SUM(CASE...) expression that counts individual rows where
the column value is outside [min_bound, max_bound].
This is the SQL equivalent of get_violations_mask() for pandas.
Args:
column: SQLAlchemy column expression to check
Returns:
SQLAlchemy expression that sums up row-level violations
"""
from sqlalchemy import and_, case, func, literal, or_
# Build condition: value NOT NULL AND (value < min OR value > max)
conditions = []
if not math.isinf(self.min_bound):
conditions.append(and_(column.isnot(None), column < self.min_bound))
if not math.isinf(self.max_bound):
conditions.append(and_(column.isnot(None), column > self.max_bound))
if not conditions:
return literal(0)
violation_condition = or_(*conditions) if len(conditions) > 1 else conditions[0]
# Return SUM(CASE WHEN violation THEN 1 ELSE 0 END)
return func.sum(case((violation_condition, literal(1)), else_=literal(0)))
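The SQL counterpart can be sanity-checked in isolation by compiling it against a throwaway table. A sketch under assumed names (table "t", column "v", bounds 0 and 10):

from sqlalchemy import (
    Column, Integer, MetaData, Table, and_, case, func, literal, or_,
)

t = Table("t", MetaData(), Column("v", Integer))
min_b, max_b = 0, 10

violation = or_(
    and_(t.c.v.isnot(None), t.c.v < min_b),
    and_(t.c.v.isnot(None), t.c.v > max_b),
)
expr = func.sum(case((violation, literal(1)), else_=literal(0)))

# Renders roughly as:
# sum(CASE WHEN (t.v IS NOT NULL AND t.v < 0)
#          OR (t.v IS NOT NULL AND t.v > 10) THEN 1 ELSE 0 END)
print(expr.compile(compile_kwargs={"literal_binds": True}))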

View File

@@ -20,6 +20,8 @@ from typing import List, Optional, Tuple, Union
from sqlalchemy import Column
from metadata.data_quality.validations.base_test_handler import (
DIMENSION_FAILED_COUNT_KEY,
DIMENSION_TOTAL_COUNT_KEY,
BaseTestValidator,
DimensionInfo,
DimensionResult,
@@ -144,20 +146,23 @@ class BaseColumnValueLengthsToBeBetweenValidator(BaseTestValidator):
) -> TestEvaluation:
"""Evaluate the max-to-be-between test condition
For max test, the condition passes if the max value is within the specified bounds.
Since this is a statistical validator (group-level), passed/failed row counts are not applicable.
For dimensional validation, computes row-level passed/failed counts.
For non-dimensional validation, row counts are not applicable.
Args:
metric_values: Dictionary with keys from Metrics enum names
e.g., {"MAX": 42.5}
test_params: Dictionary with 'minValueForMaxInCol' and 'maxValueForMaxInCol'
e.g., {"MAX_LENGTH": 10, "MIN_LENGTH": 1}
For dimensional validation, also includes:
- DIMENSION_TOTAL_COUNT_KEY: total rows
- DIMENSION_FAILED_COUNT_KEY: failed rows
test_params: Dictionary with 'minLength' and 'maxLength'
Returns:
dict with keys:
- matched: bool - whether test passed (max within bounds)
- passed_rows: None - not applicable for statistical validators
- failed_rows: None - not applicable for statistical validators
- total_rows: None - not applicable for statistical validators
- matched: bool - whether test passed (lengths within bounds)
- passed_rows: Optional[int] - rows with valid lengths (or None for non-dimensional)
- failed_rows: Optional[int] - rows with invalid lengths (or None for non-dimensional)
- total_rows: Optional[int] - total rows (or None for non-dimensional)
"""
min_length_value = metric_values[Metrics.MIN_LENGTH.name]
max_length_value = metric_values[Metrics.MAX_LENGTH.name]
@@ -166,11 +171,18 @@ class BaseColumnValueLengthsToBeBetweenValidator(BaseTestValidator):
matched = min_bound <= min_length_value and max_length_value <= max_bound
# Extract row counts if available (dimensional validation)
total_rows = metric_values.get(DIMENSION_TOTAL_COUNT_KEY)
failed_rows = metric_values.get(DIMENSION_FAILED_COUNT_KEY)
passed_rows = None
if total_rows is not None and failed_rows is not None:
passed_rows = total_rows - failed_rows
return {
"matched": matched,
"passed_rows": None,
"failed_rows": None,
"total_rows": None,
"passed_rows": passed_rows,
"failed_rows": failed_rows,
"total_rows": total_rows,
}
def _format_result_message(
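Condensed, the new contract is: row counts flow through when the dimensional keys are present and stay None otherwise, with passed derived as total minus failed. A minimal sketch (function and parameter names are illustrative):

from typing import Optional

def evaluate_lengths(
    min_len: int,
    max_len: int,
    min_bound: float,
    max_bound: float,
    total_rows: Optional[int] = None,
    failed_rows: Optional[int] = None,
) -> dict:
    # Group-level verdict: both observed extremes must sit inside the bounds.
    matched = min_bound <= min_len and max_len <= max_bound
    # Row counts exist only for dimensional runs; passed is derived.
    passed_rows = (
        total_rows - failed_rows
        if total_rows is not None and failed_rows is not None
        else None
    )
    return {
        "matched": matched,
        "passed_rows": passed_rows,
        "failed_rows": failed_rows,
        "total_rows": total_rows,
    }

print(evaluate_lengths(3, 8, 1, 10))  # non-dimensional: counts stay None
print(evaluate_lengths(3, 12, 1, 10, total_rows=20, failed_rows=5))  # matched=False, passed=15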

View File

@@ -92,12 +92,14 @@ class ColumnValueLengthsToBeBetweenValidator(
dfs = self.runner if isinstance(self.runner, list) else [self.runner]
min_impl = Metrics.MIN_LENGTH(column).get_pandas_computation()
max_impl = Metrics.MAX_LENGTH(column).get_pandas_computation()
row_count_impl = Metrics.ROW_COUNT().get_pandas_computation()
dimension_aggregates = defaultdict(
lambda: {
Metrics.MIN_LENGTH.name: min_impl.create_accumulator(),
Metrics.MAX_LENGTH.name: max_impl.create_accumulator(),
DIMENSION_TOTAL_COUNT_KEY: 0,
DIMENSION_TOTAL_COUNT_KEY: row_count_impl.create_accumulator(),
DIMENSION_FAILED_COUNT_KEY: 0,
}
)
@@ -123,13 +125,33 @@ class ColumnValueLengthsToBeBetweenValidator(
dimension_aggregates[dimension_value][
DIMENSION_TOTAL_COUNT_KEY
] += len(group_df)
] = row_count_impl.update_accumulator(
dimension_aggregates[dimension_value][
DIMENSION_TOTAL_COUNT_KEY
],
group_df,
)
# Count row-level violations by checking lengths against bounds
col_values = group_df[column.name]
col_lengths = col_values.str.len()
violations_mask = checker.get_violations_mask(col_lengths)
dimension_aggregates[dimension_value][
DIMENSION_FAILED_COUNT_KEY
] += violations_mask.sum()
results_data = []
for dimension_value, agg in dimension_aggregates.items():
min_length_value = agg[Metrics.MIN_LENGTH.name]
max_length_value = agg[Metrics.MAX_LENGTH.name]
total_rows = agg[DIMENSION_TOTAL_COUNT_KEY]
min_length_value = min_impl.aggregate_accumulator(
agg[Metrics.MIN_LENGTH.name]
)
max_length_value = max_impl.aggregate_accumulator(
agg[Metrics.MAX_LENGTH.name]
)
total_rows = row_count_impl.aggregate_accumulator(
agg[DIMENSION_TOTAL_COUNT_KEY]
)
failed_count = agg[DIMENSION_FAILED_COUNT_KEY]
if min_length_value is None or max_length_value is None:
logger.warning(
@@ -138,16 +160,6 @@ class ColumnValueLengthsToBeBetweenValidator(
dimension_value,
)
continue
failed_count = (
total_rows
if checker.violates_pandas(
{
Metrics.MIN_LENGTH.name: min_length_value,
Metrics.MAX_LENGTH.name: max_length_value,
}
)
else 0
)
results_data.append(
{
@@ -190,6 +202,14 @@ class ColumnValueLengthsToBeBetweenValidator(
row_dict, metrics_to_compute, test_params
)
# Add row count keys for dimensional validation
metric_values[DIMENSION_TOTAL_COUNT_KEY] = row_dict.get(
DIMENSION_TOTAL_COUNT_KEY
)
metric_values[DIMENSION_FAILED_COUNT_KEY] = row_dict.get(
DIMENSION_FAILED_COUNT_KEY
)
evaluation = self._evaluate_test_condition(
metric_values, test_params
)
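The accumulator interface is internal to OpenMetadata, so here is a toy mirror of the pattern, assuming the same create/update/aggregate contract the diff calls: partial results survive chunked reads, and violations are counted per row as each group is seen.

from collections import defaultdict
import pandas as pd

class MinLengthAcc:
    # Toy stand-in for Metrics.MIN_LENGTH's pandas computation.
    def create_accumulator(self):
        return None
    def update_accumulator(self, acc, series):
        chunk_min = series.str.len().min()
        if pd.isna(chunk_min):
            return acc
        return chunk_min if acc is None else min(acc, chunk_min)
    def aggregate_accumulator(self, acc):
        return acc

min_impl = MinLengthAcc()
aggregates = defaultdict(
    lambda: {"MIN_LENGTH": min_impl.create_accumulator(), "failed": 0}
)

# Two chunks, as if the runner yielded multiple DataFrames.
for chunk in (
    pd.DataFrame({"dim": ["a", "b"], "col": ["xy", "hello"]}),
    pd.DataFrame({"dim": ["a"], "col": ["toolongvalue"]}),
):
    for dim, group in chunk.groupby("dim"):
        agg = aggregates[dim]
        agg["MIN_LENGTH"] = min_impl.update_accumulator(agg["MIN_LENGTH"], group["col"])
        lengths = group["col"].str.len()
        agg["failed"] += int(((lengths < 1) | (lengths > 10)).sum())  # bounds [1, 10]

for dim, agg in aggregates.items():
    print(dim, min_impl.aggregate_accumulator(agg["MIN_LENGTH"]), agg["failed"])
# a 2 1   ("toolongvalue" is 12 chars, outside [1, 10])
# b 5 0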

View File

@@ -99,6 +99,7 @@ class ColumnValuesToBeBetweenValidator(
Metrics.MIN.name: min_impl.create_accumulator(),
Metrics.MAX.name: max_impl.create_accumulator(),
DIMENSION_TOTAL_COUNT_KEY: row_count_impl.create_accumulator(),
DIMENSION_FAILED_COUNT_KEY: 0,
}
)
@@ -131,6 +132,13 @@ class ColumnValuesToBeBetweenValidator(
group_df,
)
# Count row-level violations using checker's unified logic
col_values = group_df[column.name]
violations_mask = checker.get_violations_mask(col_values)
dimension_aggregates[dimension_value][
DIMENSION_FAILED_COUNT_KEY
] += violations_mask.sum()
results_data = []
for dimension_value, agg in dimension_aggregates.items():
min_value = min_impl.aggregate_accumulator(agg[Metrics.MIN.name])
@@ -138,6 +146,7 @@ class ColumnValuesToBeBetweenValidator(
total_rows = row_count_impl.aggregate_accumulator(
agg[DIMENSION_TOTAL_COUNT_KEY]
)
failed_count = agg[DIMENSION_FAILED_COUNT_KEY]
if min_value is None or max_value is None:
logger.warning(
@@ -151,17 +160,6 @@ class ColumnValuesToBeBetweenValidator(
min_value = self._normalize_metric_value(min_value, is_min=True)
max_value = self._normalize_metric_value(max_value, is_min=False)
failed_count = (
total_rows
if checker.violates_pandas(
{
Metrics.MIN.name: min_value,
Metrics.MAX.name: max_value,
}
)
else 0
)
results_data.append(
{
DIMENSION_VALUE_KEY: dimension_value,

View File

@@ -18,6 +18,7 @@ from typing import List, Optional
from sqlalchemy import Column, func
from metadata.data_quality.validations.base_test_handler import (
DIMENSION_FAILED_COUNT_KEY,
DIMENSION_TOTAL_COUNT_KEY,
)
from metadata.data_quality.validations.column.base.columnValueLengthsToBeBetween import (
@@ -103,10 +104,15 @@ class ColumnValueLengthsToBeBetweenValidator(
dimension_results = []
try:
checker = self._get_validation_checker(test_params)
metric_expressions = {
DIMENSION_TOTAL_COUNT_KEY: func.count(),
DIMENSION_TOTAL_COUNT_KEY: Metrics.ROW_COUNT().fn(),
Metrics.MIN_LENGTH.name: Metrics.MIN_LENGTH(column).fn(),
Metrics.MAX_LENGTH.name: Metrics.MAX_LENGTH(column).fn(),
DIMENSION_FAILED_COUNT_KEY: checker.build_row_level_violations_sqa(
LenFn(column)
),
}
def build_min_len_final(cte):
@@ -118,13 +124,7 @@ class ColumnValueLengthsToBeBetweenValidator(
result_rows = self._execute_with_others_aggregation_statistical(
dimension_col,
metric_expressions,
self._get_validation_checker(test_params).get_sqa_failed_rows_builder(
{
Metrics.MIN_LENGTH.name: Metrics.MIN_LENGTH.name,
Metrics.MAX_LENGTH.name: Metrics.MAX_LENGTH.name,
},
DIMENSION_TOTAL_COUNT_KEY,
),
None, # Use row-level count from metric_expressions
final_metric_builders={
Metrics.MIN_LENGTH.name: build_min_len_final,
Metrics.MAX_LENGTH.name: build_max_len_final,
@@ -142,6 +142,8 @@ class ColumnValueLengthsToBeBetweenValidator(
metric_values = {
Metrics.MIN_LENGTH.name: min_len_value,
Metrics.MAX_LENGTH.name: max_len_value,
DIMENSION_TOTAL_COUNT_KEY: row.get(DIMENSION_TOTAL_COUNT_KEY),
DIMENSION_FAILED_COUNT_KEY: row.get(DIMENSION_FAILED_COUNT_KEY),
}
evaluation = self._evaluate_test_condition(metric_values, test_params)
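Conceptually, metric_expressions now resolves to a single grouped aggregate per dimension, with the failed count computed in the same pass. A simplified sketch of that shape (the real query runs through the Others-aggregation CTEs; table, column, and bound values here are assumptions, with plain LENGTH() standing in for LenFn):

from sqlalchemy import (
    Column, MetaData, String, Table, and_, case, func, literal, select,
)

t = Table("t", MetaData(), Column("dim", String), Column("v", String))
length = func.length(t.c.v)

metric_expressions = {
    "rowCount": func.count(),
    "minLength": func.min(length),
    "maxLength": func.max(length),
    "failedCount": func.sum(
        case(
            (and_(t.c.v.isnot(None), (length < 1) | (length > 10)), literal(1)),
            else_=literal(0),
        )
    ),
}

query = select(
    t.c.dim, *[expr.label(name) for name, expr in metric_expressions.items()]
).group_by(t.c.dim)
print(query.compile(compile_kwargs={"literal_binds": True}))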

View File

@@ -76,10 +76,15 @@ class ColumnValuesToBeBetweenValidator(
dimension_results = []
try:
checker = self._get_validation_checker(test_params)
metric_expressions = {
DIMENSION_TOTAL_COUNT_KEY: Metrics.ROW_COUNT().fn(),
Metrics.MIN.name: Metrics.MIN(column).fn(),
Metrics.MAX.name: Metrics.MAX(column).fn(),
DIMENSION_FAILED_COUNT_KEY: checker.build_row_level_violations_sqa(
column
),
}
def build_min_final(cte):
@@ -93,13 +98,7 @@ class ColumnValuesToBeBetweenValidator(
result_rows = self._execute_with_others_aggregation_statistical(
dimension_col,
metric_expressions,
self._get_validation_checker(test_params).get_sqa_failed_rows_builder(
{
Metrics.MIN.name: Metrics.MIN.name,
Metrics.MAX.name: Metrics.MAX.name,
},
DIMENSION_TOTAL_COUNT_KEY,
),
None, # Use row-level count from metric_expressions
final_metric_builders={
Metrics.MIN.name: build_min_final,
Metrics.MAX.name: build_max_final,

View File

@@ -346,7 +346,7 @@ class SQAValidatorMixin:
self,
dimension_col: Column,
metric_expressions: Dict[str, ClauseElement],
failed_count_builder: FailedCountBuilderSQA,
failed_count_builder: Optional[FailedCountBuilderSQA] = None,
final_metric_builders: Optional[Dict[str, Any]] = None,
exclude_from_results: Optional[List[str]] = None,
top_dimensions_count: int = DEFAULT_TOP_DIMENSIONS,
@@ -426,6 +426,20 @@ class SQAValidatorMixin:
f"metric_expressions must contain '{DIMENSION_TOTAL_COUNT_KEY}'"
)
# Validate failed_count_builder and DIMENSION_FAILED_COUNT_KEY relationship
if failed_count_builder is None:
if DIMENSION_FAILED_COUNT_KEY not in metric_expressions:
raise ValueError(
f"When failed_count_builder is None, metric_expressions must contain "
f"'{DIMENSION_FAILED_COUNT_KEY}' key"
)
else:
if DIMENSION_FAILED_COUNT_KEY in metric_expressions:
raise ValueError(
f"When failed_count_builder is provided, metric_expressions should NOT contain "
f"'{DIMENSION_FAILED_COUNT_KEY}' key (it will be computed by the builder)"
)
# Cast dimension column to VARCHAR to ensure compatibility with string literals
# This prevents type mismatch errors when mixing numeric columns with 'NULL'/'Others' labels
dimension_col_as_string = func.cast(dimension_col, String)
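The two supported configurations, condensed into a standalone check (constant value and error messages are simplified stand-ins):

from typing import Callable, Dict, Optional

DIMENSION_FAILED_COUNT_KEY = "failedCount"  # mirrors the constant used above

def check_failed_count_config(
    failed_count_builder: Optional[Callable],
    metric_expressions: Dict[str, object],
) -> None:
    # Either the caller supplies a row-level failed-count expression, or a
    # builder that derives it from the aggregates -- never both, never neither.
    has_key = DIMENSION_FAILED_COUNT_KEY in metric_expressions
    if failed_count_builder is None and not has_key:
        raise ValueError("no builder given, so metric_expressions needs failedCount")
    if failed_count_builder is not None and has_key:
        raise ValueError("failedCount expression and builder are mutually exclusive")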
@@ -459,26 +473,36 @@ class SQAValidatorMixin:
# ---- CTE 2: Add failed_count and impact_score
total_count_col = getattr(raw_aggregates.c, DIMENSION_TOTAL_COUNT_KEY)
failed_count_expr = failed_count_builder(raw_aggregates)
if failed_count_builder is None:
# Failed count already in raw_aggregates from metric_expressions
failed_count_expr = getattr(raw_aggregates.c, DIMENSION_FAILED_COUNT_KEY)
else:
# Compute failed count using builder
failed_count_expr = failed_count_builder(raw_aggregates)
impact_score_expr = get_impact_score_expression(
failed_count_expr, total_count_col
)
stats_with_impact_columns = [col for col in raw_aggregates.c]
stats_with_impact_columns.append(
failed_count_expr.label(DIMENSION_FAILED_COUNT_KEY)
)
stats_with_impact_columns.append(
# Build CTE2 columns - only label failed_count if not already labeled
stats_with_impact_select = [col for col in raw_aggregates.c]
if failed_count_builder is None:
# failed_count_expr already has the label from raw_aggregates
pass # It's already in raw_aggregates.c columns
else:
# failed_count_expr needs to be labeled
stats_with_impact_select.append(
failed_count_expr.label(DIMENSION_FAILED_COUNT_KEY)
)
stats_with_impact_select.append(
impact_score_expr.label(DIMENSION_IMPACT_SCORE_KEY)
)
stats_with_impact = (
select(
*[col for col in raw_aggregates.c],
failed_count_expr.label(DIMENSION_FAILED_COUNT_KEY),
impact_score_expr.label(DIMENSION_IMPACT_SCORE_KEY),
)
select(stats_with_impact_select)
.select_from(raw_aggregates)
.cte(CTE_DIMENSION_WITH_IMPACT)
)
@@ -526,9 +550,12 @@ class SQAValidatorMixin:
return final_metric_builders[metric_name](categorized)
return func.sum(getattr(categorized.c, metric_name))
# Add metrics from metric_expressions (skip DIMENSION_FAILED_COUNT_KEY - handled separately)
for metric_name in metric_expressions.keys():
final_columns.append(build_final_metric(metric_name).label(metric_name))
if metric_name != DIMENSION_FAILED_COUNT_KEY:
final_columns.append(build_final_metric(metric_name).label(metric_name))
# Always sum failed_count for Others aggregation
final_failed_count = func.sum(
getattr(categorized.c, DIMENSION_FAILED_COUNT_KEY)
).label(DIMENSION_FAILED_COUNT_KEY)
@@ -550,25 +577,37 @@ class SQAValidatorMixin:
):
result_columns.append(col)
failed_count_expr = failed_count_builder(final_cte)
if failed_count_builder is None:
# Failed count is the summed row-level count; use it directly (already labeled)
result_failed_count = getattr(final_cte.c, DIMENSION_FAILED_COUNT_KEY)
else:
# Re-compute for Others using builder (statistical validators)
failed_count_expr = failed_count_builder(final_cte)
result_failed_count = case(
(
getattr(final_cte.c, DIMENSION_VALUE_KEY) != DIMENSION_OTHERS_LABEL,
getattr(final_cte.c, DIMENSION_FAILED_COUNT_KEY),
),
else_=failed_count_expr,
).label(DIMENSION_FAILED_COUNT_KEY)
result_failed_count = case(
(
getattr(final_cte.c, DIMENSION_VALUE_KEY) != DIMENSION_OTHERS_LABEL,
getattr(final_cte.c, DIMENSION_FAILED_COUNT_KEY),
),
else_=failed_count_expr,
).label(DIMENSION_FAILED_COUNT_KEY)
result_columns.append(result_failed_count)
# Impact score: preserve for top N, recompute for "Others" automatically
if failed_count_builder is None:
# Use result_failed_count which references final_cte
impact_failed_count_expr = result_failed_count
else:
# Use failed_count_expr from builder
impact_failed_count_expr = failed_count_expr
result_impact = case(
(
getattr(final_cte.c, DIMENSION_VALUE_KEY) != DIMENSION_OTHERS_LABEL,
getattr(final_cte.c, DIMENSION_IMPACT_SCORE_KEY),
),
else_=get_impact_score_expression(
failed_count_expr,
impact_failed_count_expr,
getattr(final_cte.c, DIMENSION_TOTAL_COUNT_KEY),
),
).label(DIMENSION_IMPACT_SCORE_KEY)
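Put together, the mixin now has two failed-count strategies, differing only in how the "Others" bucket is resolved. A condensed sketch of the branching (names and the builder signature are simplified stand-ins):

from typing import Callable, Optional

def resolve_failed_count(
    dimension_value: str,
    stored_failed_count: int,
    failed_count_builder: Optional[Callable[[], int]],
    others_label: str = "Others",
) -> int:
    # Row-level mode (builder is None): counts were summed per row upstream,
    # so even Others can reuse the stored value. Statistical mode: top-N rows
    # keep their stored count, but Others is recomputed from merged aggregates.
    if failed_count_builder is None or dimension_value != others_label:
        return stored_failed_count
    return failed_count_builder()

print(resolve_failed_count("Others", 7, None))           # 7  (row-level mode)
print(resolve_failed_count("Others", 7, lambda: 30))     # 30 (statistical recompute)
print(resolve_failed_count("name=John", 7, lambda: 30))  # 7  (top-N keeps stored)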

View File

@@ -745,12 +745,12 @@ TEST_CASE_SUPPORT_ROW_LEVEL_PASS_FAILED = {
12.5,
),
[
("name=John", TestCaseStatus.Failed, None, None, None, None, 0.0667),
("name=Alice", TestCaseStatus.Success, None, None, None, None, 0),
("name=Bob", TestCaseStatus.Success, None, None, None, None, 0),
("name=Charlie", TestCaseStatus.Success, None, None, None, None, 0),
("name=Diana", TestCaseStatus.Success, None, None, None, None, 0),
("name=Others", TestCaseStatus.Success, None, None, None, None, 0),
("name=John", TestCaseStatus.Failed, 10, 10, 50, 50, 0.0167),
("name=Alice", TestCaseStatus.Success, 10, 0, 100, 0, 0),
("name=Bob", TestCaseStatus.Success, 10, 0, 100, 0, 0),
("name=Charlie", TestCaseStatus.Success, 10, 0, 100, 0, 0),
("name=Diana", TestCaseStatus.Success, 10, 0, 100, 0, 0),
("name=Others", TestCaseStatus.Success, 20, 0, 100, 0, 0),
],
),
(
@@ -768,12 +768,12 @@ TEST_CASE_SUPPORT_ROW_LEVEL_PASS_FAILED = {
0,
),
[
("name=Alice", TestCaseStatus.Success, None, None, None, None, 0),
("name=Bob", TestCaseStatus.Success, None, None, None, None, 0),
("name=Charlie", TestCaseStatus.Success, None, None, None, None, 0),
("name=Diana", TestCaseStatus.Success, None, None, None, None, 0),
("name=Eve", TestCaseStatus.Success, None, None, None, None, 0),
("name=Others", TestCaseStatus.Success, None, None, None, None, 0),
("name=Alice", TestCaseStatus.Success, 10, 0, 100, 0, 0),
("name=Bob", TestCaseStatus.Success, 10, 0, 100, 0, 0),
("name=Charlie", TestCaseStatus.Success, 10, 0, 100, 0, 0),
("name=Diana", TestCaseStatus.Success, 10, 0, 100, 0, 0),
("name=Eve", TestCaseStatus.Success, 10, 0, 100, 0, 0),
("name=Others", TestCaseStatus.Success, 30, 0, 100, 0, 0),
],
),
(

View File

@@ -925,12 +925,12 @@ DATALAKE_DATA_FRAME = lambda times_increase_sample_data: DataFrame(
12.5,
),
[
("name=John", TestCaseStatus.Failed, None, None, None, None, 0.6667),
("name=Alice", TestCaseStatus.Success, None, None, None, None, 0),
("name=Bob", TestCaseStatus.Success, None, None, None, None, 0),
("name=Charlie", TestCaseStatus.Success, None, None, None, None, 0),
("name=Diana", TestCaseStatus.Success, None, None, None, None, 0),
("name=Others", TestCaseStatus.Success, None, None, None, None, 0),
("name=John", TestCaseStatus.Failed, 2000, 2000, 50, 50, 0.1667),
("name=Alice", TestCaseStatus.Success, 2000, 0, 100, 0, 0),
("name=Bob", TestCaseStatus.Success, 2000, 0, 100, 0, 0),
("name=Charlie", TestCaseStatus.Success, 2000, 0, 100, 0, 0),
("name=Diana", TestCaseStatus.Success, 2000, 0, 100, 0, 0),
("name=Others", TestCaseStatus.Success, 4000, 0, 100, 0, 0),
],
),
(
@@ -948,12 +948,12 @@ DATALAKE_DATA_FRAME = lambda times_increase_sample_data: DataFrame(
0,
),
[
("name=Alice", TestCaseStatus.Success, None, None, None, None, 0),
("name=Bob", TestCaseStatus.Success, None, None, None, None, 0),
("name=Charlie", TestCaseStatus.Success, None, None, None, None, 0),
("name=Diana", TestCaseStatus.Success, None, None, None, None, 0),
("name=Eve", TestCaseStatus.Success, None, None, None, None, 0),
("name=Others", TestCaseStatus.Success, None, None, None, None, 0),
("name=Alice", TestCaseStatus.Success, 2000, 0, 100, 0, 0),
("name=Bob", TestCaseStatus.Success, 2000, 0, 100, 0, 0),
("name=Charlie", TestCaseStatus.Success, 2000, 0, 100, 0, 0),
("name=Diana", TestCaseStatus.Success, 2000, 0, 100, 0, 0),
("name=Eve", TestCaseStatus.Success, 2000, 0, 100, 0, 0),
("name=Others", TestCaseStatus.Success, 6000, 0, 100, 0, 0),
],
),
(