Handle internal runtime errors when validating workflows (#7049)

This commit is contained in:
Pere Miquel Brull 2022-08-31 07:55:40 +02:00 committed by GitHub
parent 7caac4616f
commit 3c95423301
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -88,6 +88,12 @@ T = TypeVar("T", bound=BaseModel)
HAS_INNER_CONNECTION = {"Airflow"}
class InvalidWorkflowException(Exception):
"""
Raise when encountering errors with the workflow configuration
"""
def get_service_type(
source_type: str,
) -> Union[
@ -186,6 +192,17 @@ def get_connection_class(
return connection_class
def _parse_validation_err(validation_error: ValidationError) -> str:
"""
Convert the validation error into a message to log
"""
errors = [
f"{err.get('msg')} in {err.get('loc')}" for err in validation_error.errors()
]
return "\n".join(errors)
def _unsafe_parse_config(config: dict, cls: T, message: str) -> None:
"""
Given a config dictionary and the class it should match,
@ -198,7 +215,7 @@ def _unsafe_parse_config(config: dict, cls: T, message: str) -> None:
except ValidationError as err:
logger.debug(traceback.format_exc())
logger.error(
f"The supported properties for {cls.__name__} are {list(cls.__fields__.keys())}: {err}"
f"The supported properties for {cls.__name__} are {list(cls.__fields__.keys())}"
)
raise err
@ -230,7 +247,9 @@ def parse_service_connection(config_dict: dict) -> None:
:param config_dict: JSON configuration
"""
# Unsafe access to the keys. Allow a KeyError if the config is not well formatted
source_type = config_dict["source"]["serviceConnection"]["config"]["type"]
source_type = config_dict["source"]["serviceConnection"]["config"].get("type")
if source_type is None:
raise InvalidWorkflowException("Missing type in the serviceConnection config")
logger.warning(
f"Error parsing the Workflow Configuration for {source_type} ingestion"
@ -259,7 +278,11 @@ def parse_source_config(config_dict: dict) -> None:
:param config_dict: JSON configuration
"""
# Parse the source config
source_config_type = config_dict["source"]["sourceConfig"]["config"]["type"]
source_config_type = config_dict["source"]["sourceConfig"]["config"].get("type")
if source_config_type is None:
raise InvalidWorkflowException("Missing type in the sourceConfig config")
source_config_class = get_source_config_class(source_config_type)
_unsafe_parse_config(
@ -322,6 +345,13 @@ def parse_workflow_config_gracefully(
throws a scoped error while fetching the required source connection
class.
If there is a validation error, two things can happen:
- We find out the ValidationError scoping the search to serviceConnection, sourceConfig or securityConfig
- There is something strange going on with the config, and we find another wild Exception.
Therefore, we first need to catch any ValidationError and raise that immediately (this is the expected case).
Otherwise, we throw a message and raise the original ValidationError to point to the root cause.
:param config_dict: JSON workflow config
:return:workflow config or scoped error
"""
@ -330,9 +360,21 @@ def parse_workflow_config_gracefully(
workflow_config = OpenMetadataWorkflowConfig.parse_obj(config_dict)
return workflow_config
except ValidationError:
parse_workflow_source(config_dict)
parse_server_config(config_dict)
except ValidationError as original_error:
try:
parse_workflow_source(config_dict)
parse_server_config(config_dict)
except (ValidationError, InvalidWorkflowException) as scoped_error:
raise scoped_error
except Exception as runtime_error:
logger.debug(traceback.format_exc())
logger.error(
f"We encountered an error {runtime_error} when trying to verify the serviceConnection,"
" sourceConfig and securityConfig parameters in your workflow.\n"
" You might need to review your config based on the original cause of this failure:"
f" [{_parse_validation_err(original_error)}]"
)
raise original_error
def parse_test_connection_request_gracefully(