Fix Usage from Bulk Sink (#3354)

* Fix Usage from Bulk Sink

* Closing stage in worklow once bulk sink starts
This commit is contained in:
Ayush Shah 2022-03-10 17:12:42 +05:30 committed by GitHub
parent 794cf64787
commit 4f1ef18c2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -163,14 +163,13 @@ class Workflow:
self.sink.write_record(processed_record)
self.report["sink"] = self.sink.get_status().as_obj()
if hasattr(self, "bulk_sink"):
self.stage.close()
self.bulk_sink.write_records()
self.report["Bulk_Sink"] = self.bulk_sink.get_status().as_obj()
def stop(self):
if hasattr(self, "processor"):
self.processor.close()
if hasattr(self, "stage"):
self.stage.close()
if hasattr(self, "bulk_sink"):
self.bulk_sink.close()
if hasattr(self, "sink"):