diff --git a/ingestion/src/metadata/data_quality/validations/base_test_handler.py b/ingestion/src/metadata/data_quality/validations/base_test_handler.py
index 984bbd40623..1999baf10de 100644
--- a/ingestion/src/metadata/data_quality/validations/base_test_handler.py
+++ b/ingestion/src/metadata/data_quality/validations/base_test_handler.py
@@ -343,20 +343,17 @@ class BaseTestValidator(ABC):
         """
         from metadata.generated.schema.tests.basic import DimensionValue

-        # Auto-calculate failed rows if not provided
         if failed_rows is None:
             failed_rows = total_rows - passed_rows

-        # Calculate percentages - derive one from the other to ensure they sum to 100%
+        # Derive one percentage from the other to ensure they sum to 100%
         if total_rows > 0:
             passed_rows_percentage = round(passed_rows / total_rows * 100, 2)
-            # Derive failed percentage to ensure sum equals 100%
             failed_rows_percentage = round(100 - passed_rows_percentage, 2)
         else:
             passed_rows_percentage = 0
             failed_rows_percentage = 0

-        # Convert dictionary to array of DimensionValue objects
         dimension_values_array = [
             DimensionValue(name=name, value=value)
             for name, value in dimension_values.items()
@@ -371,9 +368,7 @@ class BaseTestValidator(ABC):
             failedRows=failed_rows,
             passedRowsPercentage=passed_rows_percentage,
             failedRowsPercentage=failed_rows_percentage,
-            impactScore=round(impact_score, 4)
-            if impact_score is not None
-            else None,  # Round to 4 decimal places
+            impactScore=round(impact_score, 4) if impact_score is not None else None,
         )

         return dimension_result
diff --git a/ingestion/src/metadata/data_quality/validations/column/base/columnValuesToBeInSet.py b/ingestion/src/metadata/data_quality/validations/column/base/columnValuesToBeInSet.py
index 48adb6e85fb..19cd3afb577 100644
--- a/ingestion/src/metadata/data_quality/validations/column/base/columnValuesToBeInSet.py
+++ b/ingestion/src/metadata/data_quality/validations/column/base/columnValuesToBeInSet.py
@@ -107,15 +107,12 @@ class BaseColumnValuesToBeInSetValidator(BaseTestValidator):
             List[DimensionResult]: List of dimension-specific test results
         """
         try:
-            # Get dimension columns from test case
             dimension_columns = self.test_case.dimensionColumns or []
             if not dimension_columns:
                 return []

-            # Get the column to validate (same as _run_validation)
             column: Union[SQALikeColumn, Column] = self._get_column_name()

-            # Get test parameters (same as _run_validation)
             allowed_values = self.get_test_case_param_value(
                 self.test_case.parameterValues,  # type: ignore
                 "allowedValues",
@@ -126,34 +123,27 @@
                 self.test_case.parameterValues, "matchEnum"
             )

-            # Define the metrics to compute (same as _run_validation)
             metrics_to_compute = {
                 "count_in_set": Metrics.COUNT_IN_SET,
             }

-            # Add row count metric if match_enum is enabled
             if match_enum:
                 metrics_to_compute["row_count"] = Metrics.ROW_COUNT

-            # Store test parameters for child class
             test_params = {
                 "allowed_values": allowed_values,
                 "match_enum": match_enum,
             }

-            # Execute separate queries for each dimension column
             dimension_results = []
             for dimension_column in dimension_columns:
                 try:
-                    # Get dimension column object
                     dimension_col = self._get_column_name(dimension_column)

-                    # Execute dimensional query for this single dimension
                     single_dimension_results = self._execute_dimensional_query(
                         column, dimension_col, metrics_to_compute, test_params
                     )

-                    # Add to overall results list (now directly a list)
                     dimension_results.extend(single_dimension_results)

                 except Exception as exc:
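Note on the percentage logic kept in base_test_handler.py above: rounding passed% and failed% independently can leave the pair summing to 99.99 or 100.01, which is why the helper derives failed% from passed%. A minimal standalone sketch of that invariant (the function name is hypothetical, not part of the validator API):

    def split_percentages(passed_rows: int, total_rows: int) -> tuple:
        """Derive failed% from passed% so the pair sums to exactly 100."""
        if total_rows <= 0:
            return 0.0, 0.0
        passed_pct = round(passed_rows / total_rows * 100, 2)
        # Rounding both halves independently can drift by 0.01;
        # subtracting from 100 enforces the invariant by construction.
        failed_pct = round(100 - passed_pct, 2)
        return passed_pct, failed_pct

    assert split_percentages(1, 3) == (33.33, 66.67)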
diff --git a/ingestion/src/metadata/data_quality/validations/column/base/columnValuesToBeUnique.py b/ingestion/src/metadata/data_quality/validations/column/base/columnValuesToBeUnique.py
index 53d7d55e64f..1aa38aa6107 100644
--- a/ingestion/src/metadata/data_quality/validations/column/base/columnValuesToBeUnique.py
+++ b/ingestion/src/metadata/data_quality/validations/column/base/columnValuesToBeUnique.py
@@ -92,34 +92,26 @@ class BaseColumnValuesToBeUniqueValidator(BaseTestValidator):
             List[DimensionResult]: List of dimension-specific test results
         """
         try:
-            # Get dimension columns from test case
             dimension_columns = self.test_case.dimensionColumns or []
             if not dimension_columns:
                 return []

-            # Get the column to validate (same as _run_validation)
             column: Union[SQALikeColumn, Column] = self._get_column_name()

-            # Define the metrics to compute (same as _run_validation)
             metrics_to_compute = {
                 "count": Metrics.COUNT,
                 "unique_count": Metrics.UNIQUE_COUNT,
             }

-            # Execute separate queries for each dimension column
             dimension_results = []
             for dimension_column in dimension_columns:
                 try:
-                    # Get dimension column object
                     dimension_col = self._get_column_name(dimension_column)

-                    # Execute dimensional query for this single dimension
-                    # This will return results grouped by this dimension only
                     single_dimension_results = self._execute_dimensional_query(
                         column, dimension_col, metrics_to_compute
                     )

-                    # Add to overall results list (now directly a list)
                     dimension_results.extend(single_dimension_results)

                 except Exception as exc:
@@ -132,7 +124,6 @@ class BaseColumnValuesToBeUniqueValidator(BaseTestValidator):

         except Exception as exc:
             logger.warning(f"Error executing dimensional validation: {exc}")
-            # Return empty list on error (test continues without dimensions)
             return []

     @abstractmethod
diff --git a/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesToBeInSet.py b/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesToBeInSet.py
index 8b2d0d6d84d..9899020f856 100644
--- a/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesToBeInSet.py
+++ b/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesToBeInSet.py
@@ -60,13 +60,11 @@ class ColumnValuesToBeInSetValidator(
             SQALikeColumn: Column object
         """
         if column_name is None:
-            # Get the main column being validated (original behavior)
             return self.get_column_name(
                 self.test_case.entityLink.root,
                 self.runner,
             )
         else:
-            # Get a specific column by name (for dimension columns)
             return self.get_column_name(
                 column_name,
                 self.runner,
@@ -111,28 +109,21 @@ class ColumnValuesToBeInSetValidator(
         dimension_results = []

         try:
-            # Extract test parameters
             allowed_values = test_params["allowed_values"]
             match_enum = test_params["match_enum"]

-            # Get the dataframe
             dfs = self.runner if isinstance(self.runner, list) else [self.runner]
             df = dfs[0]

-            # Group by dimension column
             grouped = df.groupby(dimension_col.name, dropna=False)
-
-            # Prepare results dataframe
             results_data = []

             for dimension_value, group_df in grouped:
-                # Handle NULL values
                 if pd.isna(dimension_value):
                     dimension_value = DIMENSION_NULL_LABEL
                 else:
                     dimension_value = str(dimension_value)

-                # Calculate metrics for this group
                 count_in_set = group_df[column.name].isin(allowed_values).sum()

                 if match_enum:
@@ -159,32 +150,25 @@ class ColumnValuesToBeInSetValidator(
                     }
                 )

-            # Create DataFrame with results
             results_df = pd.DataFrame(results_data)

             if not results_df.empty:
-                # Calculate impact scores
                 results_df = calculate_impact_score_pandas(
                     results_df,
                     failed_column=DIMENSION_FAILED_COUNT_KEY,
                     total_column=DIMENSION_TOTAL_COUNT_KEY,
                 )

-                # Aggregate Others
                 results_df = aggregate_others_pandas(
                     results_df,
                     dimension_column="dimension",
                     top_n=DEFAULT_TOP_DIMENSIONS,
                 )

-                # Process results into DimensionResult objects
                 for _, row in results_df.iterrows():
                     dimension_value = row["dimension"]
-
-                    # Extract metric values
                     count_in_set = int(row.get("count_in_set", 0))

-                    # Follow SQLAlchemy's exact logic
                     if match_enum:
                         # Enum mode: track actual totals and failures
                         total_count = int(row.get(DIMENSION_TOTAL_COUNT_KEY, 0))
@@ -198,7 +182,6 @@ class ColumnValuesToBeInSetValidator(

                     impact_score = float(row.get("impact_score", 0.0))

-                    # Create dimension result
                     dimension_result = self.get_dimension_result_object(
                         dimension_values={dimension_col.name: dimension_value},
                         test_case_status=self.get_test_case_status(matched),
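The pandas validator above and the one that follows share one pattern: group by the dimension column with dropna=False so NULL slices survive as their own group, relabel NaN group keys, then compute the metric per group. A self-contained sketch with invented data (DIMENSION_NULL_LABEL here is a stand-in for the module constant):

    import pandas as pd

    DIMENSION_NULL_LABEL = "NULL"  # placeholder for the real constant
    allowed_values = ["a", "b"]

    df = pd.DataFrame(
        {"status": ["a", "b", "x", None], "region": ["eu", "eu", "us", None]}
    )

    # dropna=False keeps the NULL dimension slice as its own group
    for dimension_value, group_df in df.groupby("region", dropna=False):
        label = (
            DIMENSION_NULL_LABEL if pd.isna(dimension_value) else str(dimension_value)
        )
        count_in_set = int(group_df["status"].isin(allowed_values).sum())
        print(label, count_in_set, len(group_df))
    # -> eu 2 2 / us 0 1 / NULL 0 1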
diff --git a/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesToBeUnique.py b/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesToBeUnique.py
index 24cc6db4767..16dfdc9d9a9 100644
--- a/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesToBeUnique.py
+++ b/ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesToBeUnique.py
@@ -60,13 +60,11 @@ class ColumnValuesToBeUniqueValidator(
             SQALikeColumn: column
         """
         if column_name is None:
-            # Get the main column being validated (original behavior)
             return self.get_column_name(
                 self.test_case.entityLink.root,
                 self.runner,
             )
         else:
-            # Get a specific column by name (for dimension columns)
             return self.get_column_name(
                 column_name,
                 self.runner,
@@ -106,24 +104,18 @@ class ColumnValuesToBeUniqueValidator(
         dimension_results = []

         try:
-            # Get the dataframe
             dfs = self.runner if isinstance(self.runner, list) else [self.runner]
             df = dfs[0]

-            # Group by dimension column
             grouped = df.groupby(dimension_col.name, dropna=False)
-
-            # Prepare results dataframe
             results_data = []

             for dimension_value, group_df in grouped:
-                # Handle NULL values
                 if pd.isna(dimension_value):
                     dimension_value = DIMENSION_NULL_LABEL
                 else:
                     dimension_value = str(dimension_value)

-                # Calculate metrics for this group
                 total_count = len(group_df)
                 unique_count = group_df[column.name].nunique()
                 duplicate_count = total_count - unique_count
@@ -138,29 +130,23 @@ class ColumnValuesToBeUniqueValidator(
                     }
                 )

-            # Create DataFrame with results
             results_df = pd.DataFrame(results_data)

             if not results_df.empty:
-                # Calculate impact scores
                 results_df = calculate_impact_score_pandas(
                     results_df,
                     failed_column=DIMENSION_FAILED_COUNT_KEY,
                     total_column=DIMENSION_TOTAL_COUNT_KEY,
                 )

-                # Aggregate Others
                 results_df = aggregate_others_pandas(
                     results_df,
                     dimension_column="dimension",
                     top_n=DEFAULT_TOP_DIMENSIONS,
                 )

-                # Process results into DimensionResult objects
                 for _, row in results_df.iterrows():
                     dimension_value = row["dimension"]
-
-                    # Extract metric values
                     total_count = int(row.get("count", 0))
                     unique_count = int(row.get("unique_count", 0))
                     duplicate_count = int(row.get(DIMENSION_FAILED_COUNT_KEY, 0))
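calculate_impact_score_pandas and aggregate_others_pandas are not shown in this diff, so the sketch below is only an assumed shape of the "Others" roll-up: keep the top_n highest-impact slices and collapse the remainder into one bucket. The column names failed_count/total_count stand in for the DIMENSION_*_KEY constants; treat every detail as illustrative, not as the helpers' real behavior.

    import pandas as pd

    def rollup_others(results_df: pd.DataFrame, dimension_column: str, top_n: int) -> pd.DataFrame:
        """Assumed roll-up: the top_n highest-impact slices survive,
        the remainder is summed into a single "Others" bucket."""
        ranked = results_df.sort_values("impact_score", ascending=False)
        top, rest = ranked.head(top_n), ranked.iloc[top_n:]
        if rest.empty:
            return top.reset_index(drop=True)
        others = {
            dimension_column: "Others",
            "failed_count": int(rest["failed_count"].sum()),
            "total_count": int(rest["total_count"].sum()),
            # the real helper may well recompute the score for the merged bucket
            "impact_score": float(rest["impact_score"].max()),
        }
        return pd.concat([top, pd.DataFrame([others])], ignore_index=True)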
diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeInSet.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeInSet.py
index b0a7e7435b2..6aa4dd8bd9a 100644
--- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeInSet.py
+++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeInSet.py
@@ -109,11 +109,9 @@ class ColumnValuesToBeInSetValidator(
         dimension_results = []

         try:
-            # Extract test parameters
             allowed_values = test_params["allowed_values"]
             match_enum = test_params["match_enum"]

-            # Build metric expressions dictionary
             metric_expressions = {}
             for metric_name, metric in metrics_to_compute.items():
                 metric_instance = metric.value(column)
@@ -121,7 +119,6 @@ class ColumnValuesToBeInSetValidator(
                     metric_instance.values = allowed_values
                 metric_expressions[metric_name] = metric_instance.fn()
-            # Add standardized keys for impact scoring
             from metadata.data_quality.validations.base_test_handler import (
                 DIMENSION_FAILED_COUNT_KEY,
                 DIMENSION_TOTAL_COUNT_KEY,
             )
@@ -142,14 +139,11 @@
             ]
             metric_expressions[DIMENSION_FAILED_COUNT_KEY] = func.literal(0)

-            # Execute with Others aggregation (always use CTEs for impact scoring)
             result_rows = self._execute_with_others_aggregation(
                 dimension_col, metric_expressions, DEFAULT_TOP_DIMENSIONS
             )

-            # Process results into DimensionResult objects
             for row in result_rows:
-                # Extract values using dictionary keys
                 from metadata.data_quality.validations.base_test_handler import (
                     DIMENSION_NULL_LABEL,
                 )
@@ -160,7 +154,6 @@
                     else DIMENSION_NULL_LABEL
                 )

-                # Extract metric results - preserve original logic
                 count_in_set = row.get("count_in_set", 0) or 0

                 # PRESERVE ORIGINAL LOGIC: match_enum determines how we get total_count
@@ -178,7 +171,6 @@

                 impact_score = row.get("impact_score", 0.0)

-                # Create dimension result using the helper method
                 dimension_result = self.get_dimension_result_object(
                     dimension_values={dimension_col.name: dimension_value},
                     test_case_status=self.get_test_case_status(matched),
@@ -191,18 +183,13 @@
                     total_rows=total_count,
                     passed_rows=count_in_set,
                     failed_rows=failed_count if match_enum else None,
-                    impact_score=impact_score
-                    if match_enum
-                    else None,  # Only include impact score when we have full metrics
+                    impact_score=impact_score if match_enum else None,
                 )

-                # Add to results list
                 dimension_results.append(dimension_result)

         except Exception as exc:
-            # Use the same error handling pattern as _run_results
             logger.warning(f"Error executing dimensional query: {exc}")
             logger.debug("Full error details: ", exc_info=True)
-            # Return empty list on error (test continues without dimensions)

         return dimension_results
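_execute_with_others_aggregation and its CTEs are also outside this diff, but the core of the SQLAlchemy dimensional query is a GROUP BY over labeled metric expressions. A minimal sketch against an invented table, roughly what the in-set metrics compile down to:

    from sqlalchemy import case, column, func, select, table

    t = table("orders", column("status"), column("region"))
    allowed_values = ["shipped", "delivered"]

    stmt = select(
        t.c.region.label("dimension"),
        # COUNT_IN_SET-style metric: rows whose value is in the allowed set
        func.sum(
            case((t.c.status.in_(allowed_values), 1), else_=0)
        ).label("count_in_set"),
        func.count().label("row_count"),
    ).group_by(t.c.region)

    print(stmt)  # SELECT ... GROUP BY orders.region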
diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeUnique.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeUnique.py
index 708a587d32c..e0e0016bf9a 100644
--- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeUnique.py
+++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeUnique.py
@@ -151,26 +151,21 @@ class ColumnValuesToBeUniqueValidator(
                 )
                 continue

-            # Add standardized keys for impact scoring
             from metadata.data_quality.validations.base_test_handler import (
                 DIMENSION_FAILED_COUNT_KEY,
                 DIMENSION_TOTAL_COUNT_KEY,
             )

-            # For uniqueness test: failed = total - unique (duplicates)
             metric_expressions[DIMENSION_TOTAL_COUNT_KEY] = metric_expressions["count"]
             metric_expressions[DIMENSION_FAILED_COUNT_KEY] = (
                 metric_expressions["count"] - metric_expressions["unique_count"]
             )

-            # Execute with Others aggregation (always use CTEs for impact scoring)
             result_rows = self._execute_with_others_aggregation(
                 dimension_col, metric_expressions, DEFAULT_TOP_DIMENSIONS
             )

-            # Process results into DimensionResult objects
             for row in result_rows:
-                # Extract values using dictionary keys
                 from metadata.data_quality.validations.base_test_handler import (
                     DIMENSION_NULL_LABEL,
                 )
@@ -181,17 +176,13 @@
                     else DIMENSION_NULL_LABEL
                 )

-                # Extract metric results
                 total_count = row.get("count", 0) or 0
                 unique_count = row.get("unique_count", 0) or 0
-
-                # Calculate duplicate count (failed rows for uniqueness test)
                 duplicate_count = total_count - unique_count

                 matched = total_count == unique_count
                 impact_score = row.get("impact_score", 0.0)

-                # Create dimension result using the helper method
                 dimension_result = self.get_dimension_result_object(
                     dimension_values={dimension_col.name: dimension_value},
                     test_case_status=self.get_test_case_status(matched),
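The uniqueness variant mirrors the DIMENSION_FAILED_COUNT_KEY expression added above: per dimension slice, failures are the non-null count minus the distinct count. Again a sketch with an invented table:

    from sqlalchemy import column, distinct, func, select, table

    t = table("users", column("email"), column("region"))

    total = func.count(t.c.email)  # non-null values in the slice
    unique = func.count(distinct(t.c.email))

    stmt = select(
        t.c.region.label("dimension"),
        total.label("count"),
        unique.label("unique_count"),
        (total - unique).label("failed_count"),  # duplicates = total - unique
    ).group_by(t.c.region)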