Add indexing pipeline type (#3461)

This commit is contained in:
Vladimir Blagojevic 2022-10-24 17:26:15 +02:00 committed by GitHub
parent 9b931bbf66
commit 1b9586ae40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -571,7 +571,7 @@ class Pipeline:
i = 0 i = 0
else: else:
i += 1 # attempt executing next node in the queue as current `node_id` has unprocessed predecessors i += 1 # attempt executing next node in the queue as current `node_id` has unprocessed predecessors
self.send_pipeline_event_if_needed() self.send_pipeline_event_if_needed(is_indexing=file_paths is not None)
return node_output return node_output
@pipeline_invocation_counter @pipeline_invocation_counter
@ -2237,14 +2237,14 @@ class Pipeline:
""" """
return datetime.datetime.now(datetime.timezone.utc) - self.init_time return datetime.datetime.now(datetime.timezone.utc) - self.init_time
def send_pipeline_event(self): def send_pipeline_event(self, is_indexing: bool = False):
fingerprint = sha1(json.dumps(self.get_config(), sort_keys=True).encode()).hexdigest() fingerprint = sha1(json.dumps(self.get_config(), sort_keys=True).encode()).hexdigest()
run_total = self.run.counter + self.run_batch.counter run_total = self.run.counter + self.run_batch.counter
send_custom_event( send_custom_event(
"pipeline", "pipeline",
payload={ payload={
"fingerprint": fingerprint, "fingerprint": fingerprint,
"type": self.get_type(), "type": "Indexing" if is_indexing else self.get_type(),
"uptime": int(self.uptime().total_seconds()), "uptime": int(self.uptime().total_seconds()),
"run_total": run_total, "run_total": run_total,
"run_total_window": run_total - self.last_window_run_total, "run_total_window": run_total - self.last_window_run_total,
@ -2254,10 +2254,10 @@ class Pipeline:
self.time_of_last_sent_event = datetime.datetime(now.year, now.month, now.day, tzinfo=datetime.timezone.utc) self.time_of_last_sent_event = datetime.datetime(now.year, now.month, now.day, tzinfo=datetime.timezone.utc)
self.last_window_run_total = run_total self.last_window_run_total = run_total
def send_pipeline_event_if_needed(self): def send_pipeline_event_if_needed(self, is_indexing: bool = False):
should_send_event = self.has_event_time_interval_exceeded() or self.has_event_run_total_threshold_exceeded() should_send_event = self.has_event_time_interval_exceeded() or self.has_event_run_total_threshold_exceeded()
if should_send_event and not self.sent_event_in_window: if should_send_event and not self.sent_event_in_window:
self.send_pipeline_event() self.send_pipeline_event(is_indexing)
self.sent_event_in_window = True self.sent_event_in_window = True
elif self.has_event_time_interval_exceeded(): elif self.has_event_time_interval_exceeded():
self.sent_event_in_window = False self.sent_event_in_window = False