diff --git a/ingestion/src/metadata/data_quality/validations/base_test_handler.py b/ingestion/src/metadata/data_quality/validations/base_test_handler.py index 70bb78f3e56..e570ee5aa3d 100644 --- a/ingestion/src/metadata/data_quality/validations/base_test_handler.py +++ b/ingestion/src/metadata/data_quality/validations/base_test_handler.py @@ -247,16 +247,16 @@ class BaseTestValidator(ABC): logger.debug(traceback.format_exc()) return [] - def _get_test_parameters(self) -> Optional[dict]: + def _get_test_parameters(self) -> dict: """Get test-specific parameters from test case - Default implementation returns None. Override in child classes + Default implementation returns empty dict. Override in child classes that need to extract and process test parameters. Returns: - Optional[dict]: Test parameters, or None if validator has no parameters. + dict: Test parameters, or empty dict if validator has no parameters. """ - return None + return {} def _get_metrics_to_compute(self, test_params: Optional[dict] = None) -> dict: """Get metrics that need to be computed for this test @@ -450,20 +450,12 @@ class BaseTestValidator(ABC): test_result_values = self._get_test_result_values(metric_values) impact_score = row.get(DIMENSION_IMPACT_SCORE_KEY, 0.0) - # Get total_rows from evaluation if present, otherwise from metric_values - # Import here to avoid circular import (metrics.registry -> utils.importer -> base_test_handler) - from metadata.profiler.metrics.registry import Metrics - - total_rows = evaluation.get( - "total_rows", metric_values.get(Metrics.ROW_COUNT.name) - ) - return self.get_dimension_result_object( dimension_values={dimension_col_name: dimension_value}, test_case_status=self.get_test_case_status(evaluation["matched"]), result=result_message, test_result_value=test_result_values, - total_rows=total_rows, + total_rows=evaluation["total_rows"], passed_rows=evaluation["passed_rows"], failed_rows=evaluation["failed_rows"], impact_score=impact_score, diff --git a/ingestion/src/metadata/data_quality/validations/column/base/columnValuesMissingCount.py b/ingestion/src/metadata/data_quality/validations/column/base/columnValuesMissingCount.py index 1e4ae4cec2c..722f2cf92a1 100644 --- a/ingestion/src/metadata/data_quality/validations/column/base/columnValuesMissingCount.py +++ b/ingestion/src/metadata/data_quality/validations/column/base/columnValuesMissingCount.py @@ -16,11 +16,15 @@ Validator for column value missing count to be equal test case import traceback from abc import abstractmethod from ast import literal_eval -from typing import Union +from typing import List, Optional, Union from sqlalchemy import Column -from metadata.data_quality.validations.base_test_handler import BaseTestValidator +from metadata.data_quality.validations.base_test_handler import ( + BaseTestValidator, + DimensionInfo, + TestEvaluation, +) from metadata.generated.schema.tests.basic import ( TestCaseResult, TestCaseStatus, @@ -38,6 +42,10 @@ NULL_COUNT = "nullCount" class BaseColumnValuesMissingCountValidator(BaseTestValidator): """Validator for column value missing count to be equal test case""" + MISSING_VALUE_MATCH = "missingValueMatch" + MISSING_COUNT_VALUE = "missingCountValue" + TOTAL_MISSING_COUNT = "total_missing_count" + def _run_validation(self) -> TestCaseResult: """Execute the specific test validation logic @@ -47,12 +55,34 @@ class BaseColumnValuesMissingCountValidator(BaseTestValidator): Returns: TestCaseResult: The test case result for the overall validation """ + test_params = 
self._get_test_parameters() + try: column: Union[SQALikeColumn, Column] = self.get_column() - null_res = self._run_results( + null_missing_count = self._run_results( Metrics.NULL_MISSING_COUNT, column, ) + + metric_values = { + Metrics.NULL_MISSING_COUNT.name: null_missing_count, + } + + if test_params.get(self.MISSING_VALUE_MATCH): + # if user supplies missing values, we need to compute the count of missing values + # in addition to the count of null values + count_in_set = self._run_results( + Metrics.COUNT_IN_SET, + column, + values=test_params[self.MISSING_VALUE_MATCH], + ) + metric_values[Metrics.COUNT_IN_SET.name] = count_in_set + metric_values[self.TOTAL_MISSING_COUNT] = ( + null_missing_count + count_in_set + ) + else: + metric_values[self.TOTAL_MISSING_COUNT] = null_missing_count + except (ValueError, RuntimeError) as exc: msg = f"Error computing {self.test_case.fullyQualifiedName}: {exc}" # type: ignore logger.debug(traceback.format_exc()) @@ -64,45 +94,143 @@ class BaseColumnValuesMissingCountValidator(BaseTestValidator): [TestResultValue(name=NULL_COUNT, value=None)], ) + evaluation = self._evaluate_test_condition(metric_values, test_params) + result_message = self._format_result_message( + metric_values, test_params=test_params + ) + test_result_values = self._get_test_result_values(metric_values) + + return self.get_test_case_result_object( + self.execution_date, + self.get_test_case_status(evaluation["matched"]), + result_message, + test_result_values, + ) + + def _get_test_parameters(self) -> dict: + """Extract test-specific parameters from test case + + Returns: + dict with keys: MISSING_VALUE_MATCH, MISSING_COUNT_VALUE + """ missing_values = self.get_test_case_param_value( self.test_case.parameterValues, # type: ignore - "missingValueMatch", + self.MISSING_VALUE_MATCH, literal_eval, ) missing_count_value = self.get_test_case_param_value( self.test_case.parameterValues, # type: ignore - "missingCountValue", + self.MISSING_COUNT_VALUE, literal_eval, ) - if missing_values: - # if user supplies missing values, we need to compute the count of missing values - # in addition to the count of null values - try: - set_res = self._run_results( - Metrics.COUNT_IN_SET, column, values=missing_values - ) - null_res += set_res - except (ValueError, RuntimeError) as exc: - msg = f"Error computing {self.test_case.fullyQualifiedName}: {exc}" # type: ignore - logger.debug(traceback.format_exc()) - logger.warning(msg) - return self.get_test_case_result_object( - self.execution_date, - TestCaseStatus.Aborted, - msg, - [ - TestResultValue(name=NULL_COUNT, value=None), - ], - ) + return { + self.MISSING_VALUE_MATCH: missing_values, + self.MISSING_COUNT_VALUE: missing_count_value, + } - return self.get_test_case_result_object( - self.execution_date, - self.get_test_case_status(missing_count_value == null_res), - f"Found nullCount={null_res} vs. 
the expected nullCount={missing_count_value}.", - [TestResultValue(name=NULL_COUNT, value=str(null_res))], - ) + def _get_metrics_to_compute(self, test_params: dict) -> dict: + """Define which metrics to compute based on test parameters + + Args: + test_params: Dictionary with MISSING_VALUE_MATCH and MISSING_COUNT_VALUE + + Returns: + dict: Mapping of Metrics enum names to Metrics enum values + """ + metrics = { + Metrics.NULL_MISSING_COUNT.name: Metrics.NULL_MISSING_COUNT, + } + + if test_params.get(self.MISSING_VALUE_MATCH): + metrics[Metrics.COUNT_IN_SET.name] = Metrics.COUNT_IN_SET + + return metrics + + def _evaluate_test_condition( + self, metric_values: dict, test_params: Optional[dict] = None + ) -> TestEvaluation: + """Evaluate the missing count test condition + + Test passes if total_missing_count == expected missing_count_value + + Args: + metric_values: Dictionary with keys from Metrics enum names + e.g., {"NULL_MISSING_COUNT": 5, "COUNT_IN_SET": 3} + test_params: Dictionary with MISSING_VALUE_MATCH and MISSING_COUNT_VALUE + + Returns: + TestEvaluation: TypedDict with keys: + - matched: bool - whether test passed + - passed_rows: None - not computed for this validator + - failed_rows: None - not computed for this validator + - total_rows: None - not computed for this validator + """ + if test_params is None: + raise ValueError( + "test_params is required for columnValuesMissingCount._evaluate_test_condition" + ) + + total_missing_count = metric_values[self.TOTAL_MISSING_COUNT] + expected_missing_count = test_params[self.MISSING_COUNT_VALUE] + + matched = total_missing_count == expected_missing_count + + return { + "matched": matched, + "passed_rows": None, + "failed_rows": None, + "total_rows": None, + } + + def _format_result_message( + self, + metric_values: dict, + dimension_info: Optional[DimensionInfo] = None, + test_params: Optional[dict] = None, + ) -> str: + """Format the result message for missing count test + + Args: + metric_values: Dictionary with Metrics enum names as keys + dimension_info: Optional DimensionInfo with dimension details + test_params: Optional test parameters with expected MISSING_COUNT_VALUE + + Returns: + str: Formatted result message + """ + if test_params is None: + raise ValueError( + "test_params is required for columnValuesMissingCount._format_result_message" + ) + + total_missing_count = metric_values[self.TOTAL_MISSING_COUNT] + expected_missing_count = test_params[self.MISSING_COUNT_VALUE] + + if dimension_info: + return ( + f"Dimension {dimension_info['dimension_name']}={dimension_info['dimension_value']}: " + f"Found nullCount={total_missing_count} vs. the expected nullCount={expected_missing_count}." + ) + else: + return f"Found nullCount={total_missing_count} vs. the expected nullCount={expected_missing_count}." 
+ + def _get_test_result_values(self, metric_values: dict) -> List[TestResultValue]: + """Get test result values for missing count test + + Args: + metric_values: Dictionary with Metrics enum names as keys + + Returns: + List[TestResultValue]: Test result values for the test case + """ + return [ + TestResultValue( + name=NULL_COUNT, + value=str(metric_values[self.TOTAL_MISSING_COUNT]), + ), + ] @abstractmethod def _run_results( diff --git a/ingestion/src/metadata/data_quality/validations/column/base/columnValuesToBeNotNull.py b/ingestion/src/metadata/data_quality/validations/column/base/columnValuesToBeNotNull.py index 4dc3b434b94..d43143bb75d 100644 --- a/ingestion/src/metadata/data_quality/validations/column/base/columnValuesToBeNotNull.py +++ b/ingestion/src/metadata/data_quality/validations/column/base/columnValuesToBeNotNull.py @@ -15,11 +15,15 @@ Validator for column values to be not null test case import traceback from abc import abstractmethod -from typing import Union +from typing import List, Optional, Union from sqlalchemy import Column -from metadata.data_quality.validations.base_test_handler import BaseTestValidator +from metadata.data_quality.validations.base_test_handler import ( + BaseTestValidator, + DimensionInfo, + TestEvaluation, +) from metadata.generated.schema.tests.basic import ( TestCaseResult, TestCaseStatus, @@ -46,9 +50,18 @@ class BaseColumnValuesToBeNotNullValidator(BaseTestValidator): Returns: TestCaseResult: The test case result for the overall validation """ + test_params = self._get_test_parameters() + try: column: Union[SQALikeColumn, Column] = self.get_column() - res = self._run_results(Metrics.NULL_COUNT, column) + null_count = self._run_results(Metrics.NULL_COUNT, column) + + metric_values = { + Metrics.NULL_COUNT.name: null_count, + } + + if self.test_case.computePassedFailedRowCount: + metric_values[Metrics.ROW_COUNT.name] = self.get_row_count() except (ValueError, RuntimeError) as exc: msg = f"Error computing {self.test_case.fullyQualifiedName}: {exc}" # type: ignore logger.debug(traceback.format_exc()) @@ -60,20 +73,115 @@ class BaseColumnValuesToBeNotNullValidator(BaseTestValidator): [TestResultValue(name=NULL_COUNT, value=None)], ) - if self.test_case.computePassedFailedRowCount: - row_count = self.get_row_count() - else: - row_count = None + evaluation = self._evaluate_test_condition(metric_values, test_params) + result_message = self._format_result_message( + metric_values, test_params=test_params + ) + test_result_values = self._get_test_result_values(metric_values) return self.get_test_case_result_object( self.execution_date, - self.get_test_case_status(res == 0), - f"Found nullCount={res}. 
It should be 0", - [TestResultValue(name=NULL_COUNT, value=str(res))], - row_count=row_count, - failed_rows=res, + self.get_test_case_status(evaluation["matched"]), + result_message, + test_result_values, + row_count=evaluation["total_rows"], + passed_rows=evaluation["passed_rows"], + failed_rows=evaluation["failed_rows"], ) + def _get_metrics_to_compute(self, test_params: dict) -> dict: + """Define which metrics to compute based on test parameters + + Args: + test_params: Dictionary (empty for this validator) + + Returns: + dict: Mapping of Metrics enum names to Metrics enum values + """ + metrics = { + Metrics.NULL_COUNT.name: Metrics.NULL_COUNT, + } + + if self.test_case.computePassedFailedRowCount: + metrics[Metrics.ROW_COUNT.name] = Metrics.ROW_COUNT + + return metrics + + def _evaluate_test_condition( + self, metric_values: dict, test_params: Optional[dict] = None + ) -> TestEvaluation: + """Evaluate the not null test condition + + Test passes if null_count == 0 (no null values found) + + Args: + metric_values: Dictionary with keys from Metrics enum names + e.g., {"NULL_COUNT": 0, "ROW_COUNT": 100} + test_params: Dictionary (not used for this validator) + + Returns: + TestEvaluation: TypedDict with keys: + - matched: bool - whether test passed (null_count == 0) + - passed_rows: int - number of non-null values + - failed_rows: int - number of null values + - total_rows: int - total row count for reporting + """ + null_count = metric_values[Metrics.NULL_COUNT.name] + total_rows = metric_values.get(Metrics.ROW_COUNT.name) + + matched = null_count == 0 + failed_count = null_count + passed_count = total_rows - null_count if total_rows else 0 + + return { + "matched": matched, + "passed_rows": passed_count, + "failed_rows": failed_count, + "total_rows": total_rows, + } + + def _format_result_message( + self, + metric_values: dict, + dimension_info: Optional[DimensionInfo] = None, + test_params: Optional[dict] = None, + ) -> str: + """Format the result message for not null test + + Args: + metric_values: Dictionary with Metrics enum names as keys + dimension_info: Optional DimensionInfo with dimension details + test_params: Optional test parameters (not used by this validator) + + Returns: + str: Formatted result message + """ + null_count = metric_values[Metrics.NULL_COUNT.name] + + if dimension_info: + return ( + f"Dimension {dimension_info['dimension_name']}={dimension_info['dimension_value']}: " + f"Found nullCount={null_count}. It should be 0" + ) + else: + return f"Found nullCount={null_count}. 
It should be 0" + + def _get_test_result_values(self, metric_values: dict) -> List[TestResultValue]: + """Get test result values for not null test + + Args: + metric_values: Dictionary with Metrics enum names as keys + + Returns: + List[TestResultValue]: Test result values for the test case + """ + return [ + TestResultValue( + name=NULL_COUNT, + value=str(metric_values[Metrics.NULL_COUNT.name]), + ), + ] + @abstractmethod def _run_results(self, metric: Metrics, column: Union[SQALikeColumn, Column]): raise NotImplementedError diff --git a/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesMissingCount.py b/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesMissingCount.py index 729673b9ff0..5f939a7bc69 100644 --- a/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesMissingCount.py +++ b/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesMissingCount.py @@ -12,18 +12,35 @@ """ Validator for column value missing count to be equal test case """ +from collections import defaultdict +from typing import List, Optional, cast -from typing import Optional +import pandas as pd +from metadata.data_quality.validations.base_test_handler import ( + DIMENSION_FAILED_COUNT_KEY, + DIMENSION_TOTAL_COUNT_KEY, + DIMENSION_VALUE_KEY, +) from metadata.data_quality.validations.column.base.columnValuesMissingCount import ( BaseColumnValuesMissingCountValidator, ) +from metadata.data_quality.validations.impact_score import ( + DEFAULT_TOP_DIMENSIONS, + calculate_impact_score_pandas, +) from metadata.data_quality.validations.mixins.pandas_validator_mixin import ( PandasValidatorMixin, + aggregate_others_statistical_pandas, ) +from metadata.generated.schema.tests.dimensionResult import DimensionResult +from metadata.profiler.metrics.core import add_props from metadata.profiler.metrics.registry import Metrics +from metadata.utils.logger import test_suite_logger from metadata.utils.sqa_like_column import SQALikeColumn +logger = test_suite_logger() + class ColumnValuesMissingCountValidator( BaseColumnValuesMissingCountValidator, PandasValidatorMixin @@ -40,3 +57,163 @@ class ColumnValuesMissingCountValidator( column: column """ return self.run_dataframe_results(self.runner, metric, column, **kwargs) + + def _execute_dimensional_validation( + self, + column: SQALikeColumn, + dimension_col: SQALikeColumn, + metrics_to_compute: dict, + test_params: dict, + ) -> List[DimensionResult]: + """Execute dimensional query with impact scoring and Others aggregation for pandas + + Follows the iterate pattern from the Mean metric's df_fn method to handle + multiple dataframes efficiently without concatenating them in memory. + + Memory-efficient approach: Instead of concatenating all dataframes (which creates + a full copy in memory), we iterate over them and accumulate aggregates. This is + especially important for large parquet files split across many chunks. + + For missing count validation, we accumulate null/missing counts across dataframes + to accurately track how many missing values exist per dimension. 
+ + Args: + column: The column being validated + dimension_col: Single SQALikeColumn object corresponding to the dimension column + metrics_to_compute: Dictionary mapping Metrics enum names to Metrics objects + test_params: Dictionary with test-specific parameters (MISSING_VALUE_MATCH, MISSING_COUNT_VALUE) + + Returns: + List[DimensionResult]: Top N dimensions by impact score plus "Others" + """ + dimension_results = [] + + try: + dfs = self.runner if isinstance(self.runner, list) else [self.runner] + + metric_expressions = { + Metrics.NULL_MISSING_COUNT.name: Metrics.NULL_MISSING_COUNT( + column + ).get_pandas_computation(), + Metrics.ROW_COUNT.name: Metrics.ROW_COUNT().get_pandas_computation(), + } + + missing_values = test_params.get(self.MISSING_VALUE_MATCH) + missing_values_expected_count = test_params.get(self.MISSING_COUNT_VALUE, 0) + + if missing_values: + metric_expressions[Metrics.COUNT_IN_SET.name] = add_props( + values=missing_values + )(Metrics.COUNT_IN_SET.value)(column).get_pandas_computation() + + dimension_aggregates = defaultdict( + lambda: { + metric_name: metric.create_accumulator() + for metric_name, metric in metric_expressions.items() + } + ) + + for df in dfs: + df_typed = cast(pd.DataFrame, df) + grouped = df_typed.groupby(dimension_col.name, dropna=False) + + for dimension_value, group_df in grouped: + dimension_value = self.format_dimension_value(dimension_value) + for metric_name, metric in metric_expressions.items(): + dimension_aggregates[dimension_value][ + metric_name + ] = metric.update_accumulator( + dimension_aggregates[dimension_value][metric_name], group_df + ) + + results_data = [] + + for dimension_value, agg in dimension_aggregates.items(): + total_missing_count = sum( + metric.aggregate_accumulator(agg[metric_name]) + for metric_name, metric in metric_expressions.items() + if metric_name != Metrics.ROW_COUNT.name + ) + total_rows = metric_expressions[ + Metrics.ROW_COUNT.name + ].aggregate_accumulator(agg[Metrics.ROW_COUNT.name]) + + # Calculate initial deviation (will be recalculated for "Others") + deviation = abs(total_missing_count - missing_values_expected_count) + + results_data.append( + { + DIMENSION_VALUE_KEY: dimension_value, + self.TOTAL_MISSING_COUNT: total_missing_count, + DIMENSION_TOTAL_COUNT_KEY: total_rows, + DIMENSION_FAILED_COUNT_KEY: deviation, + } + ) + + results_df = pd.DataFrame(results_data) + + if not results_df.empty: + # Define recalculation function for deviation after aggregation + def recalculate_failed_count(df_aggregated, others_mask, metric_column): + """Recalculate failed_count (deviation) for 'Others' from aggregated total_missing_count""" + result = df_aggregated[metric_column].copy() + if others_mask.any(): + others_total = df_aggregated.loc[ + others_mask, self.TOTAL_MISSING_COUNT + ].iloc[0] + # Deviation is the failed_count + result.loc[others_mask] = abs( + others_total - missing_values_expected_count + ) + return result + + results_df = calculate_impact_score_pandas( + results_df, + failed_column=DIMENSION_FAILED_COUNT_KEY, + total_column=DIMENSION_TOTAL_COUNT_KEY, + ) + + results_df = aggregate_others_statistical_pandas( + results_df, + dimension_column=DIMENSION_VALUE_KEY, + top_n=DEFAULT_TOP_DIMENSIONS, + agg_functions={ + self.TOTAL_MISSING_COUNT: "sum", # Sum actual missing counts + DIMENSION_TOTAL_COUNT_KEY: "sum", + DIMENSION_FAILED_COUNT_KEY: "sum", # This will be recalculated for Others + }, + final_metric_calculators={ + DIMENSION_FAILED_COUNT_KEY: recalculate_failed_count, # Recalculate deviation 
for Others
+                },
+                # No violation_predicate needed - deviation IS the failed_count
+            )
+
+            for row_dict in results_df.to_dict("records"):
+                metric_values = self._build_metric_values_from_row(
+                    row_dict, metrics_to_compute, test_params
+                )
+
+                # Add the derived TOTAL_MISSING_COUNT; it is not part of metrics_to_compute.
+                metric_values[self.TOTAL_MISSING_COUNT] = row_dict.get(
+                    self.TOTAL_MISSING_COUNT
+                )
+
+                evaluation = self._evaluate_test_condition(
+                    metric_values, test_params
+                )
+
+                dimension_result = self._create_dimension_result(
+                    row_dict,
+                    dimension_col.name,
+                    metric_values,
+                    evaluation,
+                    test_params,
+                )
+
+                dimension_results.append(dimension_result)
+
+        except Exception as exc:
+            logger.warning(f"Error executing dimensional query: {exc}")
+            logger.debug("Full error details: ", exc_info=True)
+
+        return dimension_results
diff --git a/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesToBeNotNull.py b/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesToBeNotNull.py
index 4f1a6d794e9..63c8a88d6e3 100644
--- a/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesToBeNotNull.py
+++ b/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesToBeNotNull.py
@@ -13,17 +13,34 @@
 Validator for column values to be not null test case
 """
 
-from typing import Optional
+from collections import defaultdict
+from typing import List, Optional, cast
 
+import pandas as pd
+
+from metadata.data_quality.validations.base_test_handler import (
+    DIMENSION_FAILED_COUNT_KEY,
+    DIMENSION_TOTAL_COUNT_KEY,
+    DIMENSION_VALUE_KEY,
+)
 from metadata.data_quality.validations.column.base.columnValuesToBeNotNull import (
     BaseColumnValuesToBeNotNullValidator,
 )
+from metadata.data_quality.validations.impact_score import (
+    DEFAULT_TOP_DIMENSIONS,
+    calculate_impact_score_pandas,
+)
 from metadata.data_quality.validations.mixins.pandas_validator_mixin import (
     PandasValidatorMixin,
+    aggregate_others_pandas,
 )
+from metadata.generated.schema.tests.dimensionResult import DimensionResult
 from metadata.profiler.metrics.registry import Metrics
+from metadata.utils.logger import test_suite_logger
 from metadata.utils.sqa_like_column import SQALikeColumn
 
+logger = test_suite_logger()
+
 
 class ColumnValuesToBeNotNullValidator(
     BaseColumnValuesToBeNotNullValidator, PandasValidatorMixin
@@ -39,6 +56,127 @@
         """
         return self.run_dataframe_results(self.runner, metric, column)
 
+    def _execute_dimensional_validation(
+        self,
+        column: SQALikeColumn,
+        dimension_col: SQALikeColumn,
+        metrics_to_compute: dict,
+        test_params: dict,
+    ) -> List[DimensionResult]:
+        """Execute dimensional query with impact scoring and Others aggregation for pandas
+
+        Follows the iterate pattern from the Mean metric's df_fn method to handle
+        multiple dataframes efficiently without concatenating them in memory.
+
+        Memory-efficient approach: Instead of concatenating all dataframes (which creates
+        a full copy in memory), we iterate over them and accumulate aggregates. This is
+        especially important for large parquet files split across many chunks.
+
+        For not-null validation, we accumulate null counts across dataframes to accurately
+        track how many null values exist per dimension. 
+ + Args: + column: The column being validated + dimension_col: Single SQALikeColumn object corresponding to the dimension column + metrics_to_compute: Dictionary mapping Metrics enum names to Metrics objects + test_params: Dictionary with test-specific parameters (empty for this validator) + + Returns: + List[DimensionResult]: Top N dimensions by impact score plus "Others" + """ + dimension_results = [] + + try: + dfs = self.runner if isinstance(self.runner, list) else [self.runner] + null_count_impl = Metrics.NULL_COUNT(column).get_pandas_computation() + row_count_impl = Metrics.ROW_COUNT().get_pandas_computation() + + dimension_aggregates = defaultdict( + lambda: { + Metrics.NULL_COUNT.name: null_count_impl.create_accumulator(), + Metrics.ROW_COUNT.name: row_count_impl.create_accumulator(), + } + ) + + for df in dfs: + df_typed = cast(pd.DataFrame, df) + grouped = df_typed.groupby(dimension_col.name, dropna=False) + + for dimension_value, group_df in grouped: + dimension_value = self.format_dimension_value(dimension_value) + + dimension_aggregates[dimension_value][ + Metrics.NULL_COUNT.name + ] = null_count_impl.update_accumulator( + dimension_aggregates[dimension_value][Metrics.NULL_COUNT.name], + group_df, + ) + dimension_aggregates[dimension_value][ + Metrics.ROW_COUNT.name + ] = row_count_impl.update_accumulator( + dimension_aggregates[dimension_value][Metrics.ROW_COUNT.name], + group_df, + ) + + results_data = [] + for dimension_value, agg in dimension_aggregates.items(): + null_count = null_count_impl.aggregate_accumulator( + agg[Metrics.NULL_COUNT.name] + ) + row_count = row_count_impl.aggregate_accumulator( + agg[Metrics.ROW_COUNT.name] + ) + + results_data.append( + { + DIMENSION_VALUE_KEY: dimension_value, + Metrics.NULL_COUNT.name: null_count, + Metrics.ROW_COUNT.name: row_count, + DIMENSION_TOTAL_COUNT_KEY: row_count, + DIMENSION_FAILED_COUNT_KEY: null_count, + } + ) + + results_df = pd.DataFrame(results_data) + + if not results_df.empty: + results_df = calculate_impact_score_pandas( + results_df, + failed_column=DIMENSION_FAILED_COUNT_KEY, + total_column=DIMENSION_TOTAL_COUNT_KEY, + ) + + results_df = aggregate_others_pandas( + results_df, + dimension_column=DIMENSION_VALUE_KEY, + top_n=DEFAULT_TOP_DIMENSIONS, + ) + + for row_dict in results_df.to_dict("records"): + metric_values = self._build_metric_values_from_row( + row_dict, metrics_to_compute, test_params + ) + + evaluation = self._evaluate_test_condition( + metric_values, test_params + ) + + dimension_result = self._create_dimension_result( + row_dict, + dimension_col.name, + metric_values, + evaluation, + test_params, + ) + + dimension_results.append(dimension_result) + + except Exception as exc: + logger.warning(f"Error executing dimensional query: {exc}") + logger.debug("Full error details: ", exc_info=True) + + return dimension_results + def compute_row_count(self, column: SQALikeColumn): """Compute row count for the given column diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesMissingCount.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesMissingCount.py index 7d77ccae455..fd485d7a4cd 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesMissingCount.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesMissingCount.py @@ -13,16 +13,25 @@ Validator for column value missing count to be equal test case """ -from typing import Optional +from typing import List, Optional 
-from sqlalchemy import Column +from sqlalchemy import Column, case, func +from metadata.data_quality.validations.base_test_handler import ( + DIMENSION_FAILED_COUNT_KEY, + DIMENSION_OTHERS_LABEL, + DIMENSION_TOTAL_COUNT_KEY, +) from metadata.data_quality.validations.column.base.columnValuesMissingCount import ( BaseColumnValuesMissingCountValidator, ) +from metadata.data_quality.validations.impact_score import DEFAULT_TOP_DIMENSIONS from metadata.data_quality.validations.mixins.sqa_validator_mixin import ( + DIMENSION_GROUP_LABEL, SQAValidatorMixin, ) +from metadata.generated.schema.tests.dimensionResult import DimensionResult +from metadata.profiler.metrics.core import add_props from metadata.profiler.metrics.registry import Metrics from metadata.utils.logger import test_suite_logger @@ -42,3 +51,116 @@ class ColumnValuesMissingCountValidator( column: column """ return self.run_query_results(self.runner, metric, column, **kwargs) + + def _execute_dimensional_validation( + self, + column: Column, + dimension_col: Column, + metrics_to_compute: dict, + test_params: dict, + ) -> List[DimensionResult]: + """Execute dimensional validation for missing count with deviation recalculation + + Uses statistical aggregation to: + 1. Compute total_missing_count per dimension (NULL + optional custom values) + 2. Calculate deviation = abs(actual - expected) per dimension + 3. Aggregate "Others" and recalculate deviation from summed missing counts + + Args: + column: The column being validated + dimension_col: The dimension column to group by + metrics_to_compute: Dict mapping metric names to Metrics enums + test_params: Test parameters (MISSING_VALUE_MATCH, MISSING_COUNT_VALUE) + + Returns: + List[DimensionResult]: Top N dimensions plus "Others" + """ + dimension_results = [] + + try: + missing_values = test_params.get(self.MISSING_VALUE_MATCH) + expected_missing_count = test_params.get(self.MISSING_COUNT_VALUE, 0) + + total_missing_expr = Metrics.NULL_MISSING_COUNT(column).fn() + + if missing_values: + total_missing_expr = ( + total_missing_expr + + add_props(values=missing_values)(Metrics.COUNT_IN_SET.value)( + column + ).fn() + ) + + metric_expressions = { + self.TOTAL_MISSING_COUNT: total_missing_expr, + DIMENSION_TOTAL_COUNT_KEY: Metrics.ROW_COUNT().fn(), + } + + def build_failed_count(cte): + """Build failed count - deviation IS the failed count + + For initial CTE: deviation = abs(actual - expected) + """ + return func.abs( + getattr(cte.c, self.TOTAL_MISSING_COUNT) - expected_missing_count + ) + + def build_deviation_final(cte): + """Recalculate deviation for Others after aggregation + + For top dimensions: keep original deviation via max() + For Others: recalculate from aggregated total_missing_count + """ + return case( + [ + ( + getattr(cte.c, DIMENSION_GROUP_LABEL) + != DIMENSION_OTHERS_LABEL, + func.max(getattr(cte.c, DIMENSION_FAILED_COUNT_KEY)), + ) + ], + else_=( + func.abs( + func.sum(getattr(cte.c, self.TOTAL_MISSING_COUNT)) + - expected_missing_count + ) + ), + ) + + result_rows = self._execute_with_others_aggregation_statistical( + dimension_col, + metric_expressions, + build_failed_count, + final_metric_builders={ + DIMENSION_FAILED_COUNT_KEY: build_deviation_final + }, + top_dimensions_count=DEFAULT_TOP_DIMENSIONS, + ) + + for row in result_rows: + total_missing_count = row.get(self.TOTAL_MISSING_COUNT) + + if total_missing_count is None: + continue + + metric_values = { + self.TOTAL_MISSING_COUNT: total_missing_count, + } + + evaluation = 
self._evaluate_test_condition(metric_values, test_params) + + dimension_result = self._create_dimension_result( + row, + dimension_col.name, + metric_values, + evaluation, + test_params, + ) + + dimension_results.append(dimension_result) + + except Exception as exc: + logger.warning(f"Error executing dimensional query: {exc}") + logger.debug("Full error details: ", exc_info=True) + + return dimension_results diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeNotNull.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeNotNull.py index f7f864f0a84..5e9939c337e 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeNotNull.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeNotNull.py @@ -13,16 +13,22 @@ Validator for column values to be not null test case """ -from typing import Optional +from typing import List, Optional from sqlalchemy import Column +from metadata.data_quality.validations.base_test_handler import ( + DIMENSION_FAILED_COUNT_KEY, + DIMENSION_TOTAL_COUNT_KEY, +) from metadata.data_quality.validations.column.base.columnValuesToBeNotNull import ( BaseColumnValuesToBeNotNullValidator, ) +from metadata.data_quality.validations.impact_score import DEFAULT_TOP_DIMENSIONS from metadata.data_quality.validations.mixins.sqa_validator_mixin import ( SQAValidatorMixin, ) +from metadata.generated.schema.tests.dimensionResult import DimensionResult from metadata.profiler.metrics.registry import Metrics from metadata.utils.logger import test_suite_logger @@ -43,6 +49,68 @@ class ColumnValuesToBeNotNullValidator( """ return self.run_query_results(self.runner, metric, column) + def _execute_dimensional_validation( + self, + column: Column, + dimension_col: Column, + metrics_to_compute: dict, + test_params: dict, + ) -> List[DimensionResult]: + """Execute dimensional query with impact scoring and Others aggregation + + Calculates impact scores for all dimension values and aggregates + low-impact dimensions into "Others" category using CTEs. 
+
+        Args:
+            column: The column being validated
+            dimension_col: Single Column object corresponding to the dimension column
+            metrics_to_compute: Dictionary mapping Metrics enum names to Metrics objects
+            test_params: Dictionary with test-specific parameters (empty for this validator)
+
+        Returns:
+            List[DimensionResult]: Top N dimensions by impact score plus "Others"
+        """
+        dimension_results = []
+
+        try:
+
+            # Build metric expressions using enum names as keys
+            metric_expressions = {}
+            for metric_name, metric in metrics_to_compute.items():
+                metric_instance = metric.value(column)
+                metric_expressions[metric_name] = metric_instance.fn()
+
+            metric_expressions[DIMENSION_TOTAL_COUNT_KEY] = Metrics.ROW_COUNT().fn()
+            metric_expressions[DIMENSION_FAILED_COUNT_KEY] = metric_expressions[
+                Metrics.NULL_COUNT.name
+            ]
+
+            result_rows = self._execute_with_others_aggregation(
+                dimension_col, metric_expressions, DEFAULT_TOP_DIMENSIONS
+            )
+
+            for row in result_rows:
+                # Build metric_values dict using helper method
+                metric_values = self._build_metric_values_from_row(
+                    row, metrics_to_compute, test_params
+                )
+
+                # Evaluate test condition
+                evaluation = self._evaluate_test_condition(metric_values, test_params)
+
+                # Create dimension result using helper method
+                dimension_result = self._create_dimension_result(
+                    row, dimension_col.name, metric_values, evaluation, test_params
+                )
+
+                dimension_results.append(dimension_result)
+
+        except Exception as exc:
+            logger.warning(f"Error executing dimensional query: {exc}")
+            logger.debug("Full error details: ", exc_info=True)
+
+        return dimension_results
+
     def compute_row_count(self, column: Column):
         """Compute row count for the given column
diff --git a/ingestion/src/metadata/profiler/metrics/static/null_count.py b/ingestion/src/metadata/profiler/metrics/static/null_count.py
index 0bcbf0b875a..8fc5bbbad7d 100644
--- a/ingestion/src/metadata/profiler/metrics/static/null_count.py
+++ b/ingestion/src/metadata/profiler/metrics/static/null_count.py
@@ -14,13 +14,21 @@
 Null Count Metric definition
 """
 # pylint: disable=duplicate-code
+from typing import TYPE_CHECKING
 
 from sqlalchemy import case, column
 from sqlalchemy.sql.functions import coalesce
 
 from metadata.generated.schema.configuration.profilerConfiguration import MetricType
 from metadata.profiler.metrics.core import StaticMetric, _label
+from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation
 from metadata.profiler.orm.functions.sum import SumFn
+from metadata.utils.logger import profiler_logger
+
+if TYPE_CHECKING:
+    import pandas as pd
+
+logger = profiler_logger()
 
 
 class NullCount(StaticMetric):
@@ -56,4 +64,34 @@
 
     def df_fn(self, dfs=None):
         """pandas function"""
-        return sum(df[self.col.name].isnull().sum() for df in dfs)
+        computation = self.get_pandas_computation()
+        accumulator = computation.create_accumulator()
+        for df in dfs:
+            try:
+                accumulator = computation.update_accumulator(accumulator, df)
+            except Exception as err:
+                logger.debug(
+                    f"Error while computing 'Null Count' for column '{self.col.name}': {err}"
+                )
+                return None
+        return computation.aggregate_accumulator(accumulator)
+
+    def get_pandas_computation(self) -> PandasComputation:
+        """Returns the logic to compute this metric using Pandas"""
+        return PandasComputation[int, int](
+            create_accumulator=lambda: 0,
+            update_accumulator=lambda acc, df: NullCount.update_accumulator(
+                acc, df, self.col
+            ),
+            aggregate_accumulator=lambda acc: acc,
+        )
+
+    @staticmethod
+    def update_accumulator(current_count: int, df: "pd.DataFrame", column) -> int:
+        """Computes one DataFrame chunk and updates the running null count
+
+        Maintains a single count value. Adds the chunk's null count to the
+        current total and returns the sum.
+        """
+        chunk_null_count = df[column.name].isnull().sum()
+        return current_count + chunk_null_count
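
Aside: the create/update/aggregate triple above replaces the old one-shot sum(...) reduction so that chunked sources (for example, parquet files read as a list of dataframes) are never concatenated. Below is a minimal, self-contained sketch of the same flow, using plain pandas and illustrative stand-in functions rather than the patch's PandasComputation class:

    # Illustrative stand-ins for the create/update/aggregate protocol;
    # these names are not part of the patch.
    import pandas as pd

    def create_accumulator() -> int:
        return 0

    def update_accumulator(acc: int, df: pd.DataFrame, column_name: str) -> int:
        # Add this chunk's null count to the running total
        return acc + int(df[column_name].isnull().sum())

    def aggregate_accumulator(acc: int) -> int:
        return acc

    chunks = [
        pd.DataFrame({"nickname": ["jo", None, ""]}),
        pd.DataFrame({"nickname": [None, "dee", None]}),
    ]

    acc = create_accumulator()
    for chunk in chunks:  # one chunk at a time, no pd.concat copy
        acc = update_accumulator(acc, chunk, "nickname")

    print(aggregate_accumulator(acc))  # 3: nulls only, "" is not counted here
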
diff --git a/ingestion/src/metadata/profiler/metrics/static/null_missing_count.py b/ingestion/src/metadata/profiler/metrics/static/null_missing_count.py
index b62a62c2322..63dfe67f02f 100644
--- a/ingestion/src/metadata/profiler/metrics/static/null_missing_count.py
+++ b/ingestion/src/metadata/profiler/metrics/static/null_missing_count.py
@@ -14,12 +14,20 @@
 Null Count Metric definition
 """
 # pylint: disable=duplicate-code
+from typing import TYPE_CHECKING
 
 from sqlalchemy import case, column
 
 from metadata.generated.schema.configuration.profilerConfiguration import MetricType
 from metadata.profiler.metrics.core import StaticMetric, _label
+from metadata.profiler.metrics.pandas_metric_protocol import PandasComputation
 from metadata.profiler.orm.functions.sum import SumFn
+from metadata.utils.logger import profiler_logger
+
+if TYPE_CHECKING:
+    import pandas as pd
+
+logger = profiler_logger()
 
 
 class NullMissingCount(StaticMetric):
@@ -67,7 +75,36 @@
         )
 
     def df_fn(self, dfs=None):
+        """pandas function"""
+        computation = self.get_pandas_computation()
+        accumulator = computation.create_accumulator()
+        for df in dfs:
+            try:
+                accumulator = computation.update_accumulator(accumulator, df)
+            except Exception as err:
+                logger.debug(
+                    f"Error while computing 'Null Missing Count' for column '{self.col.name}': {err}"
+                )
+                return None
+        return computation.aggregate_accumulator(accumulator)
+
+    def get_pandas_computation(self) -> PandasComputation:
+        """Returns the logic to compute this metric using Pandas"""
+        return PandasComputation[int, int](
+            create_accumulator=lambda: 0,
+            update_accumulator=lambda acc, df: NullMissingCount.update_accumulator(
+                acc, df, self.col
+            ),
+            aggregate_accumulator=lambda acc: acc,
+        )
+
+    @staticmethod
+    def update_accumulator(current_count: int, df: "pd.DataFrame", column) -> int:
+        """Computes one DataFrame chunk and updates the running null/missing count
+
+        Maintains a single count value. Adds the chunk's null and empty-string
+        counts to the current total and returns the sum.
         """
-        Returns the pandas function for calculating the metric.
-        """
-        return sum(df[self.col.name].isnull().sum() for df in dfs)
+        chunk_null_count = df[column.name].isnull().sum()
+        chunk_empty_count = (df[column.name] == "").sum()
+        return current_count + chunk_null_count + chunk_empty_count
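
Aside: NullMissingCount differs from NullCount only in what each chunk contributes (nulls plus empty strings). The validator then adds COUNT_IN_SET matches when missingValueMatch is configured and compares the total against missingCountValue. A standalone sketch of that evaluation step, with a hypothetical function name standing in for BaseColumnValuesMissingCountValidator._evaluate_test_condition:

    from typing import Optional

    def evaluate_missing_count(
        null_missing_count: int,
        expected_missing_count: int,
        count_in_set: Optional[int] = None,
    ) -> dict:
        # total = NULL/empty-string count, plus rows matching the
        # user-supplied missing markers when missingValueMatch is set
        total_missing_count = null_missing_count + (count_in_set or 0)
        return {
            "matched": total_missing_count == expected_missing_count,
            "passed_rows": None,  # row-level counts are not computed here
            "failed_rows": None,
            "total_rows": None,
        }

    # 5 NULL/empty values plus 3 rows matching a marker such as "N/A",
    # with an expected count of 8, passes:
    print(evaluate_missing_count(5, 8, count_in_set=3)["matched"])  # True
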
diff --git a/ingestion/tests/unit/data_quality/validations/test_base_handler.py b/ingestion/tests/unit/data_quality/validations/test_base_handler.py
index 477aee7e94a..a128940731c 100644
--- a/ingestion/tests/unit/data_quality/validations/test_base_handler.py
+++ b/ingestion/tests/unit/data_quality/validations/test_base_handler.py
@@ -386,7 +386,7 @@ class TestBaseTestValidator:
         validator._run_dimensional_validation.assert_called_once()
 
 
-def test_get_test_parameters_default_returns_none():
-    """Test that default _get_test_parameters implementation returns None"""
+def test_get_test_parameters_default_returns_empty_dict():
+    """Test that default _get_test_parameters implementation returns an empty dict"""
     test_case = MagicMock(spec=TestCase)
     test_case.name = "test_default_params"
@@ -400,7 +400,7 @@
 
     result = validator._get_test_parameters()
 
-    assert result is None
+    assert result == {}
 
 
 def test_evaluate_test_condition_not_implemented_error():
diff --git a/ingestion/tests/unit/test_suite/conftest.py b/ingestion/tests/unit/test_suite/conftest.py
index b11bd57f493..8efefa57cc9 100644
--- a/ingestion/tests/unit/test_suite/conftest.py
+++ b/ingestion/tests/unit/test_suite/conftest.py
@@ -1125,3 +1125,49 @@
         dimensionColumns=["age"],
         computePassedFailedRowCount=True,
     )  # type: ignore
+
+
+@pytest.fixture
+def test_case_column_values_to_be_not_null_dimensional():
+    """Test case for test column_values_to_be_not_null with dimensional analysis"""
+    return TestCase(
+        name=TEST_CASE_NAME,
+        entityLink=ENTITY_LINK_AGE,
+        testSuite=EntityReference(id=uuid4(), type="TestSuite"),  # type: ignore
+        testDefinition=EntityReference(id=uuid4(), type="TestDefinition"),  # type: ignore
+        dimensionColumns=["name"],
+        computePassedFailedRowCount=True,
+    )  # type: ignore
+
+
+@pytest.fixture
+def test_case_column_values_missing_count_to_be_equal_dimensional():
+    """Test case for test column_values_missing_count with dimensional analysis"""
+    return TestCase(
+        name=TEST_CASE_NAME,
+        entityLink=ENTITY_LINK_NICKNAME,
+        testSuite=EntityReference(id=uuid4(), type="TestSuite"),  # type: ignore
+        testDefinition=EntityReference(id=uuid4(), type="TestDefinition"),  # type: ignore
+        parameterValues=[
+            TestCaseParameterValue(name="missingCountValue", value="2000"),
+        ],
+        dimensionColumns=["name"],
+        computePassedFailedRowCount=True,
+    )  # type: ignore
+
+
+@pytest.fixture
+def test_case_column_values_missing_count_to_be_equal_missing_values_dimensional():
+    """Test case for test column_values_missing_count with custom missing values and dimensional analysis"""
+    return TestCase(
+        name=TEST_CASE_NAME,
+        entityLink=ENTITY_LINK_NICKNAME,
+        testSuite=EntityReference(id=uuid4(), type="TestSuite"),  # type: ignore
+        testDefinition=EntityReference(id=uuid4(), type="TestDefinition"),  # type: ignore
+        parameterValues=[
+            TestCaseParameterValue(name="missingCountValue", value="2000"),
+            TestCaseParameterValue(name="missingValueMatch", value="['Johnny d']"),
+        ],
+        dimensionColumns=["name"],
+        computePassedFailedRowCount=True,
+    )  # type: ignore
diff --git a/ingestion/tests/unit/test_suite/test_validations_databases.py b/ingestion/tests/unit/test_suite/test_validations_databases.py
index dab5fac3d63..6f6bb38f21c 100644
--- 
a/ingestion/tests/unit/test_suite/test_validations_databases.py +++ b/ingestion/tests/unit/test_suite/test_validations_databases.py @@ -864,6 +864,75 @@ TEST_CASE_SUPPORT_ROW_LEVEL_PASS_FAILED = { ("age=31", TestCaseStatus.Success, 30, 0, 100, 0, 0), ], ), + ( + "test_case_column_values_to_be_not_null_dimensional", + "columnValuesToBeNotNull", + "COLUMN", + ( + TestCaseResult, + "20", + None, + TestCaseStatus.Failed, + 60.0, + 20.0, + 75.0, + 25.0, + ), + [ + ("name=Eve", TestCaseStatus.Failed, 0, 10, 0, 100, 0.0333), + ("name=John", TestCaseStatus.Failed, 10, 10, 50, 50, 0.0167), + ("name=Alice", TestCaseStatus.Success, 10, 0, 100, 0, 0), + ("name=Bob", TestCaseStatus.Success, 10, 0, 100, 0, 0), + ("name=Charlie", TestCaseStatus.Success, 10, 0, 100, 0, 0), + ("name=Others", TestCaseStatus.Success, 20, 0, 100, 0, 0), + ], + ), + ( + "test_case_column_values_missing_count_to_be_equal_dimensional", + "columnValuesMissingCount", + "COLUMN", + ( + TestCaseResult, + "20", + None, + TestCaseStatus.Failed, + None, + None, + None, + None, + ), + [ + ("name=Alice", TestCaseStatus.Failed, None, None, None, None, 1), + ("name=Bob", TestCaseStatus.Failed, None, None, None, None, 1), + ("name=Charlie", TestCaseStatus.Failed, None, None, None, None, 1), + ("name=Diana", TestCaseStatus.Failed, None, None, None, None, 1), + ("name=Eve", TestCaseStatus.Failed, None, None, None, None, 1), + ("name=Others", TestCaseStatus.Failed, None, None, None, None, 1), + ], + ), + ( + "test_case_column_values_missing_count_to_be_equal_missing_values_dimensional", + "columnValuesMissingCount", + "COLUMN", + ( + TestCaseResult, + "30", + None, + TestCaseStatus.Failed, + None, + None, + None, + None, + ), + [ + ("name=Alice", TestCaseStatus.Failed, None, None, None, None, 1), + ("name=Bob", TestCaseStatus.Failed, None, None, None, None, 1), + ("name=Charlie", TestCaseStatus.Failed, None, None, None, None, 1), + ("name=Diana", TestCaseStatus.Failed, None, None, None, None, 1), + ("name=Eve", TestCaseStatus.Failed, None, None, None, None, 1), + ("name=Others", TestCaseStatus.Failed, None, None, None, None, 1), + ], + ), ], ) def test_suite_validation_database( diff --git a/ingestion/tests/unit/test_suite/test_validations_datalake.py b/ingestion/tests/unit/test_suite/test_validations_datalake.py index 2ec7a6ea687..2c4491dd871 100644 --- a/ingestion/tests/unit/test_suite/test_validations_datalake.py +++ b/ingestion/tests/unit/test_suite/test_validations_datalake.py @@ -1062,6 +1062,75 @@ DATALAKE_DATA_FRAME = lambda times_increase_sample_data: DataFrame( ("age=31.0", TestCaseStatus.Success, 6000, 0, 100, 0, 0), ], ), + ( + "test_case_column_values_to_be_not_null_dimensional", + "columnValuesToBeNotNull", + "COLUMN", + ( + TestCaseResult, + "4000", + None, + TestCaseStatus.Failed, + 12000.0, + 4000.0, + 75.0, + 25.0, + ), + [ + ("name=Eve", TestCaseStatus.Failed, 0, 2000, 0, 100, 0.6667), + ("name=John", TestCaseStatus.Failed, 2000, 2000, 50, 50, 0.1667), + ("name=Alice", TestCaseStatus.Success, 2000, 0, 100, 0, 0), + ("name=Bob", TestCaseStatus.Success, 2000, 0, 100, 0, 0), + ("name=Charlie", TestCaseStatus.Success, 2000, 0, 100, 0, 0), + ("name=Others", TestCaseStatus.Success, 4000, 0, 100, 0, 0), + ], + ), + ( + "test_case_column_values_missing_count_to_be_equal_dimensional", + "columnValuesMissingCount", + "COLUMN", + ( + TestCaseResult, + "2000", + None, + TestCaseStatus.Success, + None, + None, + None, + None, + ), + [ + ("name=Alice", TestCaseStatus.Failed, None, None, None, None, 0.6667), + ("name=Bob", 
TestCaseStatus.Failed, None, None, None, None, 0.6667), + ("name=Charlie", TestCaseStatus.Failed, None, None, None, None, 0.6667), + ("name=Diana", TestCaseStatus.Failed, None, None, None, None, 0.6667), + ("name=Eve", TestCaseStatus.Failed, None, None, None, None, 0.6667), + ("name=Others", TestCaseStatus.Success, None, None, None, None, 0), + ], + ), + ( + "test_case_column_values_missing_count_to_be_equal_missing_values_dimensional", + "columnValuesMissingCount", + "COLUMN", + ( + TestCaseResult, + "4000", + None, + TestCaseStatus.Failed, + None, + None, + None, + None, + ), + [ + ("name=Alice", TestCaseStatus.Failed, None, None, None, None, 0.6667), + ("name=Bob", TestCaseStatus.Failed, None, None, None, None, 0.6667), + ("name=Charlie", TestCaseStatus.Failed, None, None, None, None, 0.6667), + ("name=Diana", TestCaseStatus.Failed, None, None, None, None, 0.6667), + ("name=Eve", TestCaseStatus.Failed, None, None, None, None, 0.6667), + ("name=Others", TestCaseStatus.Failed, None, None, None, None, 0.0741), + ], + ), ], ) def test_suite_validation_datalake(