Add Pipelines async run (#5864)

* Add Pipeline.arun()

* Sleeper node

* Fix async running

* Add e2e tests

To run a Pipeline that doesn't have any async nodes in async mode:

    pytest e2e/pipelines/test_standard_pipelines.py::test_query_and_indexing_pipeline

To run a Pipeline that has a single async node in concurrent mode:

    pytest e2e/pipelines/test_standard_pipelines.py::test_async_concurrent_complex_pipeline

To run a Pipeline that has a single async node in sequential mode:

    pytest e2e/pipelines/test_standard_pipelines.py::test_async_sequential_complex_pipeline
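
For illustration, a minimal sketch of the two async modes these tests exercise
(the YAML path and queries are placeholders):

    import asyncio
    from haystack.pipelines import Pipeline

    async def main():
        pipeline = Pipeline.load_from_yaml("async_test_pipeline.yml", pipeline_name="query")
        queries = ["How to test module-1?", "How to test module-2?"]
        # Concurrent mode: schedule all runs at once and await them together
        await asyncio.gather(*(pipeline._arun(query=q) for q in queries))
        # Sequential mode: await each run before starting the next
        for q in queries:
            await pipeline._arun(query=q)

    asyncio.run(main())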

* Remove unused _adispatch_run method

* Make Pipeline.run work with async nodes

* Revert "Make Pipeline.run work with async nodes"

This reverts commit 22d7a94e4d41aca1b59dad18c0b366fbb6e8f431.

* Rename Pipeline.arun to Pipeline._arun

* Enhance docstring

* Add Sleeper docstring

* Add release notes

* Ignore typing across the node

* Make pylint happy

* Skip pylint on needed unused import

* Fix

* If a node has an `arun` method, use it (see the sketch below)
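
A hypothetical node wiring both entry points might look like the following
sketch (class name and payloads are illustrative; `run_batch` is included
because `BaseComponent` requires it):

    from haystack.nodes.base import BaseComponent

    class EchoNode(BaseComponent):
        outgoing_edges = 1

        def run(self, query=None, **kwargs):  # type: ignore
            # Synchronous path, used by Pipeline.run()
            return {"query": query}, "output_1"

        async def arun(self, query=None, **kwargs):  # type: ignore
            # Async path, preferred by Pipeline._arun() when defined
            return {"query": query}, "output_1"

        def run_batch(self, queries=None, **kwargs):  # type: ignore
            return {"queries": queries or []}, "output_1"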

---------

Co-authored-by: Massimiliano Pippi <mpippi@gmail.com>
Silvano Cerza 2023-09-26 15:37:27 +02:00 committed by GitHub
parent 8d26057566
commit cf7f0ebc22
6 changed files with 425 additions and 6 deletions

e2e/pipelines/test_standard_pipelines.py

@@ -1,8 +1,9 @@
import os
import asyncio
import pytest
from haystack.document_stores import InMemoryDocumentStore
from haystack.document_stores import InMemoryDocumentStore, ElasticsearchDocumentStore
from haystack.nodes.retriever.web import WebRetriever
from haystack.pipelines import (
Pipeline,
@@ -13,6 +14,7 @@ from haystack.pipelines import (
SearchSummarizationPipeline,
)
from haystack.nodes import EmbeddingRetriever, PromptNode, BM25Retriever, TransformersSummarizer
from haystack.nodes.asyncio.sleeper import Sleeper # noqa # pylint: disable=unused-import
from haystack.schema import Document
@@ -89,7 +91,7 @@ def test_most_similar_documents_pipeline():
assert isinstance(document.content, str)
def test_most_similar_documents_pipeline_with_filters():
async def test_most_similar_documents_pipeline_with_filters():
documents = [
{"id": "a", "content": "Sample text for document-1", "meta": {"source": "wiki1"}},
{"id": "b", "content": "Sample text for document-2", "meta": {"source": "wiki2"}},
@@ -120,17 +122,18 @@ def test_most_similar_documents_pipeline_with_filters():
assert document.meta["source"] in ["wiki3", "wiki4", "wiki5"]
def test_query_and_indexing_pipeline(samples_path):
@pytest.mark.asyncio
async def test_query_and_indexing_pipeline(samples_path):
# test correct load of indexing pipeline from yaml
pipeline = Pipeline.load_from_yaml(
samples_path / "pipelines" / "test.haystack-pipeline.yml", pipeline_name="indexing_pipeline"
)
pipeline.run(file_paths=samples_path / "pipelines" / "sample_pdf_1.pdf")
await pipeline._arun(file_paths=samples_path / "pipelines" / "sample_pdf_1.pdf")
# test correct load of query pipeline from yaml
pipeline = Pipeline.load_from_yaml(
samples_path / "pipelines" / "test.haystack-pipeline.yml", pipeline_name="query_pipeline"
)
prediction = pipeline.run(
prediction = await pipeline._arun(
query="Who made the PDF specification?", params={"Retriever": {"top_k": 2}, "Reader": {"top_k": 1}}
)
assert prediction["query"] == "Who made the PDF specification?"
@@ -138,6 +141,60 @@ def test_query_and_indexing_pipeline(samples_path):
assert "_debug" not in prediction.keys()
@pytest.mark.asyncio
async def test_async_concurrent_complex_pipeline(samples_path):
documents = [
{"content": "How to test module-1?", "meta": {"source": "wiki1", "answer": "Using tests for module-1"}},
{"content": "How to test module-2?", "meta": {"source": "wiki2", "answer": "Using tests for module-2"}},
{"content": "How to test module-3?", "meta": {"source": "wiki3", "answer": "Using tests for module-3"}},
{"content": "How to test module-4?", "meta": {"source": "wiki4", "answer": "Using tests for module-4"}},
{"content": "How to test module-5?", "meta": {"source": "wiki5", "answer": "Using tests for module-5"}},
]
document_store = ElasticsearchDocumentStore()
document_store.write_documents(documents)
# test correct load of query pipeline from yaml
pipeline = Pipeline.load_from_yaml(samples_path / "pipelines" / "async_test_pipeline.yml", pipeline_name="query")
queries = [
"How to test module-1?",
"How to test module-2?",
"How to test module-3?",
"How to test module-4?",
"How to test module-5?",
]
futures = []
for query in queries:
future = pipeline._arun(query=query)
futures.append(future)
await asyncio.gather(*futures)
@pytest.mark.asyncio
async def test_async_sequential_complex_pipeline(samples_path):
documents = [
{"content": "How to test module-1?", "meta": {"source": "wiki1", "answer": "Using tests for module-1"}},
{"content": "How to test module-2?", "meta": {"source": "wiki2", "answer": "Using tests for module-2"}},
{"content": "How to test module-3?", "meta": {"source": "wiki3", "answer": "Using tests for module-3"}},
{"content": "How to test module-4?", "meta": {"source": "wiki4", "answer": "Using tests for module-4"}},
{"content": "How to test module-5?", "meta": {"source": "wiki5", "answer": "Using tests for module-5"}},
]
document_store = ElasticsearchDocumentStore()
document_store.write_documents(documents)
# test correct load of query pipeline from yaml
pipeline = Pipeline.load_from_yaml(samples_path / "pipelines" / "async_test_pipeline.yml", pipeline_name="query")
queries = [
"How to test module-1?",
"How to test module-2?",
"How to test module-3?",
"How to test module-4?",
"How to test module-5?",
]
for query in queries:
await pipeline._arun(query=query)
@pytest.mark.skipif(
not os.environ.get("OPENAI_API_KEY", None),
reason="Please export an env var called OPENAI_API_KEY containing the OpenAI API key to run this test.",

e2e/samples/pipelines/async_test_pipeline.yml

@@ -0,0 +1,60 @@
# This pipeline has been automatically migrated to conform to Haystack 1.15.1.
# Please check that the pipeline is still working as expected.
# In this version: split by paragraphs, new prompt (simplified, opinions rather than facts)
version: "1.15.1"
components:
- name: DocumentStore
type: ElasticsearchDocumentStore
- name: BM25Retriever
type: BM25Retriever
params:
document_store: DocumentStore
top_k: 30
- name: JoinDocuments
type: JoinDocuments
params:
top_k_join: 30
join_mode: reciprocal_rank_fusion
- name: Reranker
type: SentenceTransformersRanker
params:
model_name_or_path: svalabs/cross-electra-ms-marco-german-uncased
top_k: 8
- name: QueryClassifier
type: TransformersQueryClassifier
params:
model_name_or_path: deepset/deberta-v3-base-injection
labels: ["LEGIT", "INJECTION"]
- name: qa_template
params:
output_parser:
type: AnswerParser
prompt: "Given this context answer the question:{new_line}\
'''{join(documents, delimiter=new_line, pattern='Rede[$idx], Datum: $date, Dies ist die Rede von: $politician_name (Partei: $faction): <$content>')}'''{new_line}\
Question: {query}{new_line}\
Answer: {new_line}"
type: PromptTemplate
- name: PromptNode
type: Sleeper
- name: FileTypeClassifier
type: FileTypeClassifier
- name: TextConverter
type: TextConverter
- name: PDFConverter
type: PDFToTextConverter
pipelines:
- name: query
nodes:
- name: QueryClassifier
inputs: [Query]
- name: BM25Retriever
inputs: [QueryClassifier.output_1]
- name: JoinDocuments
inputs: [BM25Retriever]
- name: Reranker
inputs: [JoinDocuments]
- name: PromptNode
inputs: [Reranker]

haystack/nodes/asyncio/sleeper.py

@@ -0,0 +1,74 @@
from typing import Optional, List, Dict, Union, Any, Literal
import asyncio
import numpy as np
from haystack.schema import Document, MultiLabel, Answer
from haystack.nodes.base import BaseComponent
class Sleeper(BaseComponent):
"""
Simple component that sleeps for a random amount of time and then returns a dummy answer.
"""
outgoing_edges: int = 1
def __init__(
self,
mean_sleep_in_seconds: float = 10,
sleep_scale: float = 1.0,
answer_type: Literal["generative", "extractive", "other"] = "generative",
answer_score: Optional[float] = None,
answer: str = "Placeholder",
) -> None:
super().__init__()
self._mean_sleep_in_seconds = mean_sleep_in_seconds
self._sleep_scale = sleep_scale
self._answer_type = answer_type
self._answer = answer
self._answer_score = answer_score
# pylint: disable=invalid-overridden-method
async def run( # type: ignore
self,
query: Optional[str] = None,
file_paths: Optional[List[str]] = None,
labels: Optional[MultiLabel] = None,
documents: Optional[List[Document]] = None,
meta: Optional[dict] = None,
):
if query is None:
return {"answers": []}, "output_1"
meta_data = meta if meta is not None else {}
# Sleep to mock processing time
sleep_time_seconds = max(0.0, np.random.normal(self._mean_sleep_in_seconds, self._sleep_scale))
await asyncio.sleep(sleep_time_seconds)
return {
"answers": [Answer(answer=self._answer, type=self._answer_type, meta=meta_data, score=self._answer_score)]
}, "output_1"
# pylint: disable=too-many-arguments
def run_batch( # type: ignore
self,
queries: Optional[Union[str, List[str]]] = None,
file_paths: Optional[List[str]] = None,
labels: Optional[Union[MultiLabel, List[MultiLabel]]] = None,
documents: Optional[Union[List[Document], List[List[Document]]]] = None,
meta: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
params: Optional[dict] = None,
debug: Optional[bool] = None,
):
queries = queries or []
query_list: List[str] = [queries] if isinstance(queries, str) else queries
result: Dict[Any, Any] = {"answers": [], "queries": []}
for query in query_list:
# `run` is a coroutine, so it must be driven to completion here
iteration_result, _ = asyncio.run(self.run(query=query)) # type: ignore
result["answers"].append(iteration_result["answers"]) # type: ignore
result["queries"].append(query)
return result, "output_1"
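
For reference, a minimal standalone usage sketch of `Sleeper` (the timings and
query are arbitrary):

    import asyncio
    from haystack.nodes.asyncio.sleeper import Sleeper

    async def main():
        node = Sleeper(mean_sleep_in_seconds=0.1, sleep_scale=0.01)
        output, edge = await node.run(query="How to test module-1?")
        print(edge, output["answers"][0].answer)  # output_1 Placeholder

    asyncio.run(main())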

haystack/nodes/base.py

@@ -273,6 +273,80 @@ class BaseComponent(ABC):
output["params"] = params
return output, stream
async def _adispatch_run_general(self, run_method: Callable, **kwargs):
"""
This is the async version of _dispatch_run_general and is used indirectly by Pipeline._arun().
When actually running the node, it first tries to run it asynchronously; if that fails, it falls back to a synchronous run.
This makes it possible to run a pipeline asynchronously with a mix of async and sync nodes.
This method takes care of the following:
- inspects run_method's signature to validate that all necessary arguments are available
- pops `debug` and sets it on the instance to control debug output
- calls run_method with the corresponding arguments and gathers the output
- collates `_debug` information if present
- merges the component output with the preceding output and passes it on to the subsequent Component in the Pipeline
"""
arguments = deepcopy(kwargs)
params = arguments.get("params") or {}
run_signature_args = inspect.signature(run_method).parameters.keys()
run_params: Dict[str, Any] = {}
for key, value in params.items():
if key == self.name: # targeted params for this node
if isinstance(value, dict):
# Extract debug attributes
if "debug" in value.keys():
self.debug = value.pop("debug")
for key in value.keys():
if key not in run_signature_args:
raise Exception(f"Invalid parameter '{key}' for the node '{self.name}'.")
run_params.update(**value)
elif key in run_signature_args: # global params
run_params[key] = value
run_inputs = {}
for key, value in arguments.items():
if key in run_signature_args:
run_inputs[key] = value
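# Prefer awaiting `run_method`; if it is a plain synchronous callable, the
# `await` on its return value raises TypeError and the node is re-invoked
# synchronously instead.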
try:
output, stream = await run_method(**run_inputs, **run_params)
except TypeError:
output, stream = run_method(**run_inputs, **run_params)
# Collect debug information
debug_info = {}
if getattr(self, "debug", None):
# Include input
debug_info["input"] = {**run_inputs, **run_params}
debug_info["input"]["debug"] = self.debug
# Include output, exclude _debug to avoid recursion
filtered_output = {key: value for key, value in output.items() if key != "_debug"}
debug_info["output"] = filtered_output
# Include custom debug info
custom_debug = output.get("_debug", {})
if custom_debug:
debug_info["runtime"] = custom_debug
# append _debug information from nodes
all_debug = arguments.get("_debug", {})
if debug_info:
all_debug[self.name] = debug_info
if all_debug:
output["_debug"] = all_debug
# add "extra" args that were not used by the node, but not the 'inputs' value
for k, v in arguments.items():
if k not in output.keys() and k != "inputs":
output[k] = v
output["params"] = params
return output, stream
class RootNode(BaseComponent):
"""

haystack/pipelines/pipeline.py

@@ -1,4 +1,4 @@
# pylint: disable=too-many-public-methods
# pylint: disable=too-many-public-methods,too-many-lines
from __future__ import annotations
@@ -468,6 +468,18 @@ class Pipeline:
def _run_node(self, node_id: str, node_input: Dict[str, Any]) -> Tuple[Dict, str]:
return self.graph.nodes[node_id]["component"]._dispatch_run(**node_input)
async def _arun_node(self, node_id: str, node_input: Dict[str, Any]) -> Tuple[Dict, str]:
node = self.graph.nodes[node_id]["component"]
# If a component has an `arun` method, prefer that one over `run`
if hasattr(node, "arun") and callable(node.arun):
run_method = node.arun
else:
# Note: this might or might not be async, but `_adispatch_run_general`
# will work in both cases. This is a safe fallback.
run_method = node.run
return await node._adispatch_run_general(run_method, **node_input)
def run( # type: ignore
self,
query: Optional[str] = None,
@@ -606,6 +618,144 @@ class Pipeline:
return node_output
async def _arun( # noqa: C901,PLR0912 type: ignore
self,
query: Optional[str] = None,
file_paths: Optional[List[str]] = None,
labels: Optional[MultiLabel] = None,
documents: Optional[List[Document]] = None,
meta: Optional[Union[dict, List[dict]]] = None,
params: Optional[dict] = None,
debug: Optional[bool] = None,
):
"""
Runs the Pipeline asynchronously, one node at a time.
:param query: The search query (for query pipelines only).
:param file_paths: The files to index (for indexing pipelines only).
:param labels: Ground-truth labels that you can use to perform an isolated evaluation of pipelines. These labels are input to nodes in the pipeline.
:param documents: A list of Document objects to be processed by the Pipeline Nodes.
:param meta: Files' metadata. Used in indexing pipelines in combination with `file_paths`.
:param params: A dictionary of parameters that you want to pass to the nodes.
To pass a parameter to all Nodes, use: `{"top_k": 10}`.
To pass a parameter to targeted Nodes, run:
`{"Retriever": {"top_k": 10}, "Reader": {"top_k": 3, "debug": True}}`
:param debug: Specifies whether the Pipeline should instruct Nodes to collect debug information
about their execution. By default, this information includes the input parameters
the Nodes received and the output they generated. You can then find all debug information in the dictionary returned by this method under the key `_debug`.
"""
self.runs += 1
send_pipeline_event(
pipeline=self,
query=query,
file_paths=file_paths,
labels=labels,
documents=documents,
meta=meta,
params=params,
debug=debug,
)
# validate the node names
self._validate_node_names_in_params(params=params)
root_node = self.root_node
if not root_node:
raise PipelineError("Cannot run a pipeline with no nodes.")
node_output = None
queue: Dict[str, Any] = {
root_node: {"root_node": root_node, "params": params}
} # ordered dict with "node_id" -> "input" mapping that acts as a FIFO queue
if query is not None:
queue[root_node]["query"] = query
if file_paths:
queue[root_node]["file_paths"] = file_paths
if labels:
queue[root_node]["labels"] = labels
if documents:
queue[root_node]["documents"] = documents
if meta:
queue[root_node]["meta"] = meta
i = 0 # the first item is popped off the queue unless it is a "join" node with unprocessed predecessors
while queue:
node_id = list(queue.keys())[i]
node_input = queue[node_id]
node_input["node_id"] = node_id
# Apply debug attributes to the node input params
# NOTE: global debug attributes will override the value specified
# in each node's params dictionary.
if debug is None and node_input and node_input.get("params", {}):
debug = params.get("debug", None) # type: ignore
if debug is not None:
if not node_input.get("params", None):
node_input["params"] = {}
if node_id not in node_input["params"].keys():
node_input["params"][node_id] = {}
node_input["params"][node_id]["debug"] = debug
predecessors = set(nx.ancestors(self.graph, node_id))
if predecessors.isdisjoint(set(queue.keys())): # only execute if predecessor nodes are executed
try:
logger.debug("Running node '%s` with input: %s", node_id, node_input)
start = time()
node_output, stream_id = await self._arun_node(node_id, node_input)
if "_debug" in node_output and node_id in node_output["_debug"]:
node_output["_debug"][node_id]["exec_time_ms"] = round((time() - start) * 1000, 2)
except Exception as e:
# The input might be a really large object with thousands of embeddings.
# If you really want to see it, raise the log level.
logger.debug("Exception while running node '%s' with input %s", node_id, node_input)
raise Exception(
f"Exception while running node '{node_id}': {e}\nEnable debug logging to see the data that was passed when the pipeline failed."
) from e
queue.pop(node_id)
#
if stream_id == "split":
for stream_id in [key for key in node_output.keys() if key.startswith("output_")]:
current_node_output = {k: v for k, v in node_output.items() if not k.startswith("output_")}
current_docs = node_output.pop(stream_id)
current_node_output["documents"] = current_docs
next_nodes = self.get_next_nodes(node_id, stream_id)
for n in next_nodes:
queue[n] = current_node_output
else:
next_nodes = self.get_next_nodes(node_id, stream_id)
for n in next_nodes: # add successor nodes with corresponding inputs to the queue
if queue.get(n): # concatenate inputs if it's a join node
existing_input = queue[n]
if "inputs" not in existing_input.keys():
updated_input: dict = {"inputs": [existing_input, node_output], "params": params}
if "_debug" in existing_input.keys() or "_debug" in node_output.keys():
updated_input["_debug"] = {
**existing_input.get("_debug", {}),
**node_output.get("_debug", {}),
}
if query:
updated_input["query"] = query
if file_paths:
updated_input["file_paths"] = file_paths
if labels:
updated_input["labels"] = labels
if documents:
updated_input["documents"] = documents
if meta:
updated_input["meta"] = meta
else:
existing_input["inputs"].append(node_output)
updated_input = existing_input
queue[n] = updated_input
else:
queue[n] = node_output
i = 0
else:
i += 1 # attempt executing next node in the queue as current `node_id` has unprocessed predecessors
return node_output
def run_batch( # noqa: C901,PLR0912 type: ignore
self,
queries: Optional[List[str]] = None,

releasenotes/notes/ (release note file)

@ -0,0 +1,4 @@
---
features:
- |
Add experimental support for asynchronous `Pipeline` run