diff --git a/ingestion/src/metadata/great_expectations/action.py b/ingestion/src/metadata/great_expectations/action.py index 1f189182dbc..89bce7f0aaf 100644 --- a/ingestion/src/metadata/great_expectations/action.py +++ b/ingestion/src/metadata/great_expectations/action.py @@ -21,7 +21,10 @@ from typing import Dict, List, Optional, Union, cast from great_expectations.checkpoint.actions import ValidationAction from great_expectations.core.batch import Batch -from great_expectations.core.batch_spec import SqlAlchemyDatasourceBatchSpec +from great_expectations.core.batch_spec import ( + RuntimeDataBatchSpec, + SqlAlchemyDatasourceBatchSpec, +) from great_expectations.core.expectation_validation_result import ( ExpectationSuiteValidationResult, ) @@ -123,21 +126,44 @@ class OpenMetadataValidationAction(ValidationAction): expectation_suite_identifier: type of expectation suite checkpoint_identifier: identifier for the checkpoint """ + check_point_spec = self._get_checkpoint_batch_spec(data_asset) - execution_engine_url = self._get_execution_engine_url(data_asset) - table_entity = self._get_table_entity( - execution_engine_url.database - if not self.database_name - else self.database_name, - check_point_spec.get("schema_name", self.schema_name), - check_point_spec.get("table_name"), - ) + if isinstance(check_point_spec, SqlAlchemyDatasourceBatchSpec): + execution_engine_url = self._get_execution_engine_url(data_asset) + table_entity = self._get_table_entity( + execution_engine_url.database + if not self.database_name + else self.database_name, + check_point_spec.get("schema_name", self.schema_name), + check_point_spec.get("table_name"), + ) + + elif isinstance(check_point_spec, RuntimeDataBatchSpec): + table_name = self._get_metadata_from_validation_suite( + validation_result_suite + ) + table_entity = self._get_table_entity( + self.database_name, + self.schema_name, + table_name, + ) if table_entity: test_suite = self._check_or_create_test_suite(table_entity) for result in validation_result_suite.results: self._handle_test_case(result, table_entity, test_suite) + def _get_metadata_from_validation_suite(self, validation_result_suite: dict) -> str: + # table_name_1, split on last "_" in the case there are multiple suites for one schema + try: + name = validation_result_suite["meta"]["expectation_suite_name"] + splitted_name = name.rpartition("_") + table_name = splitted_name[0] + return table_name + + except KeyError: + raise KeyError("No suite name present in validation_result_suite") + @staticmethod def _get_checkpoint_batch_spec( data_asset: Union[Validator, DataAsset, Batch] @@ -154,9 +180,11 @@ class OpenMetadataValidationAction(ValidationAction): batch_spec = data_asset.active_batch_spec if isinstance(batch_spec, SqlAlchemyDatasourceBatchSpec): return batch_spec + if isinstance(batch_spec, RuntimeDataBatchSpec): + return batch_spec raise ValueError( f"Type `{type(batch_spec).__name__,}` is not supported." - " Make sur you ran your expectations against a relational database", + " Make sure you ran your expectations against a relational database", ) def _get_table_entity(