diff --git a/haystack/pipelines/base.py b/haystack/pipelines/base.py index d03e8b76c..2e20097ed 100644 --- a/haystack/pipelines/base.py +++ b/haystack/pipelines/base.py @@ -571,7 +571,7 @@ class Pipeline: i = 0 else: i += 1 # attempt executing next node in the queue as current `node_id` has unprocessed predecessors - self.send_pipeline_event_if_needed() + self.send_pipeline_event_if_needed(is_indexing=file_paths is not None) return node_output @pipeline_invocation_counter @@ -2237,14 +2237,14 @@ class Pipeline: """ return datetime.datetime.now(datetime.timezone.utc) - self.init_time - def send_pipeline_event(self): + def send_pipeline_event(self, is_indexing: bool = False): fingerprint = sha1(json.dumps(self.get_config(), sort_keys=True).encode()).hexdigest() run_total = self.run.counter + self.run_batch.counter send_custom_event( "pipeline", payload={ "fingerprint": fingerprint, - "type": self.get_type(), + "type": "Indexing" if is_indexing else self.get_type(), "uptime": int(self.uptime().total_seconds()), "run_total": run_total, "run_total_window": run_total - self.last_window_run_total, @@ -2254,10 +2254,10 @@ class Pipeline: self.time_of_last_sent_event = datetime.datetime(now.year, now.month, now.day, tzinfo=datetime.timezone.utc) self.last_window_run_total = run_total - def send_pipeline_event_if_needed(self): + def send_pipeline_event_if_needed(self, is_indexing: bool = False): should_send_event = self.has_event_time_interval_exceeded() or self.has_event_run_total_threshold_exceeded() if should_send_event and not self.sent_event_in_window: - self.send_pipeline_event() + self.send_pipeline_event(is_indexing) self.sent_event_in_window = True elif self.has_event_time_interval_exceeded(): self.sent_event_in_window = False