fix: catch not Either type in workflow and return explicit error message (#13796)

This commit is contained in:
Teddy 2023-11-02 13:02:26 +01:00 committed by GitHub
parent 8375f6e7a9
commit d025e217d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 87 additions and 0 deletions

View File

@ -11,6 +11,7 @@
"""
Each of the ingestion steps: Source, Sink, Stage,...
"""
import inspect
import traceback
from abc import ABC, abstractmethod
from typing import Iterable, Optional
@ -79,6 +80,19 @@ class ReturnStep(Step, ABC):
except WorkflowFatalError as err:
logger.error(f"Fatal error running step [{self}]: [{err}]")
raise err
except AttributeError as exc:
error = (
f"Object type defined in `def _run()` "
f"{inspect.getsourcefile(self._run)} is not an Either: [{exc}]"
)
logger.warning(error)
self.status.failed(
StackTraceError(
name="Not an Either",
error=error,
stack_trace=traceback.format_exc(),
)
)
except Exception as exc:
error = f"Unhandled exception during workflow processing: [{exc}]"
logger.warning(error)
@ -121,6 +135,19 @@ class StageStep(Step, ABC):
except WorkflowFatalError as err:
logger.error(f"Fatal error running step [{self}]: [{err}]")
raise err
except AttributeError as exc:
error = (
f"Object type defined in `def _run()` "
f"{inspect.getsourcefile(self._run)} is not an Either: [{exc}]"
)
logger.warning(error)
self.status.failed(
StackTraceError(
name="Not an Either",
error=error,
stack_trace=traceback.format_exc(),
)
)
except Exception as exc:
error = f"Unhandled exception during workflow processing: [{exc}]"
logger.warning(error)
@ -157,6 +184,19 @@ class IterStep(Step, ABC):
except WorkflowFatalError as err:
logger.error(f"Fatal error running step [{self}]: [{err}]")
raise err
except AttributeError as exc:
error = (
f"Object type defined in `def _iter()` "
f"{inspect.getsourcefile(self._iter)} is not an Either: [{exc}]"
)
logger.warning(error)
self.status.failed(
StackTraceError(
name="Not an Either",
error=error,
stack_trace=traceback.format_exc(),
)
)
except Exception as exc:
error = f"Encountered exception running step [{self}]: [{exc}]"
logger.warning(error)

View File

@ -62,6 +62,27 @@ class SimpleSource(WorkflowSource):
yield Either(right=element)
class BrokenSource(WorkflowSource):
"""Source not returning an Either"""
def prepare(self):
"""Nothing to do"""
def test_connection(self) -> None:
"""Nothing to do"""
@classmethod
def create(cls, _: dict, __: OpenMetadataConnection) -> "SimpleSource":
return cls()
def close(self) -> None:
"""Nothing to do"""
def _iter(self, *args, **kwargs) -> Iterable[int]:
for element in range(0, 5):
yield int(element)
class SimpleSink(Sink):
"""
Simple Sink for testing
@ -94,6 +115,17 @@ class SimpleWorkflow(BaseWorkflow):
self.steps: Tuple[Step] = (SimpleSink(),)
class BrokenWorkflow(BaseWorkflow):
"""
Simple Workflow for testing
"""
def set_steps(self):
self.source = BrokenSource()
self.steps: Tuple[Step] = (SimpleSink(),)
# Pass only the required details so that the workflow can be initialized
config = OpenMetadataWorkflowConfig(
source=Source(
@ -119,6 +151,7 @@ class TestBaseWorkflow(TestCase):
"""
workflow = SimpleWorkflow(config=config)
broken_workflow = BrokenWorkflow(config=config)
@pytest.mark.order(1)
def test_workflow_executes(self):
@ -143,3 +176,17 @@ class TestBaseWorkflow(TestCase):
def test_workflow_raise_status(self):
# We catch the error on the Sink
self.assertRaises(WorkflowExecutionError, self.workflow.raise_from_status)
def test_broken_workflow(self):
"""test our broken workflow return expected exc"""
self.broken_workflow.execute()
self.assertRaises(
WorkflowExecutionError, self.broken_workflow.raise_from_status
)
self.assertEqual(
self.broken_workflow.source.status.failures[0].name, "Not an Either"
)
assert (
"workflow/test_base_workflow.py"
in self.broken_workflow.source.status.failures[0].error
)