diff --git a/haystack/pipelines/base.py b/haystack/pipelines/base.py index 764f058b9..ae6fd497b 100644 --- a/haystack/pipelines/base.py +++ b/haystack/pipelines/base.py @@ -7,6 +7,7 @@ import itertools from datetime import timedelta from functools import partial from hashlib import sha1 +from time import time from typing import Dict, List, Optional, Any, Set, Tuple, Union try: @@ -550,7 +551,10 @@ class Pipeline: if predecessors.isdisjoint(set(queue.keys())): # only execute if predecessor nodes are executed try: logger.debug("Running node '%s` with input: %s", node_id, node_input) + start = time() node_output, stream_id = self._run_node(node_id, node_input) + if "_debug" in node_output and node_id in node_output["_debug"]: + node_output["_debug"][node_id]["exec_time_ms"] = round((time() - start) * 1000, 2) except Exception as e: # The input might be a really large object with thousands of embeddings. # If you really want to see it, raise the log level. diff --git a/haystack/pipelines/ray.py b/haystack/pipelines/ray.py index 2d166a09f..ab8548199 100644 --- a/haystack/pipelines/ray.py +++ b/haystack/pipelines/ray.py @@ -1,6 +1,7 @@ from __future__ import annotations import inspect import logging +from time import time from typing import Any, Dict, List, Optional, Tuple, Union from pathlib import Path import networkx as nx @@ -355,7 +356,10 @@ class RayPipeline(Pipeline): if predecessors.isdisjoint(set(queue.keys())): # only execute if predecessor nodes are executed try: logger.debug("Running node '%s` with input: %s", node_id, node_input) + start = time() node_output, stream_id = await self._run_node_async(node_id, node_input) + if "_debug" in node_output and node_id in node_output["_debug"]: + node_output["_debug"][node_id]["exec_time_ms"] = round((time() - start) * 1000, 2) except Exception as e: # The input might be a really large object with thousands of embeddings. # If you really want to see it, raise the log level. diff --git a/test/pipelines/test_pipeline_debug_and_validation.py b/test/pipelines/test_pipeline_debug_and_validation.py index e80d79e9a..2748445e1 100644 --- a/test/pipelines/test_pipeline_debug_and_validation.py +++ b/test/pipelines/test_pipeline_debug_and_validation.py @@ -68,12 +68,16 @@ def test_debug_attributes_global(document_store_with_docs, tmp_path): assert "Reader" in prediction["_debug"].keys() assert "input" in prediction["_debug"]["BM25Retriever"].keys() assert "output" in prediction["_debug"]["BM25Retriever"].keys() + assert "exec_time_ms" in prediction["_debug"]["BM25Retriever"].keys() assert "input" in prediction["_debug"]["Reader"].keys() assert "output" in prediction["_debug"]["Reader"].keys() + assert "exec_time_ms" in prediction["_debug"]["Reader"].keys() assert prediction["_debug"]["BM25Retriever"]["input"] assert prediction["_debug"]["BM25Retriever"]["output"] + assert prediction["_debug"]["BM25Retriever"]["exec_time_ms"] is not None assert prediction["_debug"]["Reader"]["input"] assert prediction["_debug"]["Reader"]["output"] + assert prediction["_debug"]["Reader"]["exec_time_ms"] is not None # Avoid circular reference: easiest way to detect those is to use json.dumps json.dumps(prediction, default=str) diff --git a/test/pipelines/test_ray.py b/test/pipelines/test_ray.py index 504a7c776..1c1449fd0 100644 --- a/test/pipelines/test_ray.py +++ b/test/pipelines/test_ray.py @@ -83,6 +83,7 @@ async def test_load_advanced_pipeline_async(document_store_with_docs): prediction = await pipeline.run_async( query="Who lives in Berlin?", params={"ESRetriever1": {"top_k": 1}, "ESRetriever2": {"top_k": 2}, "Reader": {"top_k": 3}}, + debug=True, ) assert pipeline._serve_controller_client._detached is True @@ -96,3 +97,5 @@ async def test_load_advanced_pipeline_async(document_store_with_docs): assert prediction["query"] == "Who lives in Berlin?" assert prediction["answers"][0].answer == "Carla" assert len(prediction["answers"]) > 1 + assert "exec_time_ms" in prediction["_debug"]["ESRetriever1"].keys() + assert prediction["_debug"]["ESRetriever1"]["exec_time_ms"]