Mirror of https://github.com/open-metadata/OpenMetadata.git (synced 2025-12-04 19:44:58 +00:00)
Feature/dimensionality column values to be between (#24215)
* Initial implementation for Dimensionality on Data Quality Tests
* Fix ColumnValuesToBeUnique and create TestCaseResult API
* Refactor dimension result
* Initial E2E Implementation without Impact Score
* Dimensionality Thin Slice
* Update generated TypeScript types
* Update generated TypeScript types
* Removed useless method to use the one we already had
* Fix Pandas Dimensionality checks
* Remove useless comments
* Implement PR comments, fix Tests
* Improve the code a bit
* Fix imports
* Implement Dimensionality for ColumnMeanToBeBetween
* Removed useless comments and improved minor things
* Implement UnitTests
* Fixes
* Moved import pandas to type checking
* Fix Min/Max being optional
* Fix Unittests
* small fixes
* Fix Unittests
* Fix Issue with counting total rows on mean
* Improve code
* Fix Merge
* Removed unused type
* Refactor to reduce code repetition and complexity
* Fix conflict
* Rename method
* Refactor some metrics
* Implement Dimensionality to ColumnLengthToBeBetween
* Implement Dimensionality for ColumnMedianToBeBetween in Pandas
* Implement Median Dimensionality for SQL
* Add database tests
* Fix median metric
* Implement Dimensionality SumToBeBetween
* Implement dimensionality for Column Values not In Set
* Implement Dimensionality for ColumnValuestoMatchRegex and ColumnValuesToNotMatchRegex
* Implement NotNull and MissingCount dimensionality
* Implement columnValuesToBeBetween dimensionality
* Fix test

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
This commit is contained in:
parent
727560e88f
commit
ca5945c5be
@@ -16,11 +16,21 @@ Validator for column values to be between test case
import traceback
from abc import abstractmethod
from datetime import date, datetime, time
from typing import Tuple, Union
from typing import List, Optional, Tuple, Union

from sqlalchemy import Column

from metadata.data_quality.validations.base_test_handler import BaseTestValidator
from metadata.data_quality.validations.base_test_handler import (
    DIMENSION_FAILED_COUNT_KEY,
    DIMENSION_TOTAL_COUNT_KEY,
    BaseTestValidator,
    DimensionInfo,
    DimensionResult,
    TestEvaluation,
)
from metadata.data_quality.validations.checkers.between_bounds_checker import (
    BetweenBoundsChecker,
)
from metadata.generated.schema.tests.basic import (
    TestCaseResult,
    TestCaseStatus,
@@ -41,19 +51,8 @@ MAX = "max"
class BaseColumnValuesToBeBetweenValidator(BaseTestValidator):
    """Validator for column values to be between test case"""

    def _convert_date_to_datetime(
        self, date_object: date, time_converter: time
    ) -> datetime:
        """Convert date object to datetime object

        Args:
            date_object (date): date object
            time_converter (time): time converter to use one of time.min or time.max

        Returns:
            datetime:
        """
        return datetime.combine(date_object, time_converter)
    MIN_BOUND = "minValue"
    MAX_BOUND = "maxValue"

    def _run_validation(self) -> TestCaseResult:
        """Execute the specific test validation logic
@@ -64,10 +63,20 @@ class BaseColumnValuesToBeBetweenValidator(BaseTestValidator):
        Returns:
            TestCaseResult: The test case result for the overall validation
        """
        test_params = self._get_test_parameters()

        try:
            column: Union[SQALikeColumn, Column] = self.get_column()
            min_res = self._run_results(Metrics.MIN, column)
            max_res = self._run_results(Metrics.MAX, column)

            min_res = self._normalize_metric_value(min_res, is_min=True)
            max_res = self._normalize_metric_value(max_res, is_min=False)

            metric_values = {
                Metrics.MIN.name: min_res,
                Metrics.MAX.name: max_res,
            }
        except (ValueError, RuntimeError) as exc:
            msg = f"Error computing {self.test_case.fullyQualifiedName}: {exc}"  # type: ignore
            logger.debug(traceback.format_exc())
@@ -82,15 +91,42 @@ class BaseColumnValuesToBeBetweenValidator(BaseTestValidator):
                ],
            )

        if type(min_res) is date:  # pylint: disable=unidiomatic-typecheck
            min_res = self._convert_date_to_datetime(min_res, time.min)
        if type(max_res) is date:  # pylint: disable=unidiomatic-typecheck
            max_res = self._convert_date_to_datetime(max_res, time.max)
        if self.test_case.computePassedFailedRowCount:
            row_count, failed_rows = self.compute_row_count(
                column, test_params[self.MIN_BOUND], test_params[self.MAX_BOUND]
            )
        else:
            row_count, failed_rows = None, None

        evaluation = self._evaluate_test_condition(metric_values, test_params)
        result_message = self._format_result_message(
            metric_values, test_params=test_params
        )
        test_result_values = self._get_test_result_values(metric_values)

        return self.get_test_case_result_object(
            self.execution_date,
            self.get_test_case_status(evaluation["matched"]),
            result_message,
            test_result_values,
            row_count=row_count,
            failed_rows=failed_rows,
            min_bound=test_params[self.MIN_BOUND]
            if not isinstance(test_params[self.MIN_BOUND], (datetime, date))
            else None,
            max_bound=test_params[self.MAX_BOUND]
            if not isinstance(test_params[self.MAX_BOUND], (datetime, date))
            else None,
        )

    def _get_test_parameters(self) -> dict:
        """Get Test Parameters"""
        column = self.get_column()

        if is_date_time(column.type):
            min_bound = self.get_test_case_param_value(
                self.test_case.parameterValues,  # type: ignore
                "minValue",
                self.MIN_BOUND,
                type_=datetime.fromtimestamp,
                default=datetime.min,
                pre_processor=convert_timestamp,
@@ -98,38 +134,150 @@ class BaseColumnValuesToBeBetweenValidator(BaseTestValidator):

            max_bound = self.get_test_case_param_value(
                self.test_case.parameterValues,  # type: ignore
                "maxValue",
                self.MAX_BOUND,
                type_=datetime.fromtimestamp,
                default=datetime.max,
                pre_processor=convert_timestamp,
            )
        else:
            min_bound = self.get_min_bound("minValue")
            max_bound = self.get_max_bound("maxValue")
            min_bound = self.get_min_bound(self.MIN_BOUND)
            max_bound = self.get_max_bound(self.MAX_BOUND)

        if self.test_case.computePassedFailedRowCount:
            row_count, failed_rows = self.get_row_count(min_bound, max_bound)
        else:
            row_count, failed_rows = None, None
        return {
            self.MIN_BOUND: min_bound,
            self.MAX_BOUND: max_bound,
        }

        return self.get_test_case_result_object(
            self.execution_date,
            self.get_test_case_status(min_res >= min_bound and max_res <= max_bound),
            f"Found min={min_res}, max={max_res} vs. the expected min={min_bound}, max={max_bound}.",
            [
                TestResultValue(name=MIN, value=str(min_res)),
                TestResultValue(name=MAX, value=str(max_res)),
            ],
            row_count=row_count,
            failed_rows=failed_rows,
            min_bound=min_bound
            if not isinstance(min_bound, (datetime, date))
            else None,
            max_bound=max_bound
            if not isinstance(min_bound, (datetime, date))
            else None,
    def _get_metrics_to_compute(self, test_params: Optional[dict] = None) -> dict:
        """Get Metrics needed to compute"""
        return {Metrics.MIN.name: Metrics.MIN, Metrics.MAX.name: Metrics.MAX}

    def _evaluate_test_condition(
        self, metric_values: dict, test_params: dict
    ) -> TestEvaluation:
        """Evaluate the values-to-be-between test condition

        For this test, the condition passes if both min and max values are within bounds.
        Since this is a statistical validator (group-level), passed/failed row counts
        are not applicable at the test level (only for computePassedFailedRowCount).

        Args:
            metric_values: Dictionary with keys from Metrics enum names
                e.g., {"MIN": 10, "MAX": 100}
            test_params: Dictionary with 'minValue' and 'maxValue'

        Returns:
            dict with keys:
                - matched: bool - whether test passed (both min >= min_bound and max <= max_bound)
                - passed_rows: None - not applicable for statistical validators
                - failed_rows: None - not applicable for statistical validators
                - total_rows: None - not applicable for statistical validators
        """

        min_value = metric_values[Metrics.MIN.name]
        max_value = metric_values[Metrics.MAX.name]
        min_bound = test_params[self.MIN_BOUND]
        max_bound = test_params[self.MAX_BOUND]

        matched = min_value >= min_bound and max_value <= max_bound
        total_rows = metric_values.get(DIMENSION_TOTAL_COUNT_KEY)
        failed_rows = metric_values.get(DIMENSION_FAILED_COUNT_KEY)
        passed_rows = (
            total_rows - failed_rows
            if (total_rows is not None and failed_rows is not None)
            else None
        )

        return {
            "matched": matched,
            "passed_rows": passed_rows,
            "failed_rows": failed_rows,
            "total_rows": total_rows,
        }

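    # Worked example: with metric_values {"MIN": 10, "MAX": 100} (the docstring's
    # own sample) and bounds minValue=0, maxValue=150, matched is True; lowering
    # maxValue to 50 flips matched to False, since 100 > 50. passed/failed/total
    # rows stay None unless dimensional counts were attached to metric_values.
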
    def _format_result_message(
        self,
        metric_values: dict,
        dimension_info: Optional[DimensionInfo] = None,
        test_params: Optional[dict] = None,
    ) -> str:
        """Format the result message for values-to-be-between test

        Args:
            metric_values: Dictionary with Metrics enum names as keys
            dimension_info: Optional DimensionInfo with dimension details
            test_params: Test parameters with min/max bounds. Required for this test.

        Returns:
            str: Formatted result message
        """
        if test_params is None:
            raise ValueError(
                "test_params is required for columnValuesToBeBetween._format_result_message"
            )

        min_value = metric_values[Metrics.MIN.name]
        max_value = metric_values[Metrics.MAX.name]
        min_bound = test_params[self.MIN_BOUND]
        max_bound = test_params[self.MAX_BOUND]

        if dimension_info:
            return (
                f"Dimension {dimension_info['dimension_name']}={dimension_info['dimension_value']}: "
                f"Found min={min_value}, max={max_value} vs. the expected min={min_bound}, max={max_bound}"
            )
        else:
            return f"Found min={min_value}, max={max_value} vs. the expected min={min_bound}, max={max_bound}."

    def _get_test_result_values(self, metric_values: dict) -> List[TestResultValue]:
        """Get test result values for values-to-be-between test

        Args:
            metric_values: Dictionary with Metrics enum names as keys

        Returns:
            List[TestResultValue]: Test result values for the test case
        """
        return [
            TestResultValue(name=MIN, value=str(metric_values[Metrics.MIN.name])),
            TestResultValue(name=MAX, value=str(metric_values[Metrics.MAX.name])),
        ]

    def _get_validation_checker(self, test_params: dict) -> BetweenBoundsChecker:
        """Get the validation checker for this test

        Args:
            test_params: Test parameters including min and max bounds

        Returns:
            BetweenBoundsChecker configured with the test bounds
        """
        return BetweenBoundsChecker(
            min_bound=test_params[self.MIN_BOUND],
            max_bound=test_params[self.MAX_BOUND],
        )

    @abstractmethod
    def _execute_dimensional_validation(
        self,
        column: Union[SQALikeColumn, Column],
        dimension_col: Union[SQALikeColumn, Column],
        metrics_to_compute: dict,
        test_params: dict,
    ) -> List[DimensionResult]:
        """Execute dimensional validation query for a single dimension column

        Args:
            column: The column being tested (e.g., revenue)
            dimension_col: The dimension column to group by (e.g., region)
            metrics_to_compute: Dict mapping metric names to Metrics enum values
            test_params: Test parameters including min and max bounds

        Returns:
            List of DimensionResult objects for each dimension value
        """
        raise NotImplementedError

    @abstractmethod
    def _run_results(self, metric: Metrics, column: Union[SQALikeColumn, Column]):
        raise NotImplementedError
@@ -161,3 +309,9 @@ class BaseColumnValuesToBeBetweenValidator(BaseTestValidator):
            Tuple[int, int]:
        """
        return self.compute_row_count(self.get_column(), min_bound, max_bound)

    def _normalize_metric_value(self, value, is_min: bool):
        """Normalize metric value - convert date to datetime if needed"""
        if type(value) is date:
            return datetime.combine(value, time.min if is_min else time.max)
        return value
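Taken on its own, the date handling above is easy to sanity-check: exact `date` values are widened to a full-day datetime range before they are compared against datetime bounds. A minimal, self-contained sketch (only the helper name mirrors the diff; the asserts are illustrative):

from datetime import date, datetime, time

def normalize_metric_value(value, is_min: bool):
    # Exact `date` instances are widened so they compare cleanly against
    # datetime bounds: min -> start of day, max -> end of day. datetime
    # values pass through untouched because type() checks the exact class.
    if type(value) is date:
        return datetime.combine(value, time.min if is_min else time.max)
    return value

assert normalize_metric_value(date(2024, 1, 2), is_min=True) == datetime(2024, 1, 2, 0, 0)
assert normalize_metric_value(date(2024, 1, 2), is_min=False) == datetime(2024, 1, 2, 23, 59, 59, 999999)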
@@ -13,17 +13,34 @@
Validator for column values to be between test case
"""

from typing import Optional
from collections import defaultdict
from typing import List, Optional, cast

import pandas as pd

from metadata.data_quality.validations.base_test_handler import (
    DIMENSION_FAILED_COUNT_KEY,
    DIMENSION_TOTAL_COUNT_KEY,
    DIMENSION_VALUE_KEY,
)
from metadata.data_quality.validations.column.base.columnValuesToBeBetween import (
    BaseColumnValuesToBeBetweenValidator,
)
from metadata.data_quality.validations.impact_score import (
    DEFAULT_TOP_DIMENSIONS,
    calculate_impact_score_pandas,
)
from metadata.data_quality.validations.mixins.pandas_validator_mixin import (
    PandasValidatorMixin,
    aggregate_others_statistical_pandas,
)
from metadata.generated.schema.tests.dimensionResult import DimensionResult
from metadata.profiler.metrics.registry import Metrics
from metadata.utils.logger import test_suite_logger
from metadata.utils.sqa_like_column import SQALikeColumn

logger = test_suite_logger()


class ColumnValuesToBeBetweenValidator(
    BaseColumnValuesToBeBetweenValidator, PandasValidatorMixin
@@ -39,6 +56,180 @@ class ColumnValuesToBeBetweenValidator(
        """
        return self.run_dataframe_results(self.runner, metric, column)

    def _execute_dimensional_validation(
        self,
        column: SQALikeColumn,
        dimension_col: SQALikeColumn,
        metrics_to_compute: dict,
        test_params: dict,
    ) -> List[DimensionResult]:
        """Execute dimensional validation for values to be between with proper aggregation

        Follows the iterate pattern from the Mean metric's df_fn method to handle
        multiple dataframes efficiently without concatenating them in memory.

        Memory-efficient approach: Instead of concatenating all dataframes (which creates
        a full copy in memory), we iterate over them and accumulate aggregates. This is
        especially important for large parquet files split across many chunks.

        For the values-to-be-between validator, we accumulate min and max values across
        dataframes to accurately compute the overall min and max values per dimension.
        MIN and MAX aggregate naturally: min(min1, min2, ...) = overall_min,
        max(max1, max2, ...) = overall_max.

        Args:
            column: The column being validated
            dimension_col: Single SQALikeColumn object corresponding to the dimension column
            metrics_to_compute: Dictionary mapping Metrics enum names to Metrics objects
            test_params: Dictionary with test-specific parameters (min/max bounds)

        Returns:
            List[DimensionResult]: Top N dimensions by impact score plus "Others"
        """
        checker = self._get_validation_checker(test_params)
        dimension_results = []

        try:
            dfs = self.runner if isinstance(self.runner, list) else [self.runner]
            min_impl = Metrics.MIN(column).get_pandas_computation()
            max_impl = Metrics.MAX(column).get_pandas_computation()
            row_count_impl = Metrics.ROW_COUNT().get_pandas_computation()

            dimension_aggregates = defaultdict(
                lambda: {
                    Metrics.MIN.name: min_impl.create_accumulator(),
                    Metrics.MAX.name: max_impl.create_accumulator(),
                    DIMENSION_TOTAL_COUNT_KEY: row_count_impl.create_accumulator(),
                }
            )

            for df in dfs:
                df_typed = cast(pd.DataFrame, df)
                grouped = df_typed.groupby(dimension_col.name, dropna=False)

                for dimension_value, group_df in grouped:
                    dimension_value = self.format_dimension_value(dimension_value)

                    dimension_aggregates[dimension_value][
                        Metrics.MIN.name
                    ] = min_impl.update_accumulator(
                        dimension_aggregates[dimension_value][Metrics.MIN.name],
                        group_df,
                    )
                    dimension_aggregates[dimension_value][
                        Metrics.MAX.name
                    ] = max_impl.update_accumulator(
                        dimension_aggregates[dimension_value][Metrics.MAX.name],
                        group_df,
                    )

                    dimension_aggregates[dimension_value][
                        DIMENSION_TOTAL_COUNT_KEY
                    ] = row_count_impl.update_accumulator(
                        dimension_aggregates[dimension_value][
                            DIMENSION_TOTAL_COUNT_KEY
                        ],
                        group_df,
                    )

            results_data = []
            for dimension_value, agg in dimension_aggregates.items():
                min_value = min_impl.aggregate_accumulator(agg[Metrics.MIN.name])
                max_value = max_impl.aggregate_accumulator(agg[Metrics.MAX.name])
                total_rows = row_count_impl.aggregate_accumulator(
                    agg[DIMENSION_TOTAL_COUNT_KEY]
                )

                if min_value is None or max_value is None:
                    logger.warning(
                        "Skipping '%s=%s' dimension since 'min' or 'max' are 'None'",
                        dimension_col.name,
                        dimension_value,
                    )
                    continue

                # Normalize values (convert date to datetime if needed)
                min_value = self._normalize_metric_value(min_value, is_min=True)
                max_value = self._normalize_metric_value(max_value, is_min=False)

                failed_count = (
                    total_rows
                    if checker.violates_pandas(
                        {
                            Metrics.MIN.name: min_value,
                            Metrics.MAX.name: max_value,
                        }
                    )
                    else 0
                )

                results_data.append(
                    {
                        DIMENSION_VALUE_KEY: dimension_value,
                        Metrics.MIN.name: min_value,
                        Metrics.MAX.name: max_value,
                        DIMENSION_TOTAL_COUNT_KEY: total_rows,
                        DIMENSION_FAILED_COUNT_KEY: failed_count,
                    }
                )

            results_df = pd.DataFrame(results_data)

            if not results_df.empty:
                results_df = calculate_impact_score_pandas(
                    results_df,
                    failed_column=DIMENSION_FAILED_COUNT_KEY,
                    total_column=DIMENSION_TOTAL_COUNT_KEY,
                )

                results_df = aggregate_others_statistical_pandas(
                    results_df,
                    dimension_column=DIMENSION_VALUE_KEY,
                    agg_functions={
                        Metrics.MIN.name: "min",
                        Metrics.MAX.name: "max",
                        DIMENSION_TOTAL_COUNT_KEY: "sum",
                        DIMENSION_FAILED_COUNT_KEY: "sum",
                    },
                    top_n=DEFAULT_TOP_DIMENSIONS,
                    violation_metrics=[
                        Metrics.MIN.name,
                        Metrics.MAX.name,
                    ],
                    violation_predicate=checker.violates_pandas,
                )

            for row_dict in results_df.to_dict("records"):
                metric_values = self._build_metric_values_from_row(
                    row_dict, metrics_to_compute, test_params
                )

                metric_values[DIMENSION_TOTAL_COUNT_KEY] = row_dict.get(
                    DIMENSION_TOTAL_COUNT_KEY
                )
                metric_values[DIMENSION_FAILED_COUNT_KEY] = row_dict.get(
                    DIMENSION_FAILED_COUNT_KEY
                )

                evaluation = self._evaluate_test_condition(
                    metric_values, test_params
                )

                dimension_result = self._create_dimension_result(
                    row_dict,
                    dimension_col.name,
                    metric_values,
                    evaluation,
                    test_params,
                )

                dimension_results.append(dimension_result)

        except Exception as exc:
            logger.warning(f"Error executing dimensional query: {exc}")
            logger.debug("Full error details: ", exc_info=True)

        return dimension_results

    def compute_row_count(self, column: SQALikeColumn, min_bound: int, max_bound: int):
        """Compute row count for the given column

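The accumulate-then-finalize pattern above reduces to a few lines once the OpenMetadata metric plumbing is stripped away. A hedged sketch (function and column names here are illustrative; the real code delegates to each metric's pandas computation objects):

from collections import defaultdict

import pandas as pd

def dimensional_min_max(chunks, value_col, dim_col):
    """Streaming per-dimension min/max/row-count over dataframe chunks."""
    aggs = defaultdict(lambda: {"min": None, "max": None, "count": 0})
    for df in chunks:
        for dim, group in df.groupby(dim_col, dropna=False):
            acc = aggs[dim]
            gmin, gmax = group[value_col].min(), group[value_col].max()
            # min/max compose across chunks: min of mins, max of maxes
            acc["min"] = gmin if acc["min"] is None else min(acc["min"], gmin)
            acc["max"] = gmax if acc["max"] is None else max(acc["max"], gmax)
            acc["count"] += len(group)
    return dict(aggs)

chunks = [
    pd.DataFrame({"name": ["Alice", "Bob"], "age": [31, 30]}),
    pd.DataFrame({"name": ["Alice", "Bob"], "age": [30, 29]}),
]
per_dim = dimensional_min_max(chunks, "age", "name")
# Alice -> min 30, max 31; Bob -> min 29, max 30 -- computed without ever
# concatenating the chunks, so memory stays proportional to one chunk.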
@@ -13,17 +13,27 @@
Validator for column values to be between test case
"""
import math
from typing import Optional
from typing import List, Optional

from sqlalchemy import Column
from sqlalchemy import Column, func

from metadata.data_quality.validations.base_test_handler import (
    DIMENSION_FAILED_COUNT_KEY,
    DIMENSION_TOTAL_COUNT_KEY,
    DIMENSION_VALUE_KEY,
)
from metadata.data_quality.validations.column.base.columnValuesToBeBetween import (
    BaseColumnValuesToBeBetweenValidator,
)
from metadata.data_quality.validations.impact_score import DEFAULT_TOP_DIMENSIONS
from metadata.data_quality.validations.mixins.sqa_validator_mixin import (
    SQAValidatorMixin,
)
from metadata.generated.schema.tests.dimensionResult import DimensionResult
from metadata.profiler.metrics.registry import Metrics
from metadata.utils.logger import test_suite_logger

logger = test_suite_logger()


class ColumnValuesToBeBetweenValidator(
@@ -40,6 +50,104 @@ class ColumnValuesToBeBetweenValidator(
        """
        return self.run_query_results(self.runner, metric, column)

    def _execute_dimensional_validation(
        self,
        column: Column,
        dimension_col: Column,
        metrics_to_compute: dict,
        test_params: dict,
    ) -> List[DimensionResult]:
        """Execute dimensional validation for values to be between with proper aggregation

        Uses the statistical aggregation helper to:
        1. Compute raw metrics (min, max) per dimension
        2. Calculate impact score based on whether both min and max are within bounds
        3. Aggregate "Others" using MIN(individual_mins) and MAX(individual_maxes)

        Args:
            column: The column being validated
            dimension_col: The dimension column to group by
            metrics_to_compute: Dict mapping metric names to Metrics enums
            test_params: Test parameters (min/max bounds)

        Returns:
            List[DimensionResult]: Top N dimensions plus "Others"
        """
        dimension_results = []

        try:
            metric_expressions = {
                DIMENSION_TOTAL_COUNT_KEY: Metrics.ROW_COUNT().fn(),
                Metrics.MIN.name: Metrics.MIN(column).fn(),
                Metrics.MAX.name: Metrics.MAX(column).fn(),
            }

            def build_min_final(cte):
                """Aggregate MIN values: take minimum of all mins"""
                return func.min(getattr(cte.c, Metrics.MIN.name))

            def build_max_final(cte):
                """Aggregate MAX values: take maximum of all maxes"""
                return func.max(getattr(cte.c, Metrics.MAX.name))

            result_rows = self._execute_with_others_aggregation_statistical(
                dimension_col,
                metric_expressions,
                self._get_validation_checker(test_params).get_sqa_failed_rows_builder(
                    {
                        Metrics.MIN.name: Metrics.MIN.name,
                        Metrics.MAX.name: Metrics.MAX.name,
                    },
                    DIMENSION_TOTAL_COUNT_KEY,
                ),
                final_metric_builders={
                    Metrics.MIN.name: build_min_final,
                    Metrics.MAX.name: build_max_final,
                },
                top_dimensions_count=DEFAULT_TOP_DIMENSIONS,
            )

            for row in result_rows:
                min_value = row.get(Metrics.MIN.name)
                max_value = row.get(Metrics.MAX.name)

                if min_value is None or max_value is None:
                    logger.warning(
                        "Skipping '%s=%s' dimension since 'min' or 'max' are 'None'",
                        dimension_col.name,
                        row.get(DIMENSION_VALUE_KEY),
                    )
                    continue

                # Normalize values (convert date to datetime if needed)
                min_value = self._normalize_metric_value(min_value, is_min=True)
                max_value = self._normalize_metric_value(max_value, is_min=False)

                metric_values = {
                    Metrics.MIN.name: min_value,
                    Metrics.MAX.name: max_value,
                    DIMENSION_TOTAL_COUNT_KEY: row.get(DIMENSION_TOTAL_COUNT_KEY),
                    DIMENSION_FAILED_COUNT_KEY: row.get(DIMENSION_FAILED_COUNT_KEY),
                }

                evaluation = self._evaluate_test_condition(metric_values, test_params)

                dimension_result = self._create_dimension_result(
                    row,
                    dimension_col.name,
                    metric_values,
                    evaluation,
                    test_params,
                )

                dimension_results.append(dimension_result)

        except Exception as exc:
            logger.warning(f"Error executing dimensional query: {exc}")
            logger.debug("Full error details: ", exc_info=True)

        return dimension_results

    def compute_row_count(self, column: Column, min_bound: int, max_bound: int):
        """Compute row count for the given column

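On the SQL side the same composition happens in the database: a GROUP BY CTE produces per-dimension count/min/max rows, and the "Others" bucket re-aggregates that CTE with MIN-of-mins and MAX-of-maxes, which is exactly what build_min_final and build_max_final contribute. A plain-SQLAlchemy sketch (the table and labels are illustrative; _execute_with_others_aggregation_statistical is the internal helper that assembles the real query):

from sqlalchemy import Column, Integer, MetaData, String, Table, func, select

users = Table(
    "users",
    MetaData(),
    Column("name", String),
    Column("age", Integer),
)

# Per-dimension raw metrics: one row per name with rowCount/min/max.
per_dim = (
    select(
        users.c.name.label("dimension"),
        func.count().label("rowCount"),
        func.min(users.c.age).label("min"),
        func.max(users.c.age).label("max"),
    )
    .group_by(users.c.name)
    .cte("per_dim")
)

# "Others" re-aggregation over the CTE: counts sum, and min/max compose
# exactly across groups (min of mins, max of maxes).
others = select(
    func.sum(per_dim.c.rowCount).label("rowCount"),
    func.min(per_dim.c.min).label("min"),
    func.max(per_dim.c.max).label("max"),
)
print(others)  # renders the grouped CTE plus the final aggregation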
@@ -1171,3 +1171,20 @@ def test_case_column_values_missing_count_to_be_equal_missing_values_dimensional
        dimensionColumns=["name"],
        computePassedFailedRowCount=True,
    )  # type: ignore


@pytest.fixture
def test_case_column_values_to_be_between_dimensional():
    """Test case for test column_values_to_be_between with dimensional analysis"""
    return TestCase(
        name=TEST_CASE_NAME,
        entityLink=ENTITY_LINK_AGE,
        testSuite=EntityReference(id=uuid4(), type="TestSuite"),  # type: ignore
        testDefinition=EntityReference(id=uuid4(), type="TestDefinition"),  # type: ignore
        parameterValues=[
            TestCaseParameterValue(name="minValue", value="30"),
            TestCaseParameterValue(name="maxValue", value="30"),
        ],
        dimensionColumns=["name"],
        computePassedFailedRowCount=True,
    )  # type: ignore
@@ -933,6 +933,29 @@ TEST_CASE_SUPPORT_ROW_LEVEL_PASS_FAILED = {
                ("name=Others", TestCaseStatus.Failed, None, None, None, None, 1),
            ],
        ),
        (
            "test_case_column_values_to_be_between_dimensional",
            "columnValuesToBeBetween",
            "COLUMN",
            (
                TestCaseResult,
                "30",
                "31",
                TestCaseStatus.Failed,
                50,
                30,
                62.5,
                37.5,
            ),
            [
                ("name=Bob", TestCaseStatus.Failed, 0, 10, 0, 100, 0.0333),
                ("name=Diana", TestCaseStatus.Failed, 0, 10, 0, 100, 0.0333),
                ("name=Jane", TestCaseStatus.Failed, 0, 10, 0, 100, 0.0333),
                ("name=Alice", TestCaseStatus.Success, 10, 0, 100, 0, 0),
                ("name=Charlie", TestCaseStatus.Success, 10, 0, 100, 0, 0),
                ("name=Others", TestCaseStatus.Success, 30, 0, 100, 0, 0),
            ],
        ),
    ],
)
def test_suite_validation_database(
@@ -1131,6 +1131,29 @@ DATALAKE_DATA_FRAME = lambda times_increase_sample_data: DataFrame(
                ("name=Others", TestCaseStatus.Failed, None, None, None, None, 0.0741),
            ],
        ),
        (
            "test_case_column_values_to_be_between_dimensional",
            "columnValuesToBeBetween",
            "COLUMN",
            (
                TestCaseResult,
                "30.0",
                "31.0",
                TestCaseStatus.Failed,
                10000,
                6000,
                None,
                None,
            ),
            [
                ("name=Bob", TestCaseStatus.Failed, 0, 2000, 0, 100, 0.6667),
                ("name=Diana", TestCaseStatus.Failed, 0, 2000, 0, 100, 0.6667),
                ("name=Jane", TestCaseStatus.Failed, 0, 2000, 0, 100, 0.6667),
                ("name=Alice", TestCaseStatus.Success, 2000, 0, 100, 0, 0),
                ("name=Charlie", TestCaseStatus.Success, 2000, 0, 100, 0, 0),
                ("name=Others", TestCaseStatus.Success, 4000, 0, 100, 0, 0),
            ],
        ),
    ],
)
def test_suite_validation_datalake(