Extending the Ray Serve integration to allow attributes for Serve deployments (#2918)

* Extending the Ray Serve integration to allow attributes for Serve deployments

This closes #2917

We should be able to set Ray Serve attributes for the nodes of pipelines, like amount of GPU to use, max_concurrent_queries, etc.

Now this is possible from the pipeline yaml file for each node of the pipeline.

* Ran black and regenerated the json schemas

* Fixing the JSON Schema generation

* Trying to fix the schema CI test issue

* Fixing the test and the schemas

Python 3.8 was generating a different schema than the one Python 3.7 creates in the CI. You MUST use Python 3.7 to generate the schemas, otherwise the CI jobs will fail.

* Merge the two Ray pipeline test cases

* Generate the JSON schemas again after `$ pip install .[all]`

* Removing `haystack/json-schemas/haystack-pipeline-1.16.schema.json`

This was generated by the JSON generator, but based on @ZanSara's instructions, I am removing it.

* Making changes based on @ZanSara's request - the newly requested test is failing

* Fixing the JSON schema generation again

* Renaming `replicas` and moving it under `serve_deployment_kwargs`

* add extras validation, untested

* Documentation update

* Black

* [EMPTY] Re-trigger CI

Co-authored-by: Sara Zan <sarazanzo94@gmail.com>
This commit is contained in:
Zoltan Fedor 2022-08-03 10:38:22 -04:00 committed by GitHub
parent 669f6f0128
commit 7b97bbbff0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 126 additions and 29 deletions

View File

@ -851,7 +851,7 @@ With Ray, you can distribute a Pipeline's components across a cluster of machine
Pipeline can be independently scaled. For instance, an extractive QA Pipeline deployment can have three replicas
of the Reader and a single replica for the Retriever. This way, you can use your resources more efficiently by horizontally scaling Components.
To set the number of replicas, add `replicas` in the YAML configuration for the node in a pipeline:
To set the number of replicas, add `num_replicas` in the YAML configuration for the node in a pipeline:
```yaml
| components:
@ -862,8 +862,9 @@ To set the number of replicas, add `replicas` in the YAML configuration for the
| type: RayPipeline
| nodes:
| - name: ESRetriever
| replicas: 2 # number of replicas to create on the Ray cluster
| inputs: [ Query ]
| serve_deployment_kwargs:
| num_replicas: 2 # number of replicas to create on the Ray cluster
```
A Ray Pipeline can only be created with a YAML Pipeline configuration.
@ -933,7 +934,8 @@ Here's a sample configuration:
| nodes:
| - name: MyESRetriever
| inputs: [Query]
| replicas: 2 # number of replicas to create on the Ray cluster
| serve_deployment_kwargs:
| num_replicas: 2 # number of replicas to create on the Ray cluster
| - name: MyReader
| inputs: [MyESRetriever]
```

View File

@ -239,10 +239,41 @@
"type": "string"
}
},
"replicas": {
"title": "replicas",
"description": "How many replicas Ray should create for this node (only for Ray pipelines)",
"type": "integer"
"serve_deployment_kwargs": {
"title": "serve_deployment_kwargs",
"description": "Arguments to be passed to the Ray Serve `deployment()` method (only for Ray pipelines)",
"type": "object",
"properties": {
"num_replicas": {
"description": "How many replicas Ray should create for this node (only for Ray pipelines)",
"type": "integer"
},
"version": {
"type": "string"
},
"prev_version": {
"type": "string"
},
"init_args": {
"type": "array"
},
"init_kwargs": {
"type": "object"
},
"router_prefix": {
"type": "string"
},
"ray_actor_options": {
"type": "object"
},
"user_config": {
"type": {}
},
"max_concurrent_queries": {
"type": "integer"
}
},
"additionalProperties": true
}
},
"required": [
@ -285,7 +316,7 @@
"items": {
"not": {
"required": [
"replicas"
"serve_deployment_kwargs"
]
}
}

View File

@ -289,10 +289,25 @@ def get_json_schema(filename: str, version: str, modules: List[str] = ["haystack
"type": "array",
"items": {"type": "string"},
},
"replicas": {
"title": "replicas",
"description": "How many replicas Ray should create for this node (only for Ray pipelines)",
"type": "integer",
"serve_deployment_kwargs": {
"title": "serve_deployment_kwargs",
"description": "Arguments to be passed to the Ray Serve `deployment()` method (only for Ray pipelines)",
"type": "object",
"properties": {
"num_replicas": {
"description": "How many replicas Ray should create for this node (only for Ray pipelines)",
"type": "integer",
},
"version": {"type": "string"},
"prev_version": {"type": "string"},
"init_args": {"type": "array"},
"init_kwargs": {"type": "object"},
"router_prefix": {"type": "string"},
"ray_actor_options": {"type": "object"},
"user_config": {"type": {}},
"max_concurrent_queries": {"type": "integer"},
},
"additionalProperties": True,
},
},
"required": ["name", "inputs"],
@ -315,7 +330,9 @@ def get_json_schema(filename: str, version: str, modules: List[str] = ["haystack
"properties": {
"pipelines": {
"title": "Pipelines",
"items": {"properties": {"nodes": {"items": {"not": {"required": ["replicas"]}}}}},
"items": {
"properties": {"nodes": {"items": {"not": {"required": ["serve_deployment_kwargs"]}}}}
},
}
},
},

View File

@ -179,7 +179,12 @@ def build_component_dependency_graph(
return graph
def validate_yaml(path: Path, strict_version_check: bool = False, overwrite_with_env_variables: bool = True):
def validate_yaml(
path: Path,
strict_version_check: bool = False,
overwrite_with_env_variables: bool = True,
extras: Optional[str] = None,
):
"""
Ensures that the given YAML file can be loaded without issues.
@ -197,16 +202,20 @@ def validate_yaml(path: Path, strict_version_check: bool = False, overwrite_with
to change index name param for an ElasticsearchDocumentStore, an env
variable 'MYDOCSTORE_PARAMS_INDEX=documents-2021' can be set. Note that an
`_` sign must be used to specify nested hierarchical properties.
:param extras: which values are allowed in the `extras` field (for example, `ray`). If None, does not allow the `extras` field at all.
:return: None if validation is successful
:raise: `PipelineConfigError` in case of issues.
"""
pipeline_config = read_pipeline_config_from_yaml(path)
validate_config(pipeline_config=pipeline_config, strict_version_check=strict_version_check)
validate_config(pipeline_config=pipeline_config, strict_version_check=strict_version_check, extras=extras)
logging.debug(f"'{path}' contains valid Haystack pipelines.")
def validate_config(
pipeline_config: Dict[str, Any], strict_version_check: bool = False, overwrite_with_env_variables: bool = True
pipeline_config: Dict[str, Any],
strict_version_check: bool = False,
overwrite_with_env_variables: bool = True,
extras: Optional[str] = None,
):
"""
Ensures that the given YAML file can be loaded without issues.
@ -225,10 +234,11 @@ def validate_config(
to change index name param for an ElasticsearchDocumentStore, an env
variable 'MYDOCSTORE_PARAMS_INDEX=documents-2021' can be set. Note that an
`_` sign must be used to specify nested hierarchical properties.
:param extras: which values are allowed in the `extras` field (for example, `ray`). If None, does not allow the `extras` field at all.
:return: None if validation is successful
:raise: `PipelineConfigError` in case of issues.
"""
validate_schema(pipeline_config=pipeline_config, strict_version_check=strict_version_check)
validate_schema(pipeline_config=pipeline_config, strict_version_check=strict_version_check, extras=extras)
for pipeline_definition in pipeline_config["pipelines"]:
component_definitions = get_component_definitions(
@ -237,7 +247,7 @@ def validate_config(
validate_pipeline_graph(pipeline_definition=pipeline_definition, component_definitions=component_definitions)
def validate_schema(pipeline_config: Dict, strict_version_check: bool = False) -> None:
def validate_schema(pipeline_config: Dict, strict_version_check: bool = False, extras: Optional[str] = None) -> None:
"""
Check that the YAML abides the JSON schema, so that every block
of the pipeline configuration file contains all required information
@ -248,11 +258,20 @@ def validate_schema(pipeline_config: Dict, strict_version_check: bool = False) -
:param pipeline_config: the configuration to validate
:param strict_version_check: whether to fail in case of a version mismatch (throws a warning otherwise)
:param extras: which values are allowed in the `extras` field (for example, `ray`). If None, does not allow the `extras` field at all.
:return: None if validation is successful
:raise: `PipelineConfigError` in case of issues.
"""
validate_config_strings(pipeline_config)
# Check that the extras are respected
extras_in_config = pipeline_config.get("extras", None)
if (not extras and extras_in_config) or (extras and extras_in_config not in extras):
raise PipelineConfigError(
f"Cannot use this class to load a YAML with 'extras: {extras_in_config}'. "
"Use the proper class, for example 'RayPipeline'."
)
# Check for the version manually (to avoid validation errors)
pipeline_version = pipeline_config.get("version", None)

View File

@ -32,7 +32,7 @@ class RayPipeline(Pipeline):
Pipeline can be independently scaled. For instance, an extractive QA Pipeline deployment can have three replicas
of the Reader and a single replica for the Retriever. This way, you can use your resources more efficiently by horizontally scaling Components.
To set the number of replicas, add `replicas` in the YAML configuration for the node in a pipeline:
To set the number of replicas, add `num_replicas` in the YAML configuration for the node in a pipeline:
```yaml
| components:
@ -43,8 +43,9 @@ class RayPipeline(Pipeline):
| type: RayPipeline
| nodes:
| - name: ESRetriever
| replicas: 2 # number of replicas to create on the Ray cluster
| inputs: [ Query ]
| serve_deployment_kwargs:
| num_replicas: 2 # number of replicas to create on the Ray cluster
```
A Ray Pipeline can only be created with a YAML Pipeline configuration.
@ -81,7 +82,7 @@ class RayPipeline(Pipeline):
address: Optional[str] = None,
ray_args: Optional[Dict[str, Any]] = None,
):
validate_config(pipeline_config, strict_version_check=strict_version_check)
validate_config(pipeline_config, strict_version_check=strict_version_check, extras="ray")
pipeline_definition = get_pipeline_definition(pipeline_config=pipeline_config, pipeline_name=pipeline_name)
component_definitions = get_component_definitions(
@ -101,8 +102,12 @@ class RayPipeline(Pipeline):
name = node_config["name"]
component_type = component_definitions[name]["type"]
component_class = BaseComponent.get_subclass(component_type)
replicas = next(node for node in pipeline_definition["nodes"] if node["name"] == name).get("replicas", 1)
handle = cls._create_ray_deployment(component_name=name, pipeline_config=pipeline_config, replicas=replicas)
serve_deployment_kwargs = next(node for node in pipeline_definition["nodes"] if node["name"] == name).get(
"serve_deployment_kwargs", {}
)
handle = cls._create_ray_deployment(
component_name=name, pipeline_config=pipeline_config, serve_deployment_kwargs=serve_deployment_kwargs
)
pipeline._add_ray_deployment_in_graph(
handle=handle,
name=name,
@ -154,7 +159,8 @@ class RayPipeline(Pipeline):
| nodes:
| - name: MyESRetriever
| inputs: [Query]
| replicas: 2 # number of replicas to create on the Ray cluster
| serve_deployment_kwargs:
| num_replicas: 2 # number of replicas to create on the Ray cluster
| - name: MyReader
| inputs: [MyESRetriever]
```
@ -182,16 +188,23 @@ class RayPipeline(Pipeline):
)
@classmethod
def _create_ray_deployment(cls, component_name: str, pipeline_config: dict, replicas: int = 1):
def _create_ray_deployment(
cls, component_name: str, pipeline_config: dict, serve_deployment_kwargs: Optional[Dict[str, Any]] = {}
):
"""
Create a Ray Deployment for the Component.
:param component_name: Class name of the Haystack Component.
:param pipeline_config: The Pipeline config YAML parsed as a dict.
:param replicas: By default, a single replica of the component is created. It can be
configured by setting `replicas` parameter in the Pipeline YAML.
:param serve_deployment_kwargs: An optional dictionary of arguments to be supplied to the
`ray.serve.deployment()` method, like `num_replicas`, `ray_actor_options`,
`max_concurrent_queries`, etc. See potential values in the
Ray Serve API docs (https://docs.ray.io/en/latest/serve/package-ref.html)
under the `ray.serve.deployment()` method
"""
RayDeployment = serve.deployment(_RayDeploymentWrapper, name=component_name, num_replicas=replicas) # type: ignore
RayDeployment = serve.deployment(
_RayDeploymentWrapper, name=component_name, **serve_deployment_kwargs # type: ignore
)
RayDeployment.deploy(pipeline_config, component_name)
handle = RayDeployment.get_handle()
return handle

