Fixes #10096: adding RuntimeDataBatchSpec as a GX backend (#13264)

* test

* support different gx backend

* remove unused import

* fix formatting

* Update ingestion/src/metadata/great_expectations/action.py

Co-authored-by: Teddy <teddy.crepineau@gmail.com>

---------

Co-authored-by: William Geuns <william.geuns@vrt.be>
Co-authored-by: Teddy <teddy.crepineau@gmail.com>
This commit is contained in:
William Geuns 2023-09-21 21:55:29 +02:00 committed by GitHub
parent 6417dff1c4
commit 270a923eb9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -21,7 +21,10 @@ from typing import Dict, List, Optional, Union, cast
from great_expectations.checkpoint.actions import ValidationAction
from great_expectations.core.batch import Batch
from great_expectations.core.batch_spec import SqlAlchemyDatasourceBatchSpec
from great_expectations.core.batch_spec import (
RuntimeDataBatchSpec,
SqlAlchemyDatasourceBatchSpec,
)
from great_expectations.core.expectation_validation_result import (
ExpectationSuiteValidationResult,
)
@ -123,21 +126,44 @@ class OpenMetadataValidationAction(ValidationAction):
expectation_suite_identifier: type of expectation suite
checkpoint_identifier: identifier for the checkpoint
"""
check_point_spec = self._get_checkpoint_batch_spec(data_asset)
execution_engine_url = self._get_execution_engine_url(data_asset)
table_entity = self._get_table_entity(
execution_engine_url.database
if not self.database_name
else self.database_name,
check_point_spec.get("schema_name", self.schema_name),
check_point_spec.get("table_name"),
)
if isinstance(check_point_spec, SqlAlchemyDatasourceBatchSpec):
execution_engine_url = self._get_execution_engine_url(data_asset)
table_entity = self._get_table_entity(
execution_engine_url.database
if not self.database_name
else self.database_name,
check_point_spec.get("schema_name", self.schema_name),
check_point_spec.get("table_name"),
)
elif isinstance(check_point_spec, RuntimeDataBatchSpec):
table_name = self._get_metadata_from_validation_suite(
validation_result_suite
)
table_entity = self._get_table_entity(
self.database_name,
self.schema_name,
table_name,
)
if table_entity:
test_suite = self._check_or_create_test_suite(table_entity)
for result in validation_result_suite.results:
self._handle_test_case(result, table_entity, test_suite)
def _get_metadata_from_validation_suite(self, validation_result_suite: dict) -> str:
# table_name_1, split on last "_" in the case there are multiple suites for one schema
try:
name = validation_result_suite["meta"]["expectation_suite_name"]
splitted_name = name.rpartition("_")
table_name = splitted_name[0]
return table_name
except KeyError:
raise KeyError("No suite name present in validation_result_suite")
@staticmethod
def _get_checkpoint_batch_spec(
data_asset: Union[Validator, DataAsset, Batch]
@ -154,9 +180,11 @@ class OpenMetadataValidationAction(ValidationAction):
batch_spec = data_asset.active_batch_spec
if isinstance(batch_spec, SqlAlchemyDatasourceBatchSpec):
return batch_spec
if isinstance(batch_spec, RuntimeDataBatchSpec):
return batch_spec
raise ValueError(
f"Type `{type(batch_spec).__name__,}` is not supported."
" Make sur you ran your expectations against a relational database",
" Make sure you ran your expectations against a relational database",
)
def _get_table_entity(