Refactor replicas config for Ray Pipelines (#1378)

This commit is contained in:
oryx1729 2021-08-31 10:14:55 +02:00 committed by GitHub
parent da5ed43734
commit a71180a2ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 38 additions and 10 deletions

View File

@@ -1009,19 +1009,33 @@ class RayPipeline(Pipeline):
""" """
Ray (https://ray.io) is a framework for distributed computing. Ray (https://ray.io) is a framework for distributed computing.
With Ray, the Pipeline nodes can be distributed across a cluster of machine(s). Ray allows distributing a Pipeline's components across a cluster of machines. The individual components of a
Pipeline can be independently scaled. For instance, an extractive QA Pipeline deployment can have three replicas
of the Reader and a single replica for the Retriever. It enables efficient resource utilization by horizontally
scaling Components.
This allows scaling individual nodes. For instance, in an extractive QA Pipeline, multiple replicas To set the number of replicas, add `replicas` in the YAML config for the node in a pipeline:
of the Reader, while keeping a single instance for the Retriever. It also enables efficient resource
utilization as load could be split across GPU vs CPU machines.
In the current implementation, a Ray Pipeline can only be created with a YAML Pipeline config. ```yaml
| components:
| ...
|
| pipelines:
| - name: ray_query_pipeline
| type: RayPipeline
| nodes:
| - name: ESRetriever
| replicas: 2 # number of replicas to create on the Ray cluster
| inputs: [ Query ]
```
A RayPipeline can only be created with a YAML Pipeline config.
>>> from haystack.pipeline import RayPipeline >>> from haystack.pipeline import RayPipeline
>>> pipeline = RayPipeline.load_from_yaml(path="my_pipelines.yaml", pipeline_name="my_query_pipeline") >>> pipeline = RayPipeline.load_from_yaml(path="my_pipelines.yaml", pipeline_name="my_query_pipeline")
>>> pipeline.run(query="What is the capital of Germany?") >>> pipeline.run(query="What is the capital of Germany?")
By default, a RayPipeline creates an instance of Ray Serve locally. To connect to an existing Ray instance, By default, a RayPipeline creates an instance of Ray Serve locally. To connect to an existing Ray instance,
set the `address` parameter when creating RayPipeline instance. set the `address` parameter when creating the RayPipeline instance.
""" """
def __init__(self, address: str = None, **kwargs): def __init__(self, address: str = None, **kwargs):
""" """
@@ -1101,7 +1115,7 @@ class RayPipeline(Pipeline):
name = node_config["name"] name = node_config["name"]
component_type = definitions[name]["type"] component_type = definitions[name]["type"]
component_class = BaseComponent.get_subclass(component_type) component_class = BaseComponent.get_subclass(component_type)
replicas = next(comp for comp in data["components"] if comp["name"] == name).get("replicas", 1) replicas = next(node for node in pipeline_config["nodes"] if node["name"] == name).get("replicas", 1)
handle = cls._create_ray_deployment(component_name=name, pipeline_config=data, replicas=replicas) handle = cls._create_ray_deployment(component_name=name, pipeline_config=data, replicas=replicas)
pipeline._add_ray_deployment_in_graph( pipeline._add_ray_deployment_in_graph(
handle=handle, handle=handle,

View File

@@ -28,15 +28,24 @@ components:
pipelines: pipelines:
- name: query_pipeline - name: query_pipeline
type: Query type: Pipeline
nodes: nodes:
- name: ESRetriever - name: ESRetriever
inputs: [Query] inputs: [Query]
- name: Reader - name: Reader
inputs: [ESRetriever] inputs: [ESRetriever]
- name: ray_query_pipeline
type: RayPipeline
nodes:
- name: ESRetriever
replicas: 2
inputs: [ Query ]
- name: Reader
inputs: [ ESRetriever ]
- name: indexing_pipeline - name: indexing_pipeline
type: Indexing type: Pipeline
nodes: nodes:
- name: PDFConverter - name: PDFConverter
inputs: [File] inputs: [File]

View File

@@ -1,14 +1,19 @@
from pathlib import Path from pathlib import Path
import pytest import pytest
import ray
from haystack.pipeline import RayPipeline from haystack.pipeline import RayPipeline
@pytest.mark.parametrize("document_store_with_docs", ["elasticsearch"], indirect=True) @pytest.mark.parametrize("document_store_with_docs", ["elasticsearch"], indirect=True)
def test_load_pipeline(document_store_with_docs): def test_load_pipeline(document_store_with_docs):
pipeline = RayPipeline.load_from_yaml( pipeline = RayPipeline.load_from_yaml(
Path("samples/pipeline/test_pipeline.yaml"), pipeline_name="query_pipeline", num_cpus=8, Path("samples/pipeline/test_pipeline.yaml"), pipeline_name="ray_query_pipeline", num_cpus=8,
) )
prediction = pipeline.run(query="Who lives in Berlin?", top_k_retriever=10, top_k_reader=3) prediction = pipeline.run(query="Who lives in Berlin?", top_k_retriever=10, top_k_reader=3)
assert ray.serve.get_deployment(name="ESRetriever").num_replicas == 2
assert ray.serve.get_deployment(name="Reader").num_replicas == 1
assert prediction["query"] == "Who lives in Berlin?" assert prediction["query"] == "Who lives in Berlin?"
assert prediction["answers"][0]["answer"] == "Carla" assert prediction["answers"][0]["answer"] == "Carla"