diff --git a/ingestion/src/metadata/ingestion/api/topology_runner.py b/ingestion/src/metadata/ingestion/api/topology_runner.py index aae82a64c25..8b725ab95f9 100644 --- a/ingestion/src/metadata/ingestion/api/topology_runner.py +++ b/ingestion/src/metadata/ingestion/api/topology_runner.py @@ -102,9 +102,27 @@ class TopologyRunnerMixin(Generic[C]): if node.post_process: logger.debug(f"Post processing node {node}") for process in node.post_process: - node_post_process = getattr(self, process) - for entity_request in node_post_process(): - yield entity_request + try: + yield from self.check_context_and_handle(process) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Could not run Post Process `{process}` from Topology Runner -- {exc}" + ) + + def check_context_and_handle(self, post_process: str): + """Based on the post_process step, check context and + evaluate if we can run it based on available class attributes + + Args: + post_process: the name of the post_process step + """ + if post_process == "mark_tables_as_deleted" and not self.context.database: + raise ValueError("No Database found in `self.context`") + + node_post_process = getattr(self, post_process) + for entity_request in node_post_process(): + yield entity_request def next_record(self) -> Iterable[Entity]: """