feat: improve test result details for GX integration (#22424)

This commit is contained in:
Teddy 2025-07-17 15:41:29 +02:00 committed by GitHub
parent 53356f213d
commit 4a264e7a3a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 228 additions and 23 deletions

View File

@ -21,6 +21,7 @@ from datetime import datetime
from typing import Dict, List, Optional, Union, cast
from great_expectations.checkpoint.actions import ValidationAction
from great_expectations.core import ExpectationConfiguration
from great_expectations.core.batch import Batch
from great_expectations.core.batch_spec import (
RuntimeDataBatchSpec,
@ -111,6 +112,7 @@ class OpenMetadataValidationAction(ValidationAction):
self.schema_name = schema_name # for database without schema concept
self.config_file_path = config_file_path
self.ometa_conn = self._create_ometa_connection()
self.expectation_suite = None
def _run( # pylint: disable=unused-argument
self,
@ -133,6 +135,12 @@ class OpenMetadataValidationAction(ValidationAction):
checkpoint_identifier: identifier for the checkpoint
"""
if expectation_suite_identifier:
expectation_suite_name = expectation_suite_identifier.expectation_suite_name
self.expectation_suite = self.data_context.get_expectation_suite(
expectation_suite_name
)
check_point_spec = self._get_checkpoint_batch_spec(data_asset)
table_entity = None
if isinstance(check_point_spec, SqlAlchemyDatasourceBatchSpec):
@ -316,6 +324,18 @@ class OpenMetadataValidationAction(ValidationAction):
def _get_test_case_params_value(self, result: dict) -> List[TestCaseParameterValue]:
"""Build test case parameter value from GE test result"""
if self.expectation_suite:
expectation = self._get_expectation_config(result)
if expectation:
return [
TestCaseParameterValue(
name=key,
value=str(value),
) # type: ignore
for key, value in expectation.kwargs.items() # type: ignore
if key not in {"column", "batch_id"}
]
if "observed_value" not in result["result"]:
return [
TestCaseParameterValue(
@ -337,6 +357,17 @@ class OpenMetadataValidationAction(ValidationAction):
self, result: dict
) -> List[TestCaseParameterDefinition]:
"""Build test case parameter definition from GE test result"""
if self.expectation_suite:
expectation = self._get_expectation_config(result)
if expectation:
return [
TestCaseParameterDefinition(
name=key,
) # type: ignore
for key, _ in expectation.kwargs.items() # type: ignore
if key not in {"column", "batch_id"}
]
if "observed_value" not in result["result"]:
return [
TestCaseParameterDefinition(
@ -361,19 +392,118 @@ class OpenMetadataValidationAction(ValidationAction):
Returns:
TestCaseResult: a test case result object
"""
try:
test_result_value = TestResultValue(
name="observed_value",
value=str(result["result"]["observed_value"]),
)
except KeyError:
unexpected_percent_total = result["result"].get("unexpected_percent_total")
test_result_value = TestResultValue(
name="unexpected_percentage_total",
value=str(unexpected_percent_total),
test_result_values = []
result_data = result.get("result", {})
numeric_fields = [
"unexpected_percent",
"unexpected_percent_total",
"missing_percent",
"unexpected_count",
"missing_count",
"element_count",
"observed_value",
"success_rate",
]
for field in numeric_fields:
if field in result_data:
value = result_data[field]
if isinstance(value, (int, float)):
test_result_values.append(
TestResultValue(
name=field,
value=str(value),
predictedValue=None,
)
)
elif field == "observed_value":
test_result_values.extend(
self._extract_complex_value_from_observed_value(value)
)
return test_result_values
def _extract_complex_value_from_observed_value(
self, observed_value
) -> List[TestResultValue]:
"""Extract complex value from observed value
Args:
observed_value: observed value
Returns:
str: complex value
"""
if isinstance(observed_value, list):
return [
TestResultValue(
name="element_count",
value=str(len(observed_value)),
predictedValue=None,
)
]
if isinstance(observed_value, dict):
if "quantiles" in observed_value:
result_values = []
quantiles = observed_value["quantiles"]
values = observed_value["values"]
for quantile, value in zip(quantiles, values):
result_values.append(
TestResultValue(
name=f"quantile_{str(quantile)}",
value=str(value),
predictedValue=None,
)
)
return result_values
# catch all other cases that are not a quantile
for k, v in observed_value.items():
if isinstance(v, (int, float)):
return [
TestResultValue(
name=k,
value=str(v),
predictedValue=None,
)
]
if isinstance(observed_value, str):
return [
TestResultValue(
name="observed_value",
value=str(1),
predictedValue=None,
)
]
return []
def _get_expectation_config(
self, result: dict
) -> Optional[ExpectationConfiguration]:
"""Get expectation config from GE test result
Args:
result: GE test result
Returns:
Optional[ExpectationConfiguration]: expectation configuration
"""
if self.expectation_suite:
name = result["expectation_config"]["expectation_type"]
expectation = next(
(
expectation
for expectation in self.expectation_suite.expectations
if expectation.expectation_type == name
),
None,
)
return [test_result_value]
return expectation
return None
def _handle_test_case(self, result: Dict, table_entity: Table):
"""Handle adding test to table entity based on the test case.

View File

@ -319,19 +319,94 @@ class OpenMetadataValidationAction1xx(ValidationAction):
Returns:
TestCaseResult: a test case result object
"""
try:
test_result_value = TestResultValue(
name="observed_value",
value=str(result["result"]["observed_value"]),
)
except KeyError:
unexpected_percent_total = result["result"].get("unexpected_percent_total")
test_result_value = TestResultValue(
name="unexpected_percentage_total",
value=str(unexpected_percent_total),
)
test_result_values = []
result_data = result.get("result", {})
return [test_result_value]
numeric_fields = [
"unexpected_percent",
"unexpected_percent_total",
"missing_percent",
"unexpected_count",
"missing_count",
"element_count",
"observed_value",
"success_rate",
]
for field in numeric_fields:
if field in result_data:
value = result_data[field]
if isinstance(value, (int, float)):
test_result_values.append(
TestResultValue(
name=field,
value=str(value),
predictedValue=None,
)
)
elif field == "observed_value":
test_result_values.extend(
self._extract_complex_value_from_observed_value(value)
)
return test_result_values
def _extract_complex_value_from_observed_value(
self, observed_value
) -> List[TestResultValue]:
"""Extract complex value from observed value
Args:
observed_value: observed value
Returns:
str: complex value
"""
if isinstance(observed_value, list):
return [
TestResultValue(
name="element_count",
value=str(len(observed_value)),
predictedValue=None,
)
]
if isinstance(observed_value, dict):
if "quantiles" in observed_value:
result_values = []
quantiles = observed_value["quantiles"]
values = observed_value["values"]
for quantile, value in zip(quantiles, values):
result_values.append(
TestResultValue(
name=f"quantile_{str(quantile)}",
value=str(value),
predictedValue=None,
)
)
return result_values
# catch all other cases that are not a quantile
for k, v in observed_value.items():
if isinstance(v, (int, float)):
return [
TestResultValue(
name=k,
value=str(v),
predictedValue=None,
)
]
if isinstance(observed_value, str):
return [
TestResultValue(
name="observed_value",
value=str(1),
predictedValue=None,
)
]
return []
def _handle_test_case(self, result: Dict, table_entity: Table):
"""Handle adding test to table entity based on the test case.