View File

@ -1021,3 +1021,10 @@ def test_save_yaml_overwrite(tmp_path):
with open(tmp_path / "saved_pipeline.yml", "r") as saved_yaml:
content = saved_yaml.read()
assert content != ""
def test_load_yaml_ray_args_in_pipeline(tmp_path):
with pytest.raises(PipelineConfigError) as e:
pipeline = Pipeline.load_from_yaml(
SAMPLES_PATH / "pipeline" / "ray.haystack-pipeline.yml", pipeline_name="ray_query_pipeline"
)

View File

@ -31,5 +31,7 @@ def test_load_pipeline(document_store_with_docs):
assert ray.serve.get_deployment(name="ESRetriever").num_replicas == 2
assert ray.serve.get_deployment(name="Reader").num_replicas == 1
assert ray.serve.get_deployment(name="ESRetriever").max_concurrent_queries == 17
assert ray.serve.get_deployment(name="ESRetriever").ray_actor_options["num_cpus"] == 0.5
assert prediction["query"] == "Who lives in Berlin?"
assert prediction["answers"][0].answer == "Carla"

View File

@ -40,7 +40,13 @@ pipelines:
- name: ray_query_pipeline
nodes:
- name: ESRetriever
replicas: 2
inputs: [ Query ]
serve_deployment_kwargs:
num_replicas: 2
version: Twenty
ray_actor_options:
# num_gpus: 0.25 # we have no GPU to test this
num_cpus: 0.5
max_concurrent_queries: 17
- name: Reader
inputs: [ ESRetriever ]