feat: Report execution time for pipeline components in _debug (#4197)

* Adding execution time to the debug output of pipeline components

* Linting issue fix

* [EMPTY] Re-trigger CI

* fixed test

---------

Co-authored-by: Mayank Jobanputra <mayankjobanputra@gmail.com>
This commit is contained in:
Zoltan Fedor 2023-03-06 18:15:31 -05:00 committed by GitHub
parent 19311119db
commit 4dea9db01e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 15 additions and 0 deletions

View File

@@ -7,6 +7,7 @@ import itertools
from datetime import timedelta
from functools import partial
from hashlib import sha1
from time import time
from typing import Dict, List, Optional, Any, Set, Tuple, Union
try:
@@ -550,7 +551,10 @@ class Pipeline:
if predecessors.isdisjoint(set(queue.keys())):  # only execute if predecessor nodes are executed
try:
logger.debug("Running node '%s` with input: %s", node_id, node_input)
start = time()
node_output, stream_id = self._run_node(node_id, node_input)
if "_debug" in node_output and node_id in node_output["_debug"]:
node_output["_debug"][node_id]["exec_time_ms"] = round((time() - start) * 1000, 2)
except Exception as e:
# The input might be a really large object with thousands of embeddings.
# If you really want to see it, raise the log level.

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import inspect
import logging
from time import time
from typing import Any, Dict, List, Optional, Tuple, Union
from pathlib import Path
import networkx as nx
@@ -355,7 +356,10 @@ class RayPipeline(Pipeline):
if predecessors.isdisjoint(set(queue.keys())):  # only execute if predecessor nodes are executed
try:
logger.debug("Running node '%s` with input: %s", node_id, node_input)
start = time()
node_output, stream_id = await self._run_node_async(node_id, node_input)
if "_debug" in node_output and node_id in node_output["_debug"]:
node_output["_debug"][node_id]["exec_time_ms"] = round((time() - start) * 1000, 2)
except Exception as e:
# The input might be a really large object with thousands of embeddings.
# If you really want to see it, raise the log level.

View File

@@ -68,12 +68,16 @@ def test_debug_attributes_global(document_store_with_docs, tmp_path):
assert "Reader" in prediction["_debug"].keys()
assert "input" in prediction["_debug"]["BM25Retriever"].keys()
assert "output" in prediction["_debug"]["BM25Retriever"].keys()
assert "exec_time_ms" in prediction["_debug"]["BM25Retriever"].keys()
assert "input" in prediction["_debug"]["Reader"].keys()
assert "output" in prediction["_debug"]["Reader"].keys()
assert "exec_time_ms" in prediction["_debug"]["Reader"].keys()
assert prediction["_debug"]["BM25Retriever"]["input"]
assert prediction["_debug"]["BM25Retriever"]["output"]
assert prediction["_debug"]["BM25Retriever"]["exec_time_ms"] is not None
assert prediction["_debug"]["Reader"]["input"]
assert prediction["_debug"]["Reader"]["output"]
assert prediction["_debug"]["Reader"]["exec_time_ms"] is not None
# Avoid circular reference: easiest way to detect those is to use json.dumps
json.dumps(prediction, default=str)

View File

@@ -83,6 +83,7 @@ async def test_load_advanced_pipeline_async(document_store_with_docs):
prediction = await pipeline.run_async(
query="Who lives in Berlin?",
params={"ESRetriever1": {"top_k": 1}, "ESRetriever2": {"top_k": 2}, "Reader": {"top_k": 3}},
debug=True,
)
assert pipeline._serve_controller_client._detached is True
@@ -96,3 +97,5 @@ async def test_load_advanced_pipeline_async(document_store_with_docs):
assert prediction["query"] == "Who lives in Berlin?"
assert prediction["answers"][0].answer == "Carla"
assert len(prediction["answers"]) > 1
assert "exec_time_ms" in prediction["_debug"]["ESRetriever1"].keys()
assert prediction["_debug"]["ESRetriever1"]["exec_time_ms"]