From d025e217d686f0b7b711be746523467a92e91aca Mon Sep 17 00:00:00 2001 From: Teddy Date: Thu, 2 Nov 2023 13:02:26 +0100 Subject: [PATCH] fix: catch not Either type in workflow and return explicit error message (#13796) --- ingestion/src/metadata/ingestion/api/step.py | 40 ++++++++++++++++ .../tests/unit/workflow/test_base_workflow.py | 47 +++++++++++++++++++ 2 files changed, 87 insertions(+) diff --git a/ingestion/src/metadata/ingestion/api/step.py b/ingestion/src/metadata/ingestion/api/step.py index 85c352bcf05..b7f94dd7b2e 100644 --- a/ingestion/src/metadata/ingestion/api/step.py +++ b/ingestion/src/metadata/ingestion/api/step.py @@ -11,6 +11,7 @@ """ Each of the ingestion steps: Source, Sink, Stage,... """ +import inspect import traceback from abc import ABC, abstractmethod from typing import Iterable, Optional @@ -79,6 +80,19 @@ class ReturnStep(Step, ABC): except WorkflowFatalError as err: logger.error(f"Fatal error running step [{self}]: [{err}]") raise err + except AttributeError as exc: + error = ( + f"Object type defined in `def _run()` " + f"{inspect.getsourcefile(self._run)} is not an Either: [{exc}]" + ) + logger.warning(error) + self.status.failed( + StackTraceError( + name="Not an Either", + error=error, + stack_trace=traceback.format_exc(), + ) + ) except Exception as exc: error = f"Unhandled exception during workflow processing: [{exc}]" logger.warning(error) @@ -121,6 +135,19 @@ class StageStep(Step, ABC): except WorkflowFatalError as err: logger.error(f"Fatal error running step [{self}]: [{err}]") raise err + except AttributeError as exc: + error = ( + f"Object type defined in `def _run()` " + f"{inspect.getsourcefile(self._run)} is not an Either: [{exc}]" + ) + logger.warning(error) + self.status.failed( + StackTraceError( + name="Not an Either", + error=error, + stack_trace=traceback.format_exc(), + ) + ) except Exception as exc: error = f"Unhandled exception during workflow processing: [{exc}]" logger.warning(error) @@ -157,6 +184,19 @@ class IterStep(Step, ABC): except WorkflowFatalError as err: logger.error(f"Fatal error running step [{self}]: [{err}]") raise err + except AttributeError as exc: + error = ( + f"Object type defined in `def _iter()` " + f"{inspect.getsourcefile(self._iter)} is not an Either: [{exc}]" + ) + logger.warning(error) + self.status.failed( + StackTraceError( + name="Not an Either", + error=error, + stack_trace=traceback.format_exc(), + ) + ) except Exception as exc: error = f"Encountered exception running step [{self}]: [{exc}]" logger.warning(error) diff --git a/ingestion/tests/unit/workflow/test_base_workflow.py b/ingestion/tests/unit/workflow/test_base_workflow.py index 604816349c8..06c844fdaa9 100644 --- a/ingestion/tests/unit/workflow/test_base_workflow.py +++ b/ingestion/tests/unit/workflow/test_base_workflow.py @@ -62,6 +62,27 @@ class SimpleSource(WorkflowSource): yield Either(right=element) +class BrokenSource(WorkflowSource): + """Source not returning an Either""" + + def prepare(self): + """Nothing to do""" + + def test_connection(self) -> None: + """Nothing to do""" + + @classmethod + def create(cls, _: dict, __: OpenMetadataConnection) -> "SimpleSource": + return cls() + + def close(self) -> None: + """Nothing to do""" + + def _iter(self, *args, **kwargs) -> Iterable[int]: + for element in range(0, 5): + yield int(element) + + class SimpleSink(Sink): """ Simple Sink for testing @@ -94,6 +115,17 @@ class SimpleWorkflow(BaseWorkflow): self.steps: Tuple[Step] = (SimpleSink(),) +class BrokenWorkflow(BaseWorkflow): + """ + Simple Workflow for testing + """ + + def set_steps(self): + self.source = BrokenSource() + + self.steps: Tuple[Step] = (SimpleSink(),) + + # Pass only the required details so that the workflow can be initialized config = OpenMetadataWorkflowConfig( source=Source( @@ -119,6 +151,7 @@ class TestBaseWorkflow(TestCase): """ workflow = SimpleWorkflow(config=config) + broken_workflow = BrokenWorkflow(config=config) @pytest.mark.order(1) def test_workflow_executes(self): @@ -143,3 +176,17 @@ class TestBaseWorkflow(TestCase): def test_workflow_raise_status(self): # We catch the error on the Sink self.assertRaises(WorkflowExecutionError, self.workflow.raise_from_status) + + def test_broken_workflow(self): + """test our broken workflow return expected exc""" + self.broken_workflow.execute() + self.assertRaises( + WorkflowExecutionError, self.broken_workflow.raise_from_status + ) + self.assertEqual( + self.broken_workflow.source.status.failures[0].name, "Not an Either" + ) + assert ( + "workflow/test_base_workflow.py" + in self.broken_workflow.source.status.failures[0].error + )