From bdf85d0eae1e5a31d7be8b638f31ea10eb85eeca Mon Sep 17 00:00:00 2001 From: Teddy Date: Sat, 24 Sep 2022 00:47:53 +0200 Subject: [PATCH] Added logic to handle missing attributes from the context (#7696) --- .../metadata/ingestion/api/topology_runner.py | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/ingestion/src/metadata/ingestion/api/topology_runner.py b/ingestion/src/metadata/ingestion/api/topology_runner.py index aae82a64c25..8b725ab95f9 100644 --- a/ingestion/src/metadata/ingestion/api/topology_runner.py +++ b/ingestion/src/metadata/ingestion/api/topology_runner.py @@ -102,9 +102,27 @@ class TopologyRunnerMixin(Generic[C]): if node.post_process: logger.debug(f"Post processing node {node}") for process in node.post_process: - node_post_process = getattr(self, process) - for entity_request in node_post_process(): - yield entity_request + try: + yield from self.check_context_and_handle(process) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Could not run Post Process `{process}` from Topology Runner -- {exc}" + ) + + def check_context_and_handle(self, post_process: str): + """Based on the post_process step, check context and + evaluate if we can run it based on available class attributes + + Args: + post_process: the name of the post_process step + """ + if post_process == "mark_tables_as_deleted" and not self.context.database: + raise ValueError("No Database found in `self.context`") + + node_post_process = getattr(self, post_process) + for entity_request in node_post_process(): + yield entity_request def next_record(self) -> Iterable[Entity]: """