From cf7f0ebc225d907d1751567c5810388193e9dc6d Mon Sep 17 00:00:00 2001 From: Silvano Cerza <3314350+silvanocerza@users.noreply.github.com> Date: Tue, 26 Sep 2023 15:37:27 +0200 Subject: [PATCH] Add Pipelines async run (#5864) * Add Pipeline.arun() * Sleeper node * Fix async running * Add e2e tests To run a Pipeline that doesn't have any async node in async mode: pytest e2e/pipelines/test_standard_pipelines.py::test_query_and_indexing_pipeline To run a Pipeline that has a single async node in concurrent mode: pytest e2e/pipelines/test_standard_pipelines.py::test_async_concurrent_complex_pipeline To run a Pipeline that has a single async node in sequential mode: pytest e2e/pipelines/test_standard_pipelines.py::test_async_sequential_complex_pipeline * Remove unused _adispatch_run method * Make Pipeline.run work with async nodes * Revert "Make Pipeline.run work with async nodes" This reverts commit 22d7a94e4d41aca1b59dad18c0b366fbb6e8f431. * Rename Pipeline.arun to Pipeline._arun * Enhance docstring * Add Sleeper docstring * Add release notes * ignore typing across the node * make pylint happy * skip pylint on needed unused import * fix * if a node has an arun method, use it --------- Co-authored-by: Massimiliano Pippi --- e2e/pipelines/test_standard_pipelines.py | 67 +++++++- e2e/samples/pipelines/async_test_pipeline.yml | 60 +++++++ haystack/nodes/asyncio/sleeper.py | 74 +++++++++ haystack/nodes/base.py | 74 +++++++++ haystack/pipelines/base.py | 152 +++++++++++++++++- .../async-pipeline-20649b54ecff2706.yaml | 4 + 6 files changed, 425 insertions(+), 6 deletions(-) create mode 100644 e2e/samples/pipelines/async_test_pipeline.yml create mode 100644 haystack/nodes/asyncio/sleeper.py create mode 100644 releasenotes/notes/async-pipeline-20649b54ecff2706.yaml diff --git a/e2e/pipelines/test_standard_pipelines.py b/e2e/pipelines/test_standard_pipelines.py index 8e49ec941..f25ddcd13 100644 --- a/e2e/pipelines/test_standard_pipelines.py +++ b/e2e/pipelines/test_standard_pipelines.py @@ -1,8 +1,9 @@ import os +import asyncio import pytest -from haystack.document_stores import InMemoryDocumentStore +from haystack.document_stores import InMemoryDocumentStore, ElasticsearchDocumentStore from haystack.nodes.retriever.web import WebRetriever from haystack.pipelines import ( Pipeline, @@ -13,6 +14,7 @@ from haystack.pipelines import ( SearchSummarizationPipeline, ) from haystack.nodes import EmbeddingRetriever, PromptNode, BM25Retriever, TransformersSummarizer +from haystack.nodes.asyncio.sleeper import Sleeper # noqa # pylint: disable=unused-import from haystack.schema import Document @@ -89,7 +91,7 @@ def test_most_similar_documents_pipeline(): assert isinstance(document.content, str) -def test_most_similar_documents_pipeline_with_filters(): +async def test_most_similar_documents_pipeline_with_filters(): documents = [ {"id": "a", "content": "Sample text for document-1", "meta": {"source": "wiki1"}}, {"id": "b", "content": "Sample text for document-2", "meta": {"source": "wiki2"}}, @@ -120,17 +122,18 @@ def test_most_similar_documents_pipeline_with_filters(): assert document.meta["source"] in ["wiki3", "wiki4", "wiki5"] -def test_query_and_indexing_pipeline(samples_path): +@pytest.mark.asyncio +async def test_query_and_indexing_pipeline(samples_path): # test correct load of indexing pipeline from yaml pipeline = Pipeline.load_from_yaml( samples_path / "pipelines" / "test.haystack-pipeline.yml", pipeline_name="indexing_pipeline" ) - pipeline.run(file_paths=samples_path / "pipelines" / "sample_pdf_1.pdf") + 
await pipeline._arun(file_paths=samples_path / "pipelines" / "sample_pdf_1.pdf")
 
     # test correct load of query pipeline from yaml
     pipeline = Pipeline.load_from_yaml(
         samples_path / "pipelines" / "test.haystack-pipeline.yml", pipeline_name="query_pipeline"
     )
-    prediction = pipeline.run(
+    prediction = await pipeline._arun(
         query="Who made the PDF specification?", params={"Retriever": {"top_k": 2}, "Reader": {"top_k": 1}}
     )
     assert prediction["query"] == "Who made the PDF specification?"
@@ -138,6 +141,60 @@ def test_query_and_indexing_pipeline(samples_path):
     assert "_debug" not in prediction.keys()
 
 
+@pytest.mark.asyncio
+async def test_async_concurrent_complex_pipeline(samples_path):
+    documents = [
+        {"content": "How to test module-1?", "meta": {"source": "wiki1", "answer": "Using tests for module-1"}},
+        {"content": "How to test module-2?", "meta": {"source": "wiki2", "answer": "Using tests for module-2"}},
+        {"content": "How to test module-3?", "meta": {"source": "wiki3", "answer": "Using tests for module-3"}},
+        {"content": "How to test module-4?", "meta": {"source": "wiki4", "answer": "Using tests for module-4"}},
+        {"content": "How to test module-5?", "meta": {"source": "wiki5", "answer": "Using tests for module-5"}},
+    ]
+    document_store = ElasticsearchDocumentStore()
+    document_store.write_documents(documents)
+
+    # test correct load of the query pipeline from yaml
+    pipeline = Pipeline.load_from_yaml(samples_path / "pipelines" / "async_test_pipeline.yml", pipeline_name="query")
+    queries = [
+        "How to test module-1?",
+        "How to test module-2?",
+        "How to test module-3?",
+        "How to test module-4?",
+        "How to test module-5?",
+    ]
+    coroutines = []
+    for query in queries:
+        coroutine = pipeline._arun(query=query)
+        coroutines.append(coroutine)
+
+    await asyncio.gather(*coroutines)
+
+
+@pytest.mark.asyncio
+async def test_async_sequential_complex_pipeline(samples_path):
+    documents = [
+        {"content": "How to test module-1?", "meta": {"source": "wiki1", "answer": "Using tests for module-1"}},
+        {"content": "How to test module-2?", "meta": {"source": "wiki2", "answer": "Using tests for module-2"}},
+        {"content": "How to test module-3?", "meta": {"source": "wiki3", "answer": "Using tests for module-3"}},
+        {"content": "How to test module-4?", "meta": {"source": "wiki4", "answer": "Using tests for module-4"}},
+        {"content": "How to test module-5?", "meta": {"source": "wiki5", "answer": "Using tests for module-5"}},
+    ]
+    document_store = ElasticsearchDocumentStore()
+    document_store.write_documents(documents)
+
+    # test correct load of the query pipeline from yaml
+    pipeline = Pipeline.load_from_yaml(samples_path / "pipelines" / "async_test_pipeline.yml", pipeline_name="query")
+    queries = [
+        "How to test module-1?",
+        "How to test module-2?",
+        "How to test module-3?",
+        "How to test module-4?",
+        "How to test module-5?",
+    ]
+    for query in queries:
+        await pipeline._arun(query=query)
+
+
 @pytest.mark.skipif(
     not os.environ.get("OPENAI_API_KEY", None),
     reason="Please export an env var called OPENAI_API_KEY containing the OpenAI API key to run this test.",
diff --git a/e2e/samples/pipelines/async_test_pipeline.yml b/e2e/samples/pipelines/async_test_pipeline.yml
new file mode 100644
index 000000000..376373008
--- /dev/null
+++ b/e2e/samples/pipelines/async_test_pipeline.yml
@@ -0,0 +1,60 @@
+# This pipeline has been automatically migrated to conform to Haystack 1.15.1.
+# Please check that the pipeline is still working as expected.
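+#
+# The comments and component list below appear to have been carried over from the
+# pipeline this fixture was migrated from; only the `query` pipeline at the bottom
+# is exercised by the async e2e tests. Note that `PromptNode` is declared with
+# `type: Sleeper`, a test-only node that simulates a slow async step such as an LLM call.
+# A rough sketch of how the tests above drive this file:
+#
+#   pipeline = Pipeline.load_from_yaml(samples_path / "pipelines" / "async_test_pipeline.yml", pipeline_name="query")
+#   await pipeline._arun(query="How to test module-1?")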
+# In this version: split by paragraphs, new prompt (simplified, opinions rather than facts) + +version: "1.15.1" + +components: + - name: DocumentStore + type: ElasticsearchDocumentStore + - name: BM25Retriever + type: BM25Retriever + params: + document_store: DocumentStore + top_k: 30 + - name: JoinDocuments + type: JoinDocuments + params: + top_k_join: 30 + join_mode: reciprocal_rank_fusion + - name: Reranker + type: SentenceTransformersRanker + params: + model_name_or_path: svalabs/cross-electra-ms-marco-german-uncased + top_k: 8 + - name: QueryClassifier + type: TransformersQueryClassifier + params: + model_name_or_path: deepset/deberta-v3-base-injection + labels: ["LEGIT", "INJECTION"] + - name: qa_template + params: + output_parser: + type: AnswerParser + prompt: "Given this context answer the question:{new_line}\ + '''{join(documents, delimiter=new_line, pattern='Rede[$idx], Datum: $date, Dies ist die Rede von: $politician_name (Partei: $faction): <$content>')}'''{new_line}\ + Question: {query}{new_line}\ + Answer: {new_line}" + type: PromptTemplate + - name: PromptNode + type: Sleeper + - name: FileTypeClassifier + type: FileTypeClassifier + - name: TextConverter + type: TextConverter + - name: PDFConverter + type: PDFToTextConverter + +pipelines: + - name: query + nodes: + - name: QueryClassifier + inputs: [Query] + - name: BM25Retriever + inputs: [QueryClassifier.output_1] + - name: JoinDocuments + inputs: [BM25Retriever] + - name: Reranker + inputs: [JoinDocuments] + - name: PromptNode + inputs: [Reranker] diff --git a/haystack/nodes/asyncio/sleeper.py b/haystack/nodes/asyncio/sleeper.py new file mode 100644 index 000000000..a6406cd8b --- /dev/null +++ b/haystack/nodes/asyncio/sleeper.py @@ -0,0 +1,74 @@ +from typing import Optional, List, Dict, Union, Any, Literal +import asyncio + +import numpy as np + +from haystack.schema import Document, MultiLabel, Answer +from haystack.nodes.base import BaseComponent + + +class Sleeper(BaseComponent): + """ + Simple component that sleeps for a random amount of time and then returns a dummy answer. 
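+
+    The sleep duration is drawn from a normal distribution (clipped at zero), which makes it
+    useful for simulating slow, non-deterministic steps such as LLM calls in async tests.
+    A minimal usage sketch, assuming an event loop is already running (for example inside
+    an async test):
+
+    ```python
+    sleeper = Sleeper(mean_sleep_in_seconds=0.1, sleep_scale=0.01, answer="Dummy")
+    output, edge = await sleeper.run(query="What is Haystack?")
+    assert output["answers"][0].answer == "Dummy"
+    ```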
+ """ + + outgoing_edges: int = 1 + + def __init__( + self, + mean_sleep_in_seconds: float = 10, + sleep_scale: float = 1.0, + answer_type: Literal["generative", "extractive", "other"] = "generative", + answer_score: Optional[float] = None, + answer: str = "Placeholder", + ) -> None: + super().__init__() + self._mean_sleep_in_seconds = mean_sleep_in_seconds + self._sleep_scale = sleep_scale + self._answer_type = answer_type + self._answer = answer + self._answer_score = answer_score + + # pylint: disable=invalid-overridden-method + async def run( # type: ignore + self, + query: Optional[str] = None, + file_paths: Optional[List[str]] = None, + labels: Optional[MultiLabel] = None, + documents: Optional[List[Document]] = None, + meta: Optional[dict] = None, + ): + if query is None: + return {"answers": []}, "output_1" + + meta_data = meta if meta is not None else {} + + # Sleep to mock processing time + sleep_time_seconds = max(0.0, np.random.normal(self._mean_sleep_in_seconds, self._sleep_scale)) + await asyncio.sleep(sleep_time_seconds) + + return { + "answers": [Answer(answer=self._answer, type=self._answer_type, meta=meta_data, score=self._answer_score)] + }, "output_1" + + # pylint: disable=too-many-arguments + def run_batch( # type: ignore + self, + queries: Optional[Union[str, List[str]]] = None, + file_paths: Optional[List[str]] = None, + labels: Optional[Union[MultiLabel, List[MultiLabel]]] = None, + documents: Optional[Union[List[Document], List[List[Document]]]] = None, + meta: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None, + params: Optional[dict] = None, + debug: Optional[bool] = None, + ): + queries = queries or [] + query_list: List[str] = [queries] if isinstance(queries, str) else queries + result: Dict[Any, Any] = {"answers": [], "queries": []} + + for query in query_list: + iteration_result, _ = self.run(query=query) # type: ignore + result["answers"].append(iteration_result["answers"]) # type: ignore + result["queries"].append(query) + + return result, "output_1" diff --git a/haystack/nodes/base.py b/haystack/nodes/base.py index a471b8c3f..ca212cc92 100644 --- a/haystack/nodes/base.py +++ b/haystack/nodes/base.py @@ -273,6 +273,80 @@ class BaseComponent(ABC): output["params"] = params return output, stream + async def _adispatch_run_general(self, run_method: Callable, **kwargs): + """ + This is the async version of _dispatch_run_general and is used indirectly by Pipeline._arun(). + When actually running the node it tries to run it asynchronously, if that fails fall back to a synchronous run. + This makes it possible to run a pipeline asynchronously with a mix of async and sync nodes. 
+
+        This method takes care of the following:
+          - inspects run_method's signature to validate that all necessary arguments are available
+          - pops `debug` and sets it on the instance to control debug output
+          - calls run_method with the corresponding arguments and gathers the output
+          - collates `_debug` information if present
+          - merges the component output with the preceding output and passes it on to the subsequent Component in the Pipeline
+
+        """
+        arguments = deepcopy(kwargs)
+        params = arguments.get("params") or {}
+
+        run_signature_args = inspect.signature(run_method).parameters.keys()
+
+        run_params: Dict[str, Any] = {}
+        for key, value in params.items():
+            if key == self.name:  # targeted params for this node
+                if isinstance(value, dict):
+                    # Extract debug attributes
+                    if "debug" in value.keys():
+                        self.debug = value.pop("debug")
+
+                    for param_name in value.keys():
+                        if param_name not in run_signature_args:
+                            raise Exception(f"Invalid parameter '{param_name}' for the node '{self.name}'.")
+
+                run_params.update(**value)
+            elif key in run_signature_args:  # global params
+                run_params[key] = value
+
+        run_inputs = {}
+        for key, value in arguments.items():
+            if key in run_signature_args:
+                run_inputs[key] = value
+
+        result = run_method(**run_inputs, **run_params)
+        if inspect.isawaitable(result):  # async nodes return an awaitable instead of the output tuple
+            result = await result
+        output, stream = result
+
+        # Collect debug information
+        debug_info = {}
+        if getattr(self, "debug", None):
+            # Include input
+            debug_info["input"] = {**run_inputs, **run_params}
+            debug_info["input"]["debug"] = self.debug
+            # Include output, exclude _debug to avoid recursion
+            filtered_output = {key: value for key, value in output.items() if key != "_debug"}
+            debug_info["output"] = filtered_output
+            # Include custom debug info
+            custom_debug = output.get("_debug", {})
+            if custom_debug:
+                debug_info["runtime"] = custom_debug
+
+        # append _debug information from nodes
+        all_debug = arguments.get("_debug", {})
+        if debug_info:
+            all_debug[self.name] = debug_info
+        if all_debug:
+            output["_debug"] = all_debug
+
+        # add "extra" args that were not used by the node, but not the 'inputs' value
+        for k, v in arguments.items():
+            if k not in output.keys() and k != "inputs":
+                output[k] = v
+
+        output["params"] = params
+        return output, stream
+
 
 class RootNode(BaseComponent):
     """
diff --git a/haystack/pipelines/base.py b/haystack/pipelines/base.py
index ed329c75a..982c6b7d2 100644
--- a/haystack/pipelines/base.py
+++ b/haystack/pipelines/base.py
@@ -1,4 +1,4 @@
-# pylint: disable=too-many-public-methods
+# pylint: disable=too-many-public-methods,too-many-lines
 
 from __future__ import annotations
 
@@ -468,6 +468,18 @@ class Pipeline:
     def _run_node(self, node_id: str, node_input: Dict[str, Any]) -> Tuple[Dict, str]:
         return self.graph.nodes[node_id]["component"]._dispatch_run(**node_input)
 
+    async def _arun_node(self, node_id: str, node_input: Dict[str, Any]) -> Tuple[Dict, str]:
+        node = self.graph.nodes[node_id]["component"]
+        # If a component has an `arun` method, prefer that one over `run`
+        if hasattr(node, "arun") and callable(node.arun):
+            run_method = node.arun
+        else:
+            # note this might or might not be async, but `_adispatch_run_general`
+            # will work in both cases. This is a safe fallback.
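+            # For example, a hypothetical node defining `async def arun(self, query, ...)`
+            # is picked up by the branch above, while a classic synchronous node such as
+            # `BM25Retriever` lands here and is called without awaiting.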
+            run_method = node.run
+
+        return await node._adispatch_run_general(run_method, **node_input)
+
     def run(  # type: ignore
         self,
         query: Optional[str] = None,
         file_paths: Optional[List[str]] = None,
         labels: Optional[MultiLabel] = None,
         documents: Optional[List[Document]] = None,
         meta: Optional[Union[dict, List[dict]]] = None,
         params: Optional[dict] = None,
         debug: Optional[bool] = None,
     ):
@@ -606,6 +618,144 @@ class Pipeline:
 
         return node_output
 
+    async def _arun(  # noqa: C901,PLR0912  # type: ignore
+        self,
+        query: Optional[str] = None,
+        file_paths: Optional[List[str]] = None,
+        labels: Optional[MultiLabel] = None,
+        documents: Optional[List[Document]] = None,
+        meta: Optional[Union[dict, List[dict]]] = None,
+        params: Optional[dict] = None,
+        debug: Optional[bool] = None,
+    ):
+        """
+        Runs the Pipeline asynchronously, one node at a time: async nodes are awaited, sync nodes are called as usual.
+
+        :param query: The search query (for query pipelines only).
+        :param file_paths: The files to index (for indexing pipelines only).
+        :param labels: Ground-truth labels that you can use to perform an isolated evaluation of pipelines. These labels are input to nodes in the pipeline.
+        :param documents: A list of Document objects to be processed by the Pipeline Nodes.
+        :param meta: Files' metadata. Used in indexing pipelines in combination with `file_paths`.
+        :param params: A dictionary of parameters that you want to pass to the nodes.
+                       To pass a parameter to all Nodes, use: `{"top_k": 10}`.
+                       To pass a parameter to targeted Nodes, run:
+                        `{"Retriever": {"top_k": 10}, "Reader": {"top_k": 3, "debug": True}}`
+        :param debug: Specifies whether the Pipeline should instruct Nodes to collect debug information
+                      about their execution. By default, this information includes the input parameters
+                      the Nodes received and the output they generated. You can then find all debug information in the dictionary returned by this method under the key `_debug`.
+        """
+        self.runs += 1
+
+        send_pipeline_event(
+            pipeline=self,
+            query=query,
+            file_paths=file_paths,
+            labels=labels,
+            documents=documents,
+            meta=meta,
+            params=params,
+            debug=debug,
+        )
+
+        # validate the node names
+        self._validate_node_names_in_params(params=params)
+
+        root_node = self.root_node
+        if not root_node:
+            raise PipelineError("Cannot run a pipeline with no nodes.")
+
+        node_output = None
+        queue: Dict[str, Any] = {
+            root_node: {"root_node": root_node, "params": params}
+        }  # ordered dict with "node_id" -> "input" mapping that acts as a FIFO queue
+        if query is not None:
+            queue[root_node]["query"] = query
+        if file_paths:
+            queue[root_node]["file_paths"] = file_paths
+        if labels:
+            queue[root_node]["labels"] = labels
+        if documents:
+            queue[root_node]["documents"] = documents
+        if meta:
+            queue[root_node]["meta"] = meta
+
+        i = 0  # the first item is popped off the queue unless it is a "join" node with unprocessed predecessors
+        while queue:
+            node_id = list(queue.keys())[i]
+            node_input = queue[node_id]
+            node_input["node_id"] = node_id
+
+            # Apply debug attributes to the node input params
+            # NOTE: global debug attributes will override the value specified
+            # in each node's params dictionary.
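+            # Illustration (hypothetical values): with `debug=True` passed to `_arun()`
+            # and `params={"Reranker": {"debug": False}}`, the Reranker still collects
+            # debug output, because the global flag below overwrites the per-node one.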
+            if debug is None and node_input and node_input.get("params", {}):
+                debug = params.get("debug", None)  # type: ignore
+            if debug is not None:
+                if not node_input.get("params", None):
+                    node_input["params"] = {}
+                if node_id not in node_input["params"].keys():
+                    node_input["params"][node_id] = {}
+                node_input["params"][node_id]["debug"] = debug
+
+            predecessors = set(nx.ancestors(self.graph, node_id))
+            if predecessors.isdisjoint(set(queue.keys())):  # only execute if predecessor nodes are executed
+                try:
+                    logger.debug("Running node '%s' with input: %s", node_id, node_input)
+                    start = time()
+                    node_output, stream_id = await self._arun_node(node_id, node_input)
+                    if "_debug" in node_output and node_id in node_output["_debug"]:
+                        node_output["_debug"][node_id]["exec_time_ms"] = round((time() - start) * 1000, 2)
+                except Exception as e:
+                    # The input might be a really large object with thousands of embeddings.
+                    # If you really want to see it, enable debug logging.
+                    logger.debug("Exception while running node '%s' with input %s", node_id, node_input)
+                    raise Exception(
+                        f"Exception while running node '{node_id}': {e}\nEnable debug logging to see the data that was passed when the pipeline failed."
+                    ) from e
+                queue.pop(node_id)
+                # route the node's output to its successor node(s) in the graph
+                if stream_id == "split":
+                    for stream_id in [key for key in node_output.keys() if key.startswith("output_")]:
+                        current_node_output = {k: v for k, v in node_output.items() if not k.startswith("output_")}
+                        current_docs = node_output.pop(stream_id)
+                        current_node_output["documents"] = current_docs
+                        next_nodes = self.get_next_nodes(node_id, stream_id)
+                        for n in next_nodes:
+                            queue[n] = current_node_output
+                else:
+                    next_nodes = self.get_next_nodes(node_id, stream_id)
+                    for n in next_nodes:  # add successor nodes with corresponding inputs to the queue
+                        if queue.get(n):  # concatenate inputs if it's a join node
+                            existing_input = queue[n]
+                            if "inputs" not in existing_input.keys():
+                                updated_input: dict = {"inputs": [existing_input, node_output], "params": params}
+                                if "_debug" in existing_input.keys() or "_debug" in node_output.keys():
+                                    updated_input["_debug"] = {
+                                        **existing_input.get("_debug", {}),
+                                        **node_output.get("_debug", {}),
+                                    }
+                                if query:
+                                    updated_input["query"] = query
+                                if file_paths:
+                                    updated_input["file_paths"] = file_paths
+                                if labels:
+                                    updated_input["labels"] = labels
+                                if documents:
+                                    updated_input["documents"] = documents
+                                if meta:
+                                    updated_input["meta"] = meta
+                            else:
+                                existing_input["inputs"].append(node_output)
+                                updated_input = existing_input
+                            queue[n] = updated_input
+                        else:
+                            queue[n] = node_output
+                    i = 0
+            else:
+                i += 1  # attempt executing next node in the queue as current `node_id` has unprocessed predecessors
+
+        return node_output
+
     def run_batch(  # noqa: C901,PLR0912 type: ignore
         self,
         queries: Optional[List[str]] = None,
diff --git a/releasenotes/notes/async-pipeline-20649b54ecff2706.yaml b/releasenotes/notes/async-pipeline-20649b54ecff2706.yaml
new file mode 100644
index 000000000..58d689fff
--- /dev/null
+++ b/releasenotes/notes/async-pipeline-20649b54ecff2706.yaml
@@ -0,0 +1,4 @@
+---
+features:
+  - |
+    Add experimental support for asynchronous `Pipeline` runs.
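+
+    A sketch of the intended usage (the entry point is private for now, so its name
+    or signature may change; `pipeline.yml` is a placeholder path):
+
+    .. code-block:: python
+
+        import asyncio
+
+        from haystack.pipelines import Pipeline
+
+        async def main():
+            pipeline = Pipeline.load_from_yaml("pipeline.yml", pipeline_name="query")
+            queries = ["What is Haystack?", "What is a pipeline?"]
+            # the coroutines run concurrently; async and sync nodes can be mixed
+            return await asyncio.gather(*(pipeline._arun(query=q) for q in queries))
+
+        results = asyncio.run(main())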