From 4f1ef18c2b838f33a68b0a38e8bf133a775a50c5 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Thu, 10 Mar 2022 17:12:42 +0530 Subject: [PATCH] Fix Usage from Bulk Sink (#3354) * Fix Usage from Bulk Sink * Closing stage in worklow once bulk sink starts --- ingestion/src/metadata/ingestion/api/workflow.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ingestion/src/metadata/ingestion/api/workflow.py b/ingestion/src/metadata/ingestion/api/workflow.py index 8caf7fde797..ff3e939da7d 100644 --- a/ingestion/src/metadata/ingestion/api/workflow.py +++ b/ingestion/src/metadata/ingestion/api/workflow.py @@ -163,14 +163,13 @@ class Workflow: self.sink.write_record(processed_record) self.report["sink"] = self.sink.get_status().as_obj() if hasattr(self, "bulk_sink"): + self.stage.close() self.bulk_sink.write_records() self.report["Bulk_Sink"] = self.bulk_sink.get_status().as_obj() def stop(self): if hasattr(self, "processor"): self.processor.close() - if hasattr(self, "stage"): - self.stage.close() if hasattr(self, "bulk_sink"): self.bulk_sink.close() if hasattr(self, "sink"):