diff --git a/ingestion/src/metadata/ingestion/api/workflow.py b/ingestion/src/metadata/ingestion/api/workflow.py index 8caf7fde797..ff3e939da7d 100644 --- a/ingestion/src/metadata/ingestion/api/workflow.py +++ b/ingestion/src/metadata/ingestion/api/workflow.py @@ -163,14 +163,13 @@ class Workflow: self.sink.write_record(processed_record) self.report["sink"] = self.sink.get_status().as_obj() if hasattr(self, "bulk_sink"): + self.stage.close() self.bulk_sink.write_records() self.report["Bulk_Sink"] = self.bulk_sink.get_status().as_obj() def stop(self): if hasattr(self, "processor"): self.processor.close() - if hasattr(self, "stage"): - self.stage.close() if hasattr(self, "bulk_sink"): self.bulk_sink.close() if hasattr(self, "sink"):