mirror of
https://github.com/deepset-ai/haystack.git
synced 2025-09-26 00:24:14 +00:00
feat: send event if number of queries exceeds threshold (#3419)
Co-authored-by: Julian Risch <julian.risch@deepset.ai>
This commit is contained in:
parent
3f956c75f4
commit
79c6063ac2
@ -73,8 +73,11 @@ class Pipeline:
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.graph = DiGraph()
|
self.graph = DiGraph()
|
||||||
self.init_time = datetime.datetime.now(datetime.timezone.utc)
|
self.init_time = datetime.datetime.now(datetime.timezone.utc)
|
||||||
self.last_telemetry_update = datetime.datetime.now(datetime.timezone.utc)
|
self.time_of_last_sent_event = datetime.datetime.now(datetime.timezone.utc)
|
||||||
self.telemetry_update_interval = datetime.timedelta(hours=24)
|
self.event_time_interval = datetime.timedelta(hours=24)
|
||||||
|
self.event_run_total_threshold = 100
|
||||||
|
self.last_window_run_total = 0
|
||||||
|
self.sent_event_in_window = False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def root_node(self) -> Optional[str]:
|
def root_node(self) -> Optional[str]:
|
||||||
@ -567,8 +570,7 @@ class Pipeline:
|
|||||||
i = 0
|
i = 0
|
||||||
else:
|
else:
|
||||||
i += 1 # attempt executing next node in the queue as current `node_id` has unprocessed predecessors
|
i += 1 # attempt executing next node in the queue as current `node_id` has unprocessed predecessors
|
||||||
if self.should_send_telemetry():
|
self.send_pipeline_event_if_needed()
|
||||||
self.send_telemetry()
|
|
||||||
return node_output
|
return node_output
|
||||||
|
|
||||||
@pipeline_invocation_counter
|
@pipeline_invocation_counter
|
||||||
@ -719,8 +721,7 @@ class Pipeline:
|
|||||||
i = 0
|
i = 0
|
||||||
else:
|
else:
|
||||||
i += 1 # attempt executing next node in the queue as current `node_id` has unprocessed predecessors
|
i += 1 # attempt executing next node in the queue as current `node_id` has unprocessed predecessors
|
||||||
if self.should_send_telemetry():
|
self.send_pipeline_event_if_needed()
|
||||||
self.send_telemetry()
|
|
||||||
return node_output
|
return node_output
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@ -2210,22 +2211,38 @@ class Pipeline:
|
|||||||
"""
|
"""
|
||||||
return datetime.datetime.now(datetime.timezone.utc) - self.init_time
|
return datetime.datetime.now(datetime.timezone.utc) - self.init_time
|
||||||
|
|
||||||
def send_telemetry(self):
|
def send_pipeline_event(self):
|
||||||
fingerprint = sha1(json.dumps(self.get_config(), sort_keys=True).encode()).hexdigest()
|
fingerprint = sha1(json.dumps(self.get_config(), sort_keys=True).encode()).hexdigest()
|
||||||
|
run_total = self.run.counter + self.run_batch.counter
|
||||||
send_custom_event(
|
send_custom_event(
|
||||||
"pipeline",
|
"pipeline",
|
||||||
payload={
|
payload={
|
||||||
"fingerprint": fingerprint,
|
"fingerprint": fingerprint,
|
||||||
"type": self.get_type(),
|
"type": self.get_type(),
|
||||||
"uptime": int(self.uptime().total_seconds()),
|
"uptime": int(self.uptime().total_seconds()),
|
||||||
"run_total": self.run.counter + self.run_batch.counter,
|
"run_total": run_total,
|
||||||
|
"run_total_window": run_total - self.last_window_run_total,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
self.last_telemetry_update = datetime.datetime.now(datetime.timezone.utc)
|
|
||||||
|
|
||||||
def should_send_telemetry(self):
|
|
||||||
now = datetime.datetime.now(datetime.timezone.utc)
|
now = datetime.datetime.now(datetime.timezone.utc)
|
||||||
return now - self.last_telemetry_update > self.telemetry_update_interval
|
self.time_of_last_sent_event = datetime.datetime(now.year, now.month, now.day, tzinfo=datetime.timezone.utc)
|
||||||
|
self.last_window_run_total = run_total
|
||||||
|
|
||||||
|
def send_pipeline_event_if_needed(self):
|
||||||
|
should_send_event = self.has_event_time_interval_exceeded() or self.has_event_run_total_threshold_exceeded()
|
||||||
|
if should_send_event and not self.sent_event_in_window:
|
||||||
|
self.send_pipeline_event()
|
||||||
|
self.sent_event_in_window = True
|
||||||
|
elif self.has_event_time_interval_exceeded():
|
||||||
|
self.sent_event_in_window = False
|
||||||
|
|
||||||
|
def has_event_time_interval_exceeded(self):
|
||||||
|
now = datetime.datetime.now(datetime.timezone.utc)
|
||||||
|
return now - self.time_of_last_sent_event > self.event_time_interval
|
||||||
|
|
||||||
|
def has_event_run_total_threshold_exceeded(self):
|
||||||
|
run_total = self.run.counter + self.run_batch.counter
|
||||||
|
return run_total - self.last_window_run_total > self.event_run_total_threshold
|
||||||
|
|
||||||
|
|
||||||
class _HaystackBeirRetrieverAdapter:
|
class _HaystackBeirRetrieverAdapter:
|
||||||
|
@ -291,6 +291,7 @@ class NonPrivateParameters:
|
|||||||
"type",
|
"type",
|
||||||
"uptime",
|
"uptime",
|
||||||
"run_total",
|
"run_total",
|
||||||
|
"run_total_window",
|
||||||
]
|
]
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
Loading…
x
Reference in New Issue
Block a user