fix: add table_name arg. for runtime batch (#15078)

This commit is contained in:
Teddy 2024-02-07 13:10:27 +01:00 committed by GitHub
parent db7759810b
commit 11b82d4eea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 13 additions and 18 deletions

View File

@ -97,10 +97,12 @@ class OpenMetadataValidationAction(ValidationAction):
database_service_name: Optional[str] = None,
schema_name: Optional[str] = "default",
database_name: Optional[str] = None,
table_name: Optional[str] = None,
):
super().__init__(data_context)
self.database_service_name = database_service_name
self.database_name = database_name
self.table_name = table_name
self.schema_name = schema_name # for database without schema concept
self.config_file_path = config_file_path
self.ometa_conn = self._create_ometa_connection()
@ -128,6 +130,7 @@ class OpenMetadataValidationAction(ValidationAction):
"""
check_point_spec = self._get_checkpoint_batch_spec(data_asset)
table_entity = None
if isinstance(check_point_spec, SqlAlchemyDatasourceBatchSpec):
execution_engine_url = self._get_execution_engine_url(data_asset)
table_entity = self._get_table_entity(
@ -139,13 +142,10 @@ class OpenMetadataValidationAction(ValidationAction):
)
elif isinstance(check_point_spec, RuntimeDataBatchSpec):
table_name = self._get_metadata_from_validation_suite(
validation_result_suite
)
table_entity = self._get_table_entity(
self.database_name,
self.schema_name,
table_name,
self.table_name,
)
if table_entity:
@ -153,17 +153,6 @@ class OpenMetadataValidationAction(ValidationAction):
for result in validation_result_suite.results:
self._handle_test_case(result, table_entity, test_suite)
def _get_metadata_from_validation_suite(self, validation_result_suite: dict) -> str:
# table_name_1, split on last "_" in the case there are multiple suites for one schema
try:
name = validation_result_suite["meta"]["expectation_suite_name"]
splitted_name = name.rpartition("_")
table_name = splitted_name[0]
return table_name
except KeyError:
raise KeyError("No suite name present in validation_result_suite")
@staticmethod
def _get_checkpoint_batch_spec(
data_asset: Union[Validator, DataAsset, Batch]

View File

@ -32,7 +32,10 @@ action:
module_name: metadata.great_expectations.action
class_name: OpenMetadataValidationAction
config_file_path: path/to/ometa/config/file/
ometa_service_name: my_service_name
database_service_name: <serviceName in OM>
database_name: <databaseName in OM>
schema_name: <schemaName in OM>
table_name: <tableName in OM>
[...]
```
@ -43,7 +46,10 @@ In your checkpoint yaml file, you will need to add the above code block in `acti
- `module_name`: this is OpenMetadata submodule name
- `class_name`: this is the name of the class that will be used to execute the custom action
- `config_file_path`: this is the path to your `config.yaml` file that holds the configuration of your OpenMetadata server
- `ometa_service_name`: [Optional] this is an optional parameter. If not specified and 2 tables have the same name in 2 different OpenMetadata services, the custom action will fail
- `database_service_name`: [Optional] this is an optional parameter. If not specified and 2 tables have the same name in 2 different OpenMetadata services, the custom action will fail
- `database_name`: [Optional] only required for `RuntimeDataBatchSpec` execution (e.g. run GX against a dataframe).
- `schema_name`: [Optional] only required for `RuntimeDataBatchSpec` execution (e.g. run GX against a dataframe).
- `table_name`: [Optional] only required for `RuntimeDataBatchSpec` execution (e.g. run GX against a dataframe).
{% image
src={"/images/v1.3/features/integrations/ge-checkpoint-file.gif"}
@ -66,7 +72,7 @@ data_context.run_checkpoint(
"module_name": "metadata.great_expectations.action",
"class_name": "OpenMetadataValidationAction",
"config_file_path": "path/to/ometa/config/file/",
"ometa_service_name": "my_service_name",
"database_service_name": "my_service_name",
},
,}
)