Merge remote-tracking branch 'origin/main'

This commit is contained in:
mohitdeuex 2023-09-22 01:31:19 +05:30
commit b8d04e93ed

View File

@ -21,7 +21,10 @@ from typing import Dict, List, Optional, Union, cast
from great_expectations.checkpoint.actions import ValidationAction
from great_expectations.core.batch import Batch
from great_expectations.core.batch_spec import SqlAlchemyDatasourceBatchSpec
from great_expectations.core.batch_spec import (
RuntimeDataBatchSpec,
SqlAlchemyDatasourceBatchSpec,
)
from great_expectations.core.expectation_validation_result import (
ExpectationSuiteValidationResult,
)
@ -123,7 +126,9 @@ class OpenMetadataValidationAction(ValidationAction):
expectation_suite_identifier: type of expectation suite
checkpoint_identifier: identifier for the checkpoint
"""
check_point_spec = self._get_checkpoint_batch_spec(data_asset)
if isinstance(check_point_spec, SqlAlchemyDatasourceBatchSpec):
execution_engine_url = self._get_execution_engine_url(data_asset)
table_entity = self._get_table_entity(
execution_engine_url.database
@ -133,11 +138,32 @@ class OpenMetadataValidationAction(ValidationAction):
check_point_spec.get("table_name"),
)
elif isinstance(check_point_spec, RuntimeDataBatchSpec):
table_name = self._get_metadata_from_validation_suite(
validation_result_suite
)
table_entity = self._get_table_entity(
self.database_name,
self.schema_name,
table_name,
)
if table_entity:
test_suite = self._check_or_create_test_suite(table_entity)
for result in validation_result_suite.results:
self._handle_test_case(result, table_entity, test_suite)
def _get_metadata_from_validation_suite(self, validation_result_suite: dict) -> str:
# table_name_1, split on last "_" in the case there are multiple suites for one schema
try:
name = validation_result_suite["meta"]["expectation_suite_name"]
splitted_name = name.rpartition("_")
table_name = splitted_name[0]
return table_name
except KeyError:
raise KeyError("No suite name present in validation_result_suite")
@staticmethod
def _get_checkpoint_batch_spec(
data_asset: Union[Validator, DataAsset, Batch]
@ -154,9 +180,11 @@ class OpenMetadataValidationAction(ValidationAction):
batch_spec = data_asset.active_batch_spec
if isinstance(batch_spec, SqlAlchemyDatasourceBatchSpec):
return batch_spec
if isinstance(batch_spec, RuntimeDataBatchSpec):
return batch_spec
raise ValueError(
f"Type `{type(batch_spec).__name__,}` is not supported."
" Make sur you ran your expectations against a relational database",
" Make sure you ran your expectations against a relational database",
)
def _get_table_entity(