diff --git a/ingestion/src/metadata/great_expectations/action.py b/ingestion/src/metadata/great_expectations/action.py index 56ebdd76123..3f737085e2e 100644 --- a/ingestion/src/metadata/great_expectations/action.py +++ b/ingestion/src/metadata/great_expectations/action.py @@ -21,6 +21,7 @@ from datetime import datetime from typing import Dict, List, Optional, Union, cast from great_expectations.checkpoint.actions import ValidationAction +from great_expectations.core import ExpectationConfiguration from great_expectations.core.batch import Batch from great_expectations.core.batch_spec import ( RuntimeDataBatchSpec, @@ -111,6 +112,7 @@ class OpenMetadataValidationAction(ValidationAction): self.schema_name = schema_name # for database without schema concept self.config_file_path = config_file_path self.ometa_conn = self._create_ometa_connection() + self.expectation_suite = None def _run( # pylint: disable=unused-argument self, @@ -133,6 +135,12 @@ class OpenMetadataValidationAction(ValidationAction): checkpoint_identifier: identifier for the checkpoint """ + if expectation_suite_identifier: + expectation_suite_name = expectation_suite_identifier.expectation_suite_name + self.expectation_suite = self.data_context.get_expectation_suite( + expectation_suite_name + ) + check_point_spec = self._get_checkpoint_batch_spec(data_asset) table_entity = None if isinstance(check_point_spec, SqlAlchemyDatasourceBatchSpec): @@ -316,6 +324,18 @@ class OpenMetadataValidationAction(ValidationAction): def _get_test_case_params_value(self, result: dict) -> List[TestCaseParameterValue]: """Build test case parameter value from GE test result""" + if self.expectation_suite: + expectation = self._get_expectation_config(result) + if expectation: + return [ + TestCaseParameterValue( + name=key, + value=str(value), + ) # type: ignore + for key, value in expectation.kwargs.items() # type: ignore + if key not in {"column", "batch_id"} + ] + if "observed_value" not in result["result"]: return [ TestCaseParameterValue( @@ -337,6 +357,17 @@ class OpenMetadataValidationAction(ValidationAction): self, result: dict ) -> List[TestCaseParameterDefinition]: """Build test case parameter definition from GE test result""" + if self.expectation_suite: + expectation = self._get_expectation_config(result) + if expectation: + return [ + TestCaseParameterDefinition( + name=key, + ) # type: ignore + for key, _ in expectation.kwargs.items() # type: ignore + if key not in {"column", "batch_id"} + ] + if "observed_value" not in result["result"]: return [ TestCaseParameterDefinition( @@ -361,19 +392,118 @@ class OpenMetadataValidationAction(ValidationAction): Returns: TestCaseResult: a test case result object """ - try: - test_result_value = TestResultValue( - name="observed_value", - value=str(result["result"]["observed_value"]), - ) - except KeyError: - unexpected_percent_total = result["result"].get("unexpected_percent_total") - test_result_value = TestResultValue( - name="unexpected_percentage_total", - value=str(unexpected_percent_total), + test_result_values = [] + result_data = result.get("result", {}) + + numeric_fields = [ + "unexpected_percent", + "unexpected_percent_total", + "missing_percent", + "unexpected_count", + "missing_count", + "element_count", + "observed_value", + "success_rate", + ] + + for field in numeric_fields: + if field in result_data: + value = result_data[field] + + if isinstance(value, (int, float)): + test_result_values.append( + TestResultValue( + name=field, + value=str(value), + predictedValue=None, + ) + ) + elif field == "observed_value": + test_result_values.extend( + self._extract_complex_value_from_observed_value(value) + ) + + return test_result_values + + def _extract_complex_value_from_observed_value( + self, observed_value + ) -> List[TestResultValue]: + """Extract complex value from observed value + + Args: + observed_value: observed value + Returns: + str: complex value + """ + if isinstance(observed_value, list): + return [ + TestResultValue( + name="element_count", + value=str(len(observed_value)), + predictedValue=None, + ) + ] + + if isinstance(observed_value, dict): + if "quantiles" in observed_value: + result_values = [] + quantiles = observed_value["quantiles"] + values = observed_value["values"] + for quantile, value in zip(quantiles, values): + result_values.append( + TestResultValue( + name=f"quantile_{str(quantile)}", + value=str(value), + predictedValue=None, + ) + ) + return result_values + + # catch all other cases that are not a quantile + for k, v in observed_value.items(): + if isinstance(v, (int, float)): + return [ + TestResultValue( + name=k, + value=str(v), + predictedValue=None, + ) + ] + + if isinstance(observed_value, str): + return [ + TestResultValue( + name="observed_value", + value=str(1), + predictedValue=None, + ) + ] + + return [] + + def _get_expectation_config( + self, result: dict + ) -> Optional[ExpectationConfiguration]: + """Get expectation config from GE test result + + Args: + result: GE test result + Returns: + Optional[ExpectationConfiguration]: expectation configuration + """ + if self.expectation_suite: + name = result["expectation_config"]["expectation_type"] + expectation = next( + ( + expectation + for expectation in self.expectation_suite.expectations + if expectation.expectation_type == name + ), + None, ) - return [test_result_value] + return expectation + return None def _handle_test_case(self, result: Dict, table_entity: Table): """Handle adding test to table entity based on the test case. diff --git a/ingestion/src/metadata/great_expectations/action1xx.py b/ingestion/src/metadata/great_expectations/action1xx.py index d1fe89dca41..0d497828d61 100644 --- a/ingestion/src/metadata/great_expectations/action1xx.py +++ b/ingestion/src/metadata/great_expectations/action1xx.py @@ -319,19 +319,94 @@ class OpenMetadataValidationAction1xx(ValidationAction): Returns: TestCaseResult: a test case result object """ - try: - test_result_value = TestResultValue( - name="observed_value", - value=str(result["result"]["observed_value"]), - ) - except KeyError: - unexpected_percent_total = result["result"].get("unexpected_percent_total") - test_result_value = TestResultValue( - name="unexpected_percentage_total", - value=str(unexpected_percent_total), - ) + test_result_values = [] + result_data = result.get("result", {}) - return [test_result_value] + numeric_fields = [ + "unexpected_percent", + "unexpected_percent_total", + "missing_percent", + "unexpected_count", + "missing_count", + "element_count", + "observed_value", + "success_rate", + ] + + for field in numeric_fields: + if field in result_data: + value = result_data[field] + + if isinstance(value, (int, float)): + test_result_values.append( + TestResultValue( + name=field, + value=str(value), + predictedValue=None, + ) + ) + elif field == "observed_value": + test_result_values.extend( + self._extract_complex_value_from_observed_value(value) + ) + + return test_result_values + + def _extract_complex_value_from_observed_value( + self, observed_value + ) -> List[TestResultValue]: + """Extract complex value from observed value + + Args: + observed_value: observed value + Returns: + str: complex value + """ + if isinstance(observed_value, list): + return [ + TestResultValue( + name="element_count", + value=str(len(observed_value)), + predictedValue=None, + ) + ] + + if isinstance(observed_value, dict): + if "quantiles" in observed_value: + result_values = [] + quantiles = observed_value["quantiles"] + values = observed_value["values"] + for quantile, value in zip(quantiles, values): + result_values.append( + TestResultValue( + name=f"quantile_{str(quantile)}", + value=str(value), + predictedValue=None, + ) + ) + return result_values + + # catch all other cases that are not a quantile + for k, v in observed_value.items(): + if isinstance(v, (int, float)): + return [ + TestResultValue( + name=k, + value=str(v), + predictedValue=None, + ) + ] + + if isinstance(observed_value, str): + return [ + TestResultValue( + name="observed_value", + value=str(1), + predictedValue=None, + ) + ] + + return [] def _handle_test_case(self, result: Dict, table_entity: Table): """Handle adding test to table entity based on the test case.