Feature/dimensionality column values to be not null (#24211)

* Initial implementation for Dimensionality on Data Quality Tests

* Fix ColumnValuesToBeUnique and create TestCaseResult API

* Refactor dimension result

* Initial E2E Implementation without Impact Score

* Dimensionality Thin Slice

* Update generated TypeScript types

* Update generated TypeScript types

* Removed useless method to use the one we already had

* Fix Pandas Dimensionality checks

* Remove useless comments

* Implement PR comments, fix Tests

* Improve the code a bit

* Fix imports

* Implement Dimensionality for ColumnMeanToBeBetween

* Removed useless comments and improved minor things

* Implement UnitTests

* Fixes

* Moved import pandas to type checking

* Fix Min/Max being optional

* Fix Unittests

* small fixes

* Fix Unittests

* Fix Issue with counting total rows on mean

* Improve code

* Fix Merge

* Removed unused type

* Refactor to reduce code repetition and complexity

* Fix conflict

* Rename method

* Refactor some metrics

* Implement Dimensionality for ColumnLengthToBeBetween

* Implement Dimensionality for ColumnMedianToBeBetween in Pandas

* Implement Median Dimensionality for SQL

* Add database tests

* Fix median metric

* Implement Dimensionality SumToBeBetween

* Implement dimensionality for Column Values not In Set

* Implement Dimensionality for ColumnValuesToMatchRegex and ColumnValuesToNotMatchRegex

* Implement NotNull and MissingCount dimensionality

* Fix test

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
IceS2 authored on 2025-11-07 14:44:58 +01:00, committed by GitHub
parent 14e6b7ccc4
commit 4e398d003b
13 changed files with 1059 additions and 67 deletions

View File

@@ -247,16 +247,16 @@ class BaseTestValidator(ABC):
logger.debug(traceback.format_exc())
return []
def _get_test_parameters(self) -> Optional[dict]:
def _get_test_parameters(self) -> dict:
"""Get test-specific parameters from test case
Default implementation returns None. Override in child classes
Default implementation returns empty dict. Override in child classes
that need to extract and process test parameters.
Returns:
Optional[dict]: Test parameters, or None if validator has no parameters.
dict: Test parameters, or empty dict if validator has no parameters.
"""
return None
return {}
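A quick illustration of why the default return value changed (hypothetical usage, not code from this PR): callers invoke .get() on whatever _get_test_parameters returns, which is safe on an empty dict but would raise AttributeError on None.

test_params: dict = {}                                 # new default return value
missing_values = test_params.get("missingValueMatch")  # returns None, no special-casing needed
assert missing_values is None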
def _get_metrics_to_compute(self, test_params: Optional[dict] = None) -> dict:
"""Get metrics that need to be computed for this test
@@ -450,20 +450,12 @@ class BaseTestValidator(ABC):
test_result_values = self._get_test_result_values(metric_values)
impact_score = row.get(DIMENSION_IMPACT_SCORE_KEY, 0.0)
# Get total_rows from evaluation if present, otherwise from metric_values
# Import here to avoid circular import (metrics.registry -> utils.importer -> base_test_handler)
from metadata.profiler.metrics.registry import Metrics
total_rows = evaluation.get(
"total_rows", metric_values.get(Metrics.ROW_COUNT.name)
)
return self.get_dimension_result_object(
dimension_values={dimension_col_name: dimension_value},
test_case_status=self.get_test_case_status(evaluation["matched"]),
result=result_message,
test_result_value=test_result_values,
total_rows=total_rows,
total_rows=evaluation["total_rows"],
passed_rows=evaluation["passed_rows"],
failed_rows=evaluation["failed_rows"],
impact_score=impact_score,
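For orientation, the dimension flow above reads the evaluation and dimension payloads through a fixed set of keys. A plausible shape for the TestEvaluation and DimensionInfo TypedDicts, inferred only from the keys used in this diff (the authoritative definitions live in base_test_handler.py and may differ), would be:

from typing import Optional, TypedDict

class TestEvaluation(TypedDict):
    matched: bool
    passed_rows: Optional[int]
    failed_rows: Optional[int]
    total_rows: Optional[int]

class DimensionInfo(TypedDict):
    dimension_name: str
    dimension_value: str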

View File

@@ -16,11 +16,15 @@ Validator for column value missing count to be equal test case
import traceback
from abc import abstractmethod
from ast import literal_eval
from typing import Union
from typing import List, Optional, Union
from sqlalchemy import Column
from metadata.data_quality.validations.base_test_handler import BaseTestValidator
from metadata.data_quality.validations.base_test_handler import (
BaseTestValidator,
DimensionInfo,
TestEvaluation,
)
from metadata.generated.schema.tests.basic import (
TestCaseResult,
TestCaseStatus,
@@ -38,6 +42,10 @@ NULL_COUNT = "nullCount"
class BaseColumnValuesMissingCountValidator(BaseTestValidator):
"""Validator for column value missing count to be equal test case"""
MISSING_VALUE_MATCH = "missingValueMatch"
MISSING_COUNT_VALUE = "missingCountValue"
TOTAL_MISSING_COUNT = "total_missing_count"
def _run_validation(self) -> TestCaseResult:
"""Execute the specific test validation logic
@@ -47,12 +55,34 @@ class BaseColumnValuesMissingCountValidator(BaseTestValidator):
Returns:
TestCaseResult: The test case result for the overall validation
"""
test_params = self._get_test_parameters()
try:
column: Union[SQALikeColumn, Column] = self.get_column()
null_res = self._run_results(
null_missing_count = self._run_results(
Metrics.NULL_MISSING_COUNT,
column,
)
metric_values = {
Metrics.NULL_MISSING_COUNT.name: null_missing_count,
}
if test_params.get(self.MISSING_VALUE_MATCH):
# if user supplies missing values, we need to compute the count of missing values
# in addition to the count of null values
count_in_set = self._run_results(
Metrics.COUNT_IN_SET,
column,
values=test_params[self.MISSING_VALUE_MATCH],
)
metric_values[Metrics.COUNT_IN_SET.name] = count_in_set
metric_values[self.TOTAL_MISSING_COUNT] = (
null_missing_count + count_in_set
)
else:
metric_values[self.TOTAL_MISSING_COUNT] = null_missing_count
except (ValueError, RuntimeError) as exc:
msg = f"Error computing {self.test_case.fullyQualifiedName}: {exc}" # type: ignore
logger.debug(traceback.format_exc())
@@ -64,45 +94,143 @@ class BaseColumnValuesMissingCountValidator(BaseTestValidator):
[TestResultValue(name=NULL_COUNT, value=None)],
)
evaluation = self._evaluate_test_condition(metric_values, test_params)
result_message = self._format_result_message(
metric_values, test_params=test_params
)
test_result_values = self._get_test_result_values(metric_values)
return self.get_test_case_result_object(
self.execution_date,
self.get_test_case_status(evaluation["matched"]),
result_message,
test_result_values,
)
def _get_test_parameters(self) -> dict:
"""Extract test-specific parameters from test case
Returns:
dict with keys: MISSING_VALUE_MATCH, MISSING_COUNT_VALUE
"""
missing_values = self.get_test_case_param_value(
self.test_case.parameterValues, # type: ignore
"missingValueMatch",
self.MISSING_VALUE_MATCH,
literal_eval,
)
missing_count_value = self.get_test_case_param_value(
self.test_case.parameterValues, # type: ignore
"missingCountValue",
self.MISSING_COUNT_VALUE,
literal_eval,
)
if missing_values:
# if user supplies missing values, we need to compute the count of missing values
# in addition to the count of null values
try:
set_res = self._run_results(
Metrics.COUNT_IN_SET, column, values=missing_values
)
null_res += set_res
except (ValueError, RuntimeError) as exc:
msg = f"Error computing {self.test_case.fullyQualifiedName}: {exc}" # type: ignore
logger.debug(traceback.format_exc())
logger.warning(msg)
return self.get_test_case_result_object(
self.execution_date,
TestCaseStatus.Aborted,
msg,
[
TestResultValue(name=NULL_COUNT, value=None),
],
)
return {
self.MISSING_VALUE_MATCH: missing_values,
self.MISSING_COUNT_VALUE: missing_count_value,
}
return self.get_test_case_result_object(
self.execution_date,
self.get_test_case_status(missing_count_value == null_res),
f"Found nullCount={null_res} vs. the expected nullCount={missing_count_value}.",
[TestResultValue(name=NULL_COUNT, value=str(null_res))],
)
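Note that parameter values arrive as strings, as in the fixtures further down (missingCountValue="2000", missingValueMatch="['Johnny d']"), which is why literal_eval is passed as the conversion function:

from ast import literal_eval

assert literal_eval("2000") == 2000                    # string -> int
assert literal_eval("['Johnny d']") == ["Johnny d"]    # string -> list of strings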
def _get_metrics_to_compute(self, test_params: dict) -> dict:
"""Define which metrics to compute based on test parameters
Args:
test_params: Dictionary with MISSING_VALUE_MATCH and MISSING_COUNT_VALUE
Returns:
dict: Mapping of Metrics enum names to Metrics enum values
"""
metrics = {
Metrics.NULL_MISSING_COUNT.name: Metrics.NULL_MISSING_COUNT,
}
if test_params.get(self.MISSING_VALUE_MATCH):
metrics[Metrics.COUNT_IN_SET.name] = Metrics.COUNT_IN_SET
return metrics
def _evaluate_test_condition(
self, metric_values: dict, test_params: Optional[dict] = None
) -> TestEvaluation:
"""Evaluate the missing count test condition
Test passes if total_missing_count == expected missing_count_value
Args:
metric_values: Dictionary with keys from Metrics enum names
e.g., {"NULL_MISSING_COUNT": 5, "COUNT_IN_SET": 3}
test_params: Dictionary with MISSING_VALUE_MATCH and MISSING_COUNT_VALUE
Returns:
TestEvaluation: TypedDict with keys:
- matched: bool - whether test passed
- passed_rows: None - not computed for this validator
- failed_rows: None - not computed for this validator
- total_rows: None - not computed for this validator
"""
if test_params is None:
raise ValueError(
"test_params is required for columnValuesMissingCount._evaluate_test_condition"
)
total_missing_count = metric_values[self.TOTAL_MISSING_COUNT]
expected_missing_count = test_params[self.MISSING_COUNT_VALUE]
matched = total_missing_count == expected_missing_count
return {
"matched": matched,
"passed_rows": None,
"failed_rows": None,
"total_rows": None,
}
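A small worked example of this condition (illustrative numbers; "N/A" is a made-up missing-value marker): 5 NULLs plus 3 rows matching the user-supplied missing values give a total of 8, which equals the expected count, so the test passes.

metric_values = {"NULL_MISSING_COUNT": 5, "COUNT_IN_SET": 3, "total_missing_count": 8}
test_params = {"missingValueMatch": ["N/A"], "missingCountValue": 8}
assert metric_values["total_missing_count"] == test_params["missingCountValue"]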
def _format_result_message(
self,
metric_values: dict,
dimension_info: Optional[DimensionInfo] = None,
test_params: Optional[dict] = None,
) -> str:
"""Format the result message for missing count test
Args:
metric_values: Dictionary with Metrics enum names as keys
dimension_info: Optional DimensionInfo with dimension details
test_params: Optional test parameters with expected MISSING_COUNT_VALUE
Returns:
str: Formatted result message
"""
if test_params is None:
raise ValueError(
"test_params is required for columnValuesMissingCount._format_result_message"
)
total_missing_count = metric_values[self.TOTAL_MISSING_COUNT]
expected_missing_count = test_params[self.MISSING_COUNT_VALUE]
if dimension_info:
return (
f"Dimension {dimension_info['dimension_name']}={dimension_info['dimension_value']}: "
f"Found nullCount={total_missing_count} vs. the expected nullCount={expected_missing_count}."
)
else:
return f"Found nullCount={total_missing_count} vs. the expected nullCount={expected_missing_count}."
def _get_test_result_values(self, metric_values: dict) -> List[TestResultValue]:
"""Get test result values for missing count test
Args:
metric_values: Dictionary with Metrics enum names as keys
Returns:
List[TestResultValue]: Test result values for the test case
"""
return [
TestResultValue(
name=NULL_COUNT,
value=str(metric_values[self.TOTAL_MISSING_COUNT]),
),
]
@abstractmethod
def _run_results(

View File

@@ -15,11 +15,15 @@ Validator for column values to be not null test case
import traceback
from abc import abstractmethod
from typing import Union
from typing import List, Optional, Union
from sqlalchemy import Column
from metadata.data_quality.validations.base_test_handler import BaseTestValidator
from metadata.data_quality.validations.base_test_handler import (
BaseTestValidator,
DimensionInfo,
TestEvaluation,
)
from metadata.generated.schema.tests.basic import (
TestCaseResult,
TestCaseStatus,
@@ -46,9 +50,18 @@ class BaseColumnValuesToBeNotNullValidator(BaseTestValidator):
Returns:
TestCaseResult: The test case result for the overall validation
"""
test_params = self._get_test_parameters()
try:
column: Union[SQALikeColumn, Column] = self.get_column()
res = self._run_results(Metrics.NULL_COUNT, column)
null_count = self._run_results(Metrics.NULL_COUNT, column)
metric_values = {
Metrics.NULL_COUNT.name: null_count,
}
if self.test_case.computePassedFailedRowCount:
metric_values[Metrics.ROW_COUNT.name] = self.get_row_count()
except (ValueError, RuntimeError) as exc:
msg = f"Error computing {self.test_case.fullyQualifiedName}: {exc}" # type: ignore
logger.debug(traceback.format_exc())
@@ -60,20 +73,115 @@ class BaseColumnValuesToBeNotNullValidator(BaseTestValidator):
[TestResultValue(name=NULL_COUNT, value=None)],
)
if self.test_case.computePassedFailedRowCount:
row_count = self.get_row_count()
else:
row_count = None
evaluation = self._evaluate_test_condition(metric_values, test_params)
result_message = self._format_result_message(
metric_values, test_params=test_params
)
test_result_values = self._get_test_result_values(metric_values)
return self.get_test_case_result_object(
self.execution_date,
self.get_test_case_status(res == 0),
f"Found nullCount={res}. It should be 0",
[TestResultValue(name=NULL_COUNT, value=str(res))],
row_count=row_count,
failed_rows=res,
self.get_test_case_status(evaluation["matched"]),
result_message,
test_result_values,
row_count=evaluation["total_rows"],
passed_rows=evaluation["passed_rows"],
failed_rows=evaluation["failed_rows"],
)
def _get_metrics_to_compute(self, test_params: dict) -> dict:
"""Define which metrics to compute based on test parameters
Args:
test_params: Dictionary (empty for this validator)
Returns:
dict: Mapping of Metrics enum names to Metrics enum values
"""
metrics = {
Metrics.NULL_COUNT.name: Metrics.NULL_COUNT,
}
if self.test_case.computePassedFailedRowCount:
metrics[Metrics.ROW_COUNT.name] = Metrics.ROW_COUNT
return metrics
def _evaluate_test_condition(
self, metric_values: dict, test_params: Optional[dict] = None
) -> TestEvaluation:
"""Evaluate the not null test condition
Test passes if null_count == 0 (no null values found)
Args:
metric_values: Dictionary with keys from Metrics enum names
e.g., {"NULL_COUNT": 0, "ROW_COUNT": 100}
test_params: Dictionary (not used for this validator)
Returns:
TestEvaluation: TypedDict with keys:
- matched: bool - whether test passed (null_count == 0)
- passed_rows: int - number of non-null values
- failed_rows: int - number of null values
- total_rows: int - total row count for reporting
"""
null_count = metric_values[Metrics.NULL_COUNT.name]
total_rows = metric_values.get(Metrics.ROW_COUNT.name)
matched = null_count == 0
failed_count = null_count
passed_count = total_rows - null_count if total_rows else 0
return {
"matched": matched,
"passed_rows": passed_count,
"failed_rows": failed_count,
"total_rows": total_rows,
}
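A worked example of this evaluation, using numbers consistent with the database expectations further down in this diff (20 nulls out of 80 rows):

metric_values = {"NULL_COUNT": 20, "ROW_COUNT": 80}
matched = metric_values["NULL_COUNT"] == 0                              # False -> test fails
passed_rows = metric_values["ROW_COUNT"] - metric_values["NULL_COUNT"]  # 60
failed_rows = metric_values["NULL_COUNT"]                               # 20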
def _format_result_message(
self,
metric_values: dict,
dimension_info: Optional[DimensionInfo] = None,
test_params: Optional[dict] = None,
) -> str:
"""Format the result message for not null test
Args:
metric_values: Dictionary with Metrics enum names as keys
dimension_info: Optional DimensionInfo with dimension details
test_params: Optional test parameters (not used by this validator)
Returns:
str: Formatted result message
"""
null_count = metric_values[Metrics.NULL_COUNT.name]
if dimension_info:
return (
f"Dimension {dimension_info['dimension_name']}={dimension_info['dimension_value']}: "
f"Found nullCount={null_count}. It should be 0"
)
else:
return f"Found nullCount={null_count}. It should be 0"
def _get_test_result_values(self, metric_values: dict) -> List[TestResultValue]:
"""Get test result values for not null test
Args:
metric_values: Dictionary with Metrics enum names as keys
Returns:
List[TestResultValue]: Test result values for the test case
"""
return [
TestResultValue(
name=NULL_COUNT,
value=str(metric_values[Metrics.NULL_COUNT.name]),
),
]
@abstractmethod
def _run_results(self, metric: Metrics, column: Union[SQALikeColumn, Column]):
raise NotImplementedError

View File

@@ -12,18 +12,35 @@
"""
Validator for column value missing count to be equal test case
"""
from collections import defaultdict
from typing import List, Optional, cast
from typing import Optional
import pandas as pd
from metadata.data_quality.validations.base_test_handler import (
DIMENSION_FAILED_COUNT_KEY,
DIMENSION_TOTAL_COUNT_KEY,
DIMENSION_VALUE_KEY,
)
from metadata.data_quality.validations.column.base.columnValuesMissingCount import (
BaseColumnValuesMissingCountValidator,
)
from metadata.data_quality.validations.impact_score import (
DEFAULT_TOP_DIMENSIONS,
calculate_impact_score_pandas,
)
from metadata.data_quality.validations.mixins.pandas_validator_mixin import (
PandasValidatorMixin,
aggregate_others_statistical_pandas,
)
from metadata.generated.schema.tests.dimensionResult import DimensionResult
from metadata.profiler.metrics.core import add_props
from metadata.profiler.metrics.registry import Metrics
from metadata.utils.logger import test_suite_logger
from metadata.utils.sqa_like_column import SQALikeColumn
logger = test_suite_logger()
class ColumnValuesMissingCountValidator(
BaseColumnValuesMissingCountValidator, PandasValidatorMixin
@@ -40,3 +57,163 @@ class ColumnValuesMissingCountValidator(
column: column
"""
return self.run_dataframe_results(self.runner, metric, column, **kwargs)
def _execute_dimensional_validation(
self,
column: SQALikeColumn,
dimension_col: SQALikeColumn,
metrics_to_compute: dict,
test_params: dict,
) -> List[DimensionResult]:
"""Execute dimensional query with impact scoring and Others aggregation for pandas
Follows the iterate pattern from the Mean metric's df_fn method to handle
multiple dataframes efficiently without concatenating them in memory.
Memory-efficient approach: Instead of concatenating all dataframes (which creates
a full copy in memory), we iterate over them and accumulate aggregates. This is
especially important for large parquet files split across many chunks.
For missing count validation, we accumulate null/missing counts across dataframes
to accurately track how many missing values exist per dimension.
Args:
column: The column being validated
dimension_col: Single SQALikeColumn object corresponding to the dimension column
metrics_to_compute: Dictionary mapping Metrics enum names to Metrics objects
test_params: Dictionary with test-specific parameters (MISSING_VALUE_MATCH, MISSING_COUNT_VALUE)
Returns:
List[DimensionResult]: Top N dimensions by impact score plus "Others"
"""
dimension_results = []
try:
dfs = self.runner if isinstance(self.runner, list) else [self.runner]
metric_expressions = {
Metrics.NULL_MISSING_COUNT.name: Metrics.NULL_MISSING_COUNT(
column
).get_pandas_computation(),
Metrics.ROW_COUNT.name: Metrics.ROW_COUNT().get_pandas_computation(),
}
missing_values = test_params.get(self.MISSING_VALUE_MATCH)
missing_values_expected_count = test_params.get(self.MISSING_COUNT_VALUE, 0)
if missing_values:
metric_expressions[Metrics.COUNT_IN_SET.name] = add_props(
values=missing_values
)(Metrics.COUNT_IN_SET.value)(column).get_pandas_computation()
dimension_aggregates = defaultdict(
lambda: {
metric_name: metric.create_accumulator()
for metric_name, metric in metric_expressions.items()
}
)
for df in dfs:
df_typed = cast(pd.DataFrame, df)
grouped = df_typed.groupby(dimension_col.name, dropna=False)
for dimension_value, group_df in grouped:
dimension_value = self.format_dimension_value(dimension_value)
for metric_name, metric in metric_expressions.items():
dimension_aggregates[dimension_value][
metric_name
] = metric.update_accumulator(
dimension_aggregates[dimension_value][metric_name], group_df
)
results_data = []
for dimension_value, agg in dimension_aggregates.items():
total_missing_count = sum(
metric.aggregate_accumulator(agg[metric_name])
for metric_name, metric in metric_expressions.items()
if metric_name != Metrics.ROW_COUNT.name
)
total_rows = metric_expressions[
Metrics.ROW_COUNT.name
].aggregate_accumulator(agg[Metrics.ROW_COUNT.name])
# Calculate initial deviation (will be recalculated for "Others")
deviation = abs(total_missing_count - missing_values_expected_count)
results_data.append(
{
DIMENSION_VALUE_KEY: dimension_value,
self.TOTAL_MISSING_COUNT: total_missing_count,
DIMENSION_TOTAL_COUNT_KEY: total_rows,
DIMENSION_FAILED_COUNT_KEY: deviation,
}
)
results_df = pd.DataFrame(results_data)
if not results_df.empty:
# Define recalculation function for deviation after aggregation
def recalculate_failed_count(df_aggregated, others_mask, metric_column):
"""Recalculate failed_count (deviation) for 'Others' from aggregated total_missing_count"""
result = df_aggregated[metric_column].copy()
if others_mask.any():
others_total = df_aggregated.loc[
others_mask, self.TOTAL_MISSING_COUNT
].iloc[0]
# Deviation is the failed_count
result.loc[others_mask] = abs(
others_total - missing_values_expected_count
)
return result
results_df = calculate_impact_score_pandas(
results_df,
failed_column=DIMENSION_FAILED_COUNT_KEY,
total_column=DIMENSION_TOTAL_COUNT_KEY,
)
results_df = aggregate_others_statistical_pandas(
results_df,
dimension_column=DIMENSION_VALUE_KEY,
top_n=DEFAULT_TOP_DIMENSIONS,
agg_functions={
self.TOTAL_MISSING_COUNT: "sum", # Sum actual missing counts
DIMENSION_TOTAL_COUNT_KEY: "sum",
DIMENSION_FAILED_COUNT_KEY: "sum", # This will be recalculated for Others
},
final_metric_calculators={
DIMENSION_FAILED_COUNT_KEY: recalculate_failed_count, # Recalculate deviation for Others
},
# No violation_predicate needed - deviation IS the failed_count
)
for row_dict in results_df.to_dict("records"):
metric_values = self._build_metric_values_from_row(
row_dict, metrics_to_compute, test_params
)
# Need to add the calculated metric here.
metric_values[self.TOTAL_MISSING_COUNT] = row_dict.get(
self.TOTAL_MISSING_COUNT
)
evaluation = self._evaluate_test_condition(
metric_values, test_params
)
dimension_result = self._create_dimension_result(
row_dict,
dimension_col.name,
metric_values,
evaluation,
test_params,
)
dimension_results.append(dimension_result)
except Exception as exc:
logger.warning(f"Error executing dimensional query: {exc}")
logger.debug("Full error details: ", exc_info=True)
return dimension_results
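The recalculate_failed_count hook above is the crux of the statistical aggregation. A standalone pandas sketch of the same idea, with illustrative values and a plain nlargest cutoff instead of the impact-score-based selection used by the real helpers:

import pandas as pd

expected = 2000
per_dim = pd.DataFrame(
    {
        "dimension_value": ["Alice", "Bob", "Zoe", "Yan"],
        "total_missing_count": [500, 300, 900, 800],
    }
)
per_dim["failed_count"] = (per_dim["total_missing_count"] - expected).abs()

top = per_dim.nlargest(2, "failed_count")             # keep the top dimensions as-is
others = per_dim.drop(top.index)                      # roll the rest into "Others"
others_missing = others["total_missing_count"].sum()  # 900 + 800 = 1700
others_failed = abs(others_missing - expected)        # |1700 - 2000| = 300
# Summing the per-dimension deviations instead (1100 + 1200 = 2300) would overstate
# how far the aggregated "Others" bucket is from the expected missing count.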

View File

@@ -13,17 +13,34 @@
Validator for column values to be not null test case
"""
from typing import Optional
from collections import defaultdict
from typing import List, Optional, cast
import pandas as pd
from metadata.data_quality.validations.base_test_handler import (
DIMENSION_FAILED_COUNT_KEY,
DIMENSION_TOTAL_COUNT_KEY,
DIMENSION_VALUE_KEY,
)
from metadata.data_quality.validations.column.base.columnValuesToBeNotNull import (
BaseColumnValuesToBeNotNullValidator,
)
from metadata.data_quality.validations.impact_score import (
DEFAULT_TOP_DIMENSIONS,
calculate_impact_score_pandas,
)
from metadata.data_quality.validations.mixins.pandas_validator_mixin import (
PandasValidatorMixin,
aggregate_others_pandas,
)
from metadata.generated.schema.tests.dimensionResult import DimensionResult
from metadata.profiler.metrics.registry import Metrics
from metadata.utils.logger import test_suite_logger
from metadata.utils.sqa_like_column import SQALikeColumn
logger = test_suite_logger()
class ColumnValuesToBeNotNullValidator(
BaseColumnValuesToBeNotNullValidator, PandasValidatorMixin
@@ -39,6 +56,127 @@ class ColumnValuesToBeNotNullValidator(
"""
return self.run_dataframe_results(self.runner, metric, column)
def _execute_dimensional_validation(
self,
column: SQALikeColumn,
dimension_col: SQALikeColumn,
metrics_to_compute: dict,
test_params: dict,
) -> List[DimensionResult]:
"""Execute dimensional query with impact scoring and Others aggregation for pandas
Follows the iterate pattern from the Mean metric's df_fn method to handle
multiple dataframes efficiently without concatenating them in memory.
Memory-efficient approach: Instead of concatenating all dataframes (which creates
a full copy in memory), we iterate over them and accumulate aggregates. This is
especially important for large parquet files split across many chunks.
For not-null validation, we accumulate null counts across dataframes to accurately
track how many null values exist per dimension.
Args:
column: The column being validated
dimension_col: Single SQALikeColumn object corresponding to the dimension column
metrics_to_compute: Dictionary mapping Metrics enum names to Metrics objects
test_params: Dictionary with test-specific parameters (empty for this validator)
Returns:
List[DimensionResult]: Top N dimensions by impact score plus "Others"
"""
dimension_results = []
try:
dfs = self.runner if isinstance(self.runner, list) else [self.runner]
null_count_impl = Metrics.NULL_COUNT(column).get_pandas_computation()
row_count_impl = Metrics.ROW_COUNT().get_pandas_computation()
dimension_aggregates = defaultdict(
lambda: {
Metrics.NULL_COUNT.name: null_count_impl.create_accumulator(),
Metrics.ROW_COUNT.name: row_count_impl.create_accumulator(),
}
)
for df in dfs:
df_typed = cast(pd.DataFrame, df)
grouped = df_typed.groupby(dimension_col.name, dropna=False)
for dimension_value, group_df in grouped:
dimension_value = self.format_dimension_value(dimension_value)
dimension_aggregates[dimension_value][
Metrics.NULL_COUNT.name
] = null_count_impl.update_accumulator(
dimension_aggregates[dimension_value][Metrics.NULL_COUNT.name],
group_df,
)
dimension_aggregates[dimension_value][
Metrics.ROW_COUNT.name
] = row_count_impl.update_accumulator(
dimension_aggregates[dimension_value][Metrics.ROW_COUNT.name],
group_df,
)
results_data = []
for dimension_value, agg in dimension_aggregates.items():
null_count = null_count_impl.aggregate_accumulator(
agg[Metrics.NULL_COUNT.name]
)
row_count = row_count_impl.aggregate_accumulator(
agg[Metrics.ROW_COUNT.name]
)
results_data.append(
{
DIMENSION_VALUE_KEY: dimension_value,
Metrics.NULL_COUNT.name: null_count,
Metrics.ROW_COUNT.name: row_count,
DIMENSION_TOTAL_COUNT_KEY: row_count,
DIMENSION_FAILED_COUNT_KEY: null_count,
}
)
results_df = pd.DataFrame(results_data)
if not results_df.empty:
results_df = calculate_impact_score_pandas(
results_df,
failed_column=DIMENSION_FAILED_COUNT_KEY,
total_column=DIMENSION_TOTAL_COUNT_KEY,
)
results_df = aggregate_others_pandas(
results_df,
dimension_column=DIMENSION_VALUE_KEY,
top_n=DEFAULT_TOP_DIMENSIONS,
)
for row_dict in results_df.to_dict("records"):
metric_values = self._build_metric_values_from_row(
row_dict, metrics_to_compute, test_params
)
evaluation = self._evaluate_test_condition(
metric_values, test_params
)
dimension_result = self._create_dimension_result(
row_dict,
dimension_col.name,
metric_values,
evaluation,
test_params,
)
dimension_results.append(dimension_result)
except Exception as exc:
logger.warning(f"Error executing dimensional query: {exc}")
logger.debug("Full error details: ", exc_info=True)
return dimension_results
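For readers unfamiliar with the accumulator protocol, here is a stripped-down sketch of the per-chunk group-by accumulation described in the docstring above, using plain pandas instead of Metrics.*.get_pandas_computation() (column names follow the test fixtures; data is illustrative):

from collections import defaultdict

import pandas as pd

chunks = [
    pd.DataFrame({"name": ["Alice", "Eve", "Eve"], "age": [31, None, None]}),
    pd.DataFrame({"name": ["Alice", "Eve"], "age": [30, None]}),
]
acc = defaultdict(lambda: {"null_count": 0, "row_count": 0})
for chunk in chunks:
    for dim_value, group in chunk.groupby("name", dropna=False):
        acc[dim_value]["null_count"] += int(group["age"].isnull().sum())
        acc[dim_value]["row_count"] += len(group)
# acc == {"Alice": {"null_count": 0, "row_count": 2}, "Eve": {"null_count": 3, "row_count": 3}}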
def compute_row_count(self, column: SQALikeColumn):
"""Compute row count for the given column

View File

@@ -13,16 +13,25 @@
Validator for column value missing count to be equal test case
"""
from typing import Optional
from typing import List, Optional
from sqlalchemy import Column
from sqlalchemy import Column, case, func
from metadata.data_quality.validations.base_test_handler import (
DIMENSION_FAILED_COUNT_KEY,
DIMENSION_OTHERS_LABEL,
DIMENSION_TOTAL_COUNT_KEY,
)
from metadata.data_quality.validations.column.base.columnValuesMissingCount import (
BaseColumnValuesMissingCountValidator,
)
from metadata.data_quality.validations.impact_score import DEFAULT_TOP_DIMENSIONS
from metadata.data_quality.validations.mixins.sqa_validator_mixin import (
DIMENSION_GROUP_LABEL,
SQAValidatorMixin,
)
from metadata.generated.schema.tests.dimensionResult import DimensionResult
from metadata.profiler.metrics.core import add_props
from metadata.profiler.metrics.registry import Metrics
from metadata.utils.logger import test_suite_logger
@@ -42,3 +51,116 @@ class ColumnValuesMissingCountValidator(
column: column
"""
return self.run_query_results(self.runner, metric, column, **kwargs)
def _execute_dimensional_validation(
self,
column: Column,
dimension_col: Column,
metrics_to_compute: dict,
test_params: dict,
) -> List[DimensionResult]:
"""Execute dimensional validation for missing count with deviation recalculation
Uses statistical aggregation to:
1. Compute total_missing_count per dimension (NULL + optional custom values)
2. Calculate deviation = abs(actual - expected) per dimension
3. Aggregate "Others" and recalculate deviation from summed missing counts
Args:
column: The column being validated
dimension_col: The dimension column to group by
metrics_to_compute: Dict mapping metric names to Metrics enums
test_params: Test parameters (MISSING_VALUE_MATCH, MISSING_COUNT_VALUE)
Returns:
List[DimensionResult]: Top N dimensions plus "Others"
"""
dimension_results = []
try:
missing_values = test_params.get(self.MISSING_VALUE_MATCH)
expected_missing_count = test_params.get(self.MISSING_COUNT_VALUE, 0)
total_missing_expr = Metrics.NULL_MISSING_COUNT(column).fn()
if missing_values:
total_missing_expr = (
total_missing_expr
+ add_props(values=missing_values)(Metrics.COUNT_IN_SET.value)(
column
).fn()
)
metric_expressions = {
self.TOTAL_MISSING_COUNT: total_missing_expr,
DIMENSION_TOTAL_COUNT_KEY: Metrics.ROW_COUNT().fn(),
}
def build_failed_count(cte):
"""Build failed count - deviation IS the failed count
For initial CTE: deviation = abs(actual - expected)
"""
return func.abs(
getattr(cte.c, self.TOTAL_MISSING_COUNT) - expected_missing_count
)
def build_deviation_final(cte):
"""Recalculate deviation for Others after aggregation
For top dimensions: keep original deviation via max()
For Others: recalculate from aggregated total_missing_count
"""
return case(
[
(
getattr(cte.c, DIMENSION_GROUP_LABEL)
!= DIMENSION_OTHERS_LABEL,
func.max(getattr(cte.c, DIMENSION_FAILED_COUNT_KEY)),
)
],
else_=(
func.abs(
func.sum(getattr(cte.c, self.TOTAL_MISSING_COUNT))
- expected_missing_count
)
),
)
result_rows = self._execute_with_others_aggregation_statistical(
dimension_col,
metric_expressions,
build_failed_count,
final_metric_builders={
DIMENSION_FAILED_COUNT_KEY: build_deviation_final
},
top_dimensions_count=DEFAULT_TOP_DIMENSIONS,
)
for row in result_rows:
total_missing_count = row.get(self.TOTAL_MISSING_COUNT)
if total_missing_count is None:
continue
metric_values = {
self.TOTAL_MISSING_COUNT: total_missing_count,
}
evaluation = self._evaluate_test_condition(metric_values, test_params)
dimension_result = self._create_dimension_result(
row,
dimension_col.name,
metric_values,
evaluation,
test_params,
)
dimension_results.append(dimension_result)
except Exception as exc:
logger.warning(f"Error executing dimensional query: {exc}")
logger.debug("Full error details: ", exc_info=True)
return dimension_results

View File

@@ -13,16 +13,22 @@
Validator for column values to be not null test case
"""
from typing import Optional
from typing import List, Optional
from sqlalchemy import Column
from metadata.data_quality.validations.base_test_handler import (
DIMENSION_FAILED_COUNT_KEY,
DIMENSION_TOTAL_COUNT_KEY,
)
from metadata.data_quality.validations.column.base.columnValuesToBeNotNull import (
BaseColumnValuesToBeNotNullValidator,
)
from metadata.data_quality.validations.impact_score import DEFAULT_TOP_DIMENSIONS
from metadata.data_quality.validations.mixins.sqa_validator_mixin import (
SQAValidatorMixin,
)
from metadata.generated.schema.tests.dimensionResult import DimensionResult
from metadata.profiler.metrics.registry import Metrics
from metadata.utils.logger import test_suite_logger
@@ -43,6 +49,68 @@ class ColumnValuesToBeNotNullValidator(
"""
return self.run_query_results(self.runner, metric, column)
def _execute_dimensional_validation(
self,
column: Column,
dimension_col: Column,
metrics_to_compute: dict,
test_params: dict,
) -> List[DimensionResult]:
"""Execute dimensional query with impact scoring and Others aggregation
Calculates impact scores for all dimension values and aggregates
low-impact dimensions into "Others" category using CTEs.
Args:
column: The column being validated
dimension_col: Single Column object corresponding to the dimension column
metrics_to_compute: Dictionary mapping Metrics enum names to Metrics objects
test_params: Dictionary with test-specific parameters (allowed_values, match_enum)
Returns:
List[DimensionResult]: Top N dimensions by impact score plus "Others"
"""
dimension_results = []
try:
# Build metric expressions using enum names as keys
metric_expressions = {}
for metric_name, metric in metrics_to_compute.items():
metric_instance = metric.value(column)
metric_expressions[metric_name] = metric_instance.fn()
metric_expressions[DIMENSION_TOTAL_COUNT_KEY] = Metrics.ROW_COUNT().fn()
metric_expressions[DIMENSION_FAILED_COUNT_KEY] = metric_expressions[
Metrics.NULL_COUNT.name
]
result_rows = self._execute_with_others_aggregation(
dimension_col, metric_expressions, DEFAULT_TOP_DIMENSIONS
)
for row in result_rows:
# Build metric_values dict using helper method
metric_values = self._build_metric_values_from_row(
row, metrics_to_compute, test_params
)
# Evaluate test condition
evaluation = self._evaluate_test_condition(metric_values, test_params)
# Create dimension result using helper method
dimension_result = self._create_dimension_result(
row, dimension_col.name, metric_values, evaluation, test_params
)
dimension_results.append(dimension_result)
except Exception as exc:
logger.warning(f"Error executing dimensional query: {exc}")
logger.debug("Full error details: ", exc_info=True)
return dimension_results
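As rough orientation only, the grouped aggregation this method builds up to resembles the following SQLAlchemy 1.4+ sketch with made-up table and column names; the production code composes Metrics.NULL_COUNT(column).fn() and Metrics.ROW_COUNT().fn() and layers impact scoring plus the "Others" rollup on top via CTEs:

from sqlalchemy import Column, Integer, MetaData, String, Table, case, func, select

metadata_obj = MetaData()
users = Table("users", metadata_obj, Column("name", String), Column("age", Integer))

null_count = func.sum(case((users.c.age.is_(None), 1), else_=0)).label("nullCount")
row_count = func.count().label("rowCount")
stmt = select(users.c.name, null_count, row_count).group_by(users.c.name)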
def compute_row_count(self, column: Column):
"""Compute row count for the given column

View File

@@ -14,13 +14,21 @@ Null Count Metric definition
"""
# pylint: disable=duplicate-code
from typing import TYPE_CHECKING
from sqlalchemy import case, column
from sqlalchemy.sql.functions import coalesce
from metadata.generated.schema.configuration.profilerConfiguration import MetricType
from metadata.profiler.metrics.core import StaticMetric, _label
from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation
from metadata.profiler.orm.functions.sum import SumFn
from metadata.utils.logger import profiler_logger
if TYPE_CHECKING:
import pandas as pd
logger = profiler_logger()
class NullCount(StaticMetric):
@@ -56,4 +64,34 @@ class NullCount(StaticMetric):
def df_fn(self, dfs=None):
"""pandas function"""
return sum(df[self.col.name].isnull().sum() for df in dfs)
computation = self.get_pandas_computation()
accumulator = computation.create_accumulator()
for df in dfs:
try:
accumulator = computation.update_accumulator(accumulator, df)
except Exception as err:
logger.debug(
f"Error while computing 'Null Count' for column '{self.col.name}': {err}"
)
return None
return computation.aggregate_accumulator(accumulator)
def get_pandas_computation(self) -> PandasComputation:
"""Returns the logic to compute this metrics using Pandas"""
return PandasComputation[int, int](
create_accumulator=lambda: 0,
update_accumulator=lambda acc, df: NullCount.update_accumulator(
acc, df, self.col
),
aggregate_accumulator=lambda acc: acc,
)
@staticmethod
def update_accumulator(current_count: int, df: "pd.DataFrame", column) -> int:
"""Computes one DataFrame chunk and updates the running null count
Maintains a single count value. Adds chunk's null count to the
current total and returns the sum.
"""
chunk_null_count = df[column.name].isnull().sum()
return current_count + chunk_null_count

View File

@@ -14,12 +14,20 @@ Null Count Metric definition
"""
# pylint: disable=duplicate-code
from typing import TYPE_CHECKING
from sqlalchemy import case, column
from metadata.generated.schema.configuration.profilerConfiguration import MetricType
from metadata.profiler.metrics.core import StaticMetric, _label
from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation
from metadata.profiler.orm.functions.sum import SumFn
from metadata.utils.logger import profiler_logger
if TYPE_CHECKING:
import pandas as pd
logger = profiler_logger()
class NullMissingCount(StaticMetric):
@@ -67,7 +75,36 @@ class NullMissingCount(StaticMetric):
)
def df_fn(self, dfs=None):
"""pandas function"""
computation = self.get_pandas_computation()
accumulator = computation.create_accumulator()
for df in dfs:
try:
accumulator = computation.update_accumulator(accumulator, df)
except Exception as err:
logger.debug(
f"Error while computing 'Null Missing Count' for column '{self.col.name}': {err}"
)
return None
return computation.aggregate_accumulator(accumulator)
def get_pandas_computation(self) -> PandasComputation:
"""Returns the logic to compute this metrics using Pandas"""
return PandasComputation[int, int](
create_accumulator=lambda: 0,
update_accumulator=lambda acc, df: NullMissingCount.update_accumulator(
acc, df, self.col
),
aggregate_accumulator=lambda acc: acc,
)
@staticmethod
def update_accumulator(current_count: int, df: "pd.DataFrame", column) -> int:
"""Computes one DataFrame chunk and updates the running null/missing count
Maintains a single count value. Adds chunk's null and empty string count
to the current total and returns the sum.
"""
Returns the pandas function for calculating the metric.
"""
return sum(df[self.col.name].isnull().sum() for df in dfs)
chunk_null_count = df[column.name].isnull().sum()
chunk_empty_count = (df[column.name] == "").sum()
return current_count + chunk_null_count + chunk_empty_count
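A tiny example of what a single chunk update computes, and how NullMissingCount differs from NullCount (sample values only):

import pandas as pd

chunk = pd.DataFrame({"nickname": ["Johnny d", None, "", "Ice"]})
null_only = int(chunk["nickname"].isnull().sum())                 # 1 -> NullCount
null_or_empty = null_only + int((chunk["nickname"] == "").sum())  # 2 -> NullMissingCount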

View File

@@ -386,7 +386,7 @@ class TestBaseTestValidator:
validator._run_dimensional_validation.assert_called_once()
def test_get_test_parameters_default_returns_none():
def test_get_test_parameters_default_returns_empty_dict():
"""Test that default _get_test_parameters implementation returns None"""
test_case = MagicMock(spec=TestCase)
test_case.name = "test_default_params"
@@ -400,7 +400,7 @@ def test_get_test_parameters_default_returns_none():
result = validator._get_test_parameters()
assert result is None
assert result == {}
def test_evaluate_test_condition_not_implemented_error():

View File

@@ -1125,3 +1125,49 @@ def test_case_column_values_to_not_match_regex_dimensional():
dimensionColumns=["age"],
computePassedFailedRowCount=True,
) # type: ignore
@pytest.fixture
def test_case_column_values_to_be_not_null_dimensional():
"""Test case for test column_values_to_be_not_null with dimensional analysis"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_AGE,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
dimensionColumns=["name"],
computePassedFailedRowCount=True,
) # type: ignore
@pytest.fixture
def test_case_column_values_missing_count_to_be_equal_dimensional():
"""Test case for test column_values_missing_count with dimensional analysis"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_NICKNAME,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="missingCountValue", value="2000"),
],
dimensionColumns=["name"],
computePassedFailedRowCount=True,
) # type: ignore
@pytest.fixture
def test_case_column_values_missing_count_to_be_equal_missing_values_dimensional():
"""Test case for test column_values_missing_count with custom missing values and dimensional analysis"""
return TestCase(
name=TEST_CASE_NAME,
entityLink=ENTITY_LINK_NICKNAME,
testSuite=EntityReference(id=uuid4(), type="TestSuite"), # type: ignore
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
parameterValues=[
TestCaseParameterValue(name="missingCountValue", value="2000"),
TestCaseParameterValue(name="missingValueMatch", value="['Johnny d']"),
],
dimensionColumns=["name"],
computePassedFailedRowCount=True,
) # type: ignore

View File

@@ -864,6 +864,75 @@ TEST_CASE_SUPPORT_ROW_LEVEL_PASS_FAILED = {
("age=31", TestCaseStatus.Success, 30, 0, 100, 0, 0),
],
),
(
"test_case_column_values_to_be_not_null_dimensional",
"columnValuesToBeNotNull",
"COLUMN",
(
TestCaseResult,
"20",
None,
TestCaseStatus.Failed,
60.0,
20.0,
75.0,
25.0,
),
[
("name=Eve", TestCaseStatus.Failed, 0, 10, 0, 100, 0.0333),
("name=John", TestCaseStatus.Failed, 10, 10, 50, 50, 0.0167),
("name=Alice", TestCaseStatus.Success, 10, 0, 100, 0, 0),
("name=Bob", TestCaseStatus.Success, 10, 0, 100, 0, 0),
("name=Charlie", TestCaseStatus.Success, 10, 0, 100, 0, 0),
("name=Others", TestCaseStatus.Success, 20, 0, 100, 0, 0),
],
),
(
"test_case_column_values_missing_count_to_be_equal_dimensional",
"columnValuesMissingCount",
"COLUMN",
(
TestCaseResult,
"20",
None,
TestCaseStatus.Failed,
None,
None,
None,
None,
),
[
("name=Alice", TestCaseStatus.Failed, None, None, None, None, 1),
("name=Bob", TestCaseStatus.Failed, None, None, None, None, 1),
("name=Charlie", TestCaseStatus.Failed, None, None, None, None, 1),
("name=Diana", TestCaseStatus.Failed, None, None, None, None, 1),
("name=Eve", TestCaseStatus.Failed, None, None, None, None, 1),
("name=Others", TestCaseStatus.Failed, None, None, None, None, 1),
],
),
(
"test_case_column_values_missing_count_to_be_equal_missing_values_dimensional",
"columnValuesMissingCount",
"COLUMN",
(
TestCaseResult,
"30",
None,
TestCaseStatus.Failed,
None,
None,
None,
None,
),
[
("name=Alice", TestCaseStatus.Failed, None, None, None, None, 1),
("name=Bob", TestCaseStatus.Failed, None, None, None, None, 1),
("name=Charlie", TestCaseStatus.Failed, None, None, None, None, 1),
("name=Diana", TestCaseStatus.Failed, None, None, None, None, 1),
("name=Eve", TestCaseStatus.Failed, None, None, None, None, 1),
("name=Others", TestCaseStatus.Failed, None, None, None, None, 1),
],
),
],
)
def test_suite_validation_database(

View File

@@ -1062,6 +1062,75 @@ DATALAKE_DATA_FRAME = lambda times_increase_sample_data: DataFrame(
("age=31.0", TestCaseStatus.Success, 6000, 0, 100, 0, 0),
],
),
(
"test_case_column_values_to_be_not_null_dimensional",
"columnValuesToBeNotNull",
"COLUMN",
(
TestCaseResult,
"4000",
None,
TestCaseStatus.Failed,
12000.0,
4000.0,
75.0,
25.0,
),
[
("name=Eve", TestCaseStatus.Failed, 0, 2000, 0, 100, 0.6667),
("name=John", TestCaseStatus.Failed, 2000, 2000, 50, 50, 0.1667),
("name=Alice", TestCaseStatus.Success, 2000, 0, 100, 0, 0),
("name=Bob", TestCaseStatus.Success, 2000, 0, 100, 0, 0),
("name=Charlie", TestCaseStatus.Success, 2000, 0, 100, 0, 0),
("name=Others", TestCaseStatus.Success, 4000, 0, 100, 0, 0),
],
),
(
"test_case_column_values_missing_count_to_be_equal_dimensional",
"columnValuesMissingCount",
"COLUMN",
(
TestCaseResult,
"2000",
None,
TestCaseStatus.Success,
None,
None,
None,
None,
),
[
("name=Alice", TestCaseStatus.Failed, None, None, None, None, 0.6667),
("name=Bob", TestCaseStatus.Failed, None, None, None, None, 0.6667),
("name=Charlie", TestCaseStatus.Failed, None, None, None, None, 0.6667),
("name=Diana", TestCaseStatus.Failed, None, None, None, None, 0.6667),
("name=Eve", TestCaseStatus.Failed, None, None, None, None, 0.6667),
("name=Others", TestCaseStatus.Success, None, None, None, None, 0),
],
),
(
"test_case_column_values_missing_count_to_be_equal_missing_values_dimensional",
"columnValuesMissingCount",
"COLUMN",
(
TestCaseResult,
"4000",
None,
TestCaseStatus.Failed,
None,
None,
None,
None,
),
[
("name=Alice", TestCaseStatus.Failed, None, None, None, None, 0.6667),
("name=Bob", TestCaseStatus.Failed, None, None, None, None, 0.6667),
("name=Charlie", TestCaseStatus.Failed, None, None, None, None, 0.6667),
("name=Diana", TestCaseStatus.Failed, None, None, None, None, 0.6667),
("name=Eve", TestCaseStatus.Failed, None, None, None, None, 0.6667),
("name=Others", TestCaseStatus.Failed, None, None, None, None, 0.0741),
],
),
],
)
def test_suite_validation_datalake(