From 79c6063ac268e8db8671220615e44cf4d2bb1785 Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Thu, 20 Oct 2022 16:02:45 +0200 Subject: [PATCH] feat: send event if number of queries exceeds threshold (#3419) Co-authored-by: Julian Risch --- haystack/pipelines/base.py | 41 +++++++++++++++++++++++++++----------- haystack/telemetry.py | 1 + 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/haystack/pipelines/base.py b/haystack/pipelines/base.py index 1803c33df..2cf93a55c 100644 --- a/haystack/pipelines/base.py +++ b/haystack/pipelines/base.py @@ -73,8 +73,11 @@ class Pipeline: def __init__(self): self.graph = DiGraph() self.init_time = datetime.datetime.now(datetime.timezone.utc) - self.last_telemetry_update = datetime.datetime.now(datetime.timezone.utc) - self.telemetry_update_interval = datetime.timedelta(hours=24) + self.time_of_last_sent_event = datetime.datetime.now(datetime.timezone.utc) + self.event_time_interval = datetime.timedelta(hours=24) + self.event_run_total_threshold = 100 + self.last_window_run_total = 0 + self.sent_event_in_window = False @property def root_node(self) -> Optional[str]: @@ -567,8 +570,7 @@ class Pipeline: i = 0 else: i += 1 # attempt executing next node in the queue as current `node_id` has unprocessed predecessors - if self.should_send_telemetry(): - self.send_telemetry() + self.send_pipeline_event_if_needed() return node_output @pipeline_invocation_counter @@ -719,8 +721,7 @@ class Pipeline: i = 0 else: i += 1 # attempt executing next node in the queue as current `node_id` has unprocessed predecessors - if self.should_send_telemetry(): - self.send_telemetry() + self.send_pipeline_event_if_needed() return node_output @classmethod @@ -2210,22 +2211,38 @@ class Pipeline: """ return datetime.datetime.now(datetime.timezone.utc) - self.init_time - def send_telemetry(self): + def send_pipeline_event(self): fingerprint = sha1(json.dumps(self.get_config(), sort_keys=True).encode()).hexdigest() + run_total = self.run.counter + self.run_batch.counter send_custom_event( "pipeline", payload={ "fingerprint": fingerprint, "type": self.get_type(), "uptime": int(self.uptime().total_seconds()), - "run_total": self.run.counter + self.run_batch.counter, + "run_total": run_total, + "run_total_window": run_total - self.last_window_run_total, }, ) - self.last_telemetry_update = datetime.datetime.now(datetime.timezone.utc) - - def should_send_telemetry(self): now = datetime.datetime.now(datetime.timezone.utc) - return now - self.last_telemetry_update > self.telemetry_update_interval + self.time_of_last_sent_event = datetime.datetime(now.year, now.month, now.day, tzinfo=datetime.timezone.utc) + self.last_window_run_total = run_total + + def send_pipeline_event_if_needed(self): + should_send_event = self.has_event_time_interval_exceeded() or self.has_event_run_total_threshold_exceeded() + if should_send_event and not self.sent_event_in_window: + self.send_pipeline_event() + self.sent_event_in_window = True + elif self.has_event_time_interval_exceeded(): + self.sent_event_in_window = False + + def has_event_time_interval_exceeded(self): + now = datetime.datetime.now(datetime.timezone.utc) + return now - self.time_of_last_sent_event > self.event_time_interval + + def has_event_run_total_threshold_exceeded(self): + run_total = self.run.counter + self.run_batch.counter + return run_total - self.last_window_run_total > self.event_run_total_threshold class _HaystackBeirRetrieverAdapter: diff --git a/haystack/telemetry.py b/haystack/telemetry.py index 01d2e82d7..70fe9a0c0 100644 --- a/haystack/telemetry.py +++ b/haystack/telemetry.py @@ -291,6 +291,7 @@ class NonPrivateParameters: "type", "uptime", "run_total", + "run_total_window", ] @classmethod