diff --git a/docs/_src/api/api/pipelines.md b/docs/_src/api/api/pipelines.md index 64dc013b3..38ed5699d 100644 --- a/docs/_src/api/api/pipelines.md +++ b/docs/_src/api/api/pipelines.md @@ -851,7 +851,7 @@ With Ray, you can distribute a Pipeline's components across a cluster of machine Pipeline can be independently scaled. For instance, an extractive QA Pipeline deployment can have three replicas of the Reader and a single replica for the Retriever. This way, you can use your resources more efficiently by horizontally scaling Components. -To set the number of replicas, add `replicas` in the YAML configuration for the node in a pipeline: +To set the number of replicas, add `num_replicas` in the YAML configuration for the node in a pipeline: ```yaml | components: @@ -862,8 +862,9 @@ To set the number of replicas, add `replicas` in the YAML configuration for the | type: RayPipeline | nodes: | - name: ESRetriever - | replicas: 2 # number of replicas to create on the Ray cluster | inputs: [ Query ] + | serve_deployment_kwargs: + | num_replicas: 2 # number of replicas to create on the Ray cluster ``` A Ray Pipeline can only be created with a YAML Pipeline configuration. @@ -933,7 +934,8 @@ Here's a sample configuration: | nodes: | - name: MyESRetriever | inputs: [Query] - | replicas: 2 # number of replicas to create on the Ray cluster + | serve_deployment_kwargs: + | num_replicas: 2 # number of replicas to create on the Ray cluster | - name: MyReader | inputs: [MyESRetriever] ``` diff --git a/haystack/json-schemas/haystack-pipeline-master.schema.json b/haystack/json-schemas/haystack-pipeline-master.schema.json index 9776d870d..a0ed05ffd 100644 --- a/haystack/json-schemas/haystack-pipeline-master.schema.json +++ b/haystack/json-schemas/haystack-pipeline-master.schema.json @@ -239,10 +239,41 @@ "type": "string" } }, - "replicas": { - "title": "replicas", - "description": "How many replicas Ray should create for this node (only for Ray pipelines)", - "type": "integer" + "serve_deployment_kwargs": { + "title": "serve_deployment_kwargs", + "description": "Arguments to be passed to the Ray Serve `deployment()` method (only for Ray pipelines)", + "type": "object", + "properties": { + "num_replicas": { + "description": "How many replicas Ray should create for this node (only for Ray pipelines)", + "type": "integer" + }, + "version": { + "type": "string" + }, + "prev_version": { + "type": "string" + }, + "init_args": { + "type": "array" + }, + "init_kwargs": { + "type": "object" + }, + "router_prefix": { + "type": "string" + }, + "ray_actor_options": { + "type": "object" + }, + "user_config": { + "type": {} + }, + "max_concurrent_queries": { + "type": "integer" + } + }, + "additionalProperties": true } }, "required": [ @@ -285,7 +316,7 @@ "items": { "not": { "required": [ - "replicas" + "serve_deployment_kwargs" ] } } diff --git a/haystack/nodes/_json_schema.py b/haystack/nodes/_json_schema.py index 17de407f2..f16ceaf0c 100644 --- a/haystack/nodes/_json_schema.py +++ b/haystack/nodes/_json_schema.py @@ -289,10 +289,25 @@ def get_json_schema(filename: str, version: str, modules: List[str] = ["haystack "type": "array", "items": {"type": "string"}, }, - "replicas": { - "title": "replicas", - "description": "How many replicas Ray should create for this node (only for Ray pipelines)", - "type": "integer", + "serve_deployment_kwargs": { + "title": "serve_deployment_kwargs", + "description": "Arguments to be passed to the Ray Serve `deployment()` method (only for Ray pipelines)", + "type": "object", + "properties": { + "num_replicas": { + "description": "How many replicas Ray should create for this node (only for Ray pipelines)", + "type": "integer", + }, + "version": {"type": "string"}, + "prev_version": {"type": "string"}, + "init_args": {"type": "array"}, + "init_kwargs": {"type": "object"}, + "router_prefix": {"type": "string"}, + "ray_actor_options": {"type": "object"}, + "user_config": {"type": {}}, + "max_concurrent_queries": {"type": "integer"}, + }, + "additionalProperties": True, }, }, "required": ["name", "inputs"], @@ -315,7 +330,9 @@ def get_json_schema(filename: str, version: str, modules: List[str] = ["haystack "properties": { "pipelines": { "title": "Pipelines", - "items": {"properties": {"nodes": {"items": {"not": {"required": ["replicas"]}}}}}, + "items": { + "properties": {"nodes": {"items": {"not": {"required": ["serve_deployment_kwargs"]}}}} + }, } }, }, diff --git a/haystack/pipelines/config.py b/haystack/pipelines/config.py index ab992d71e..591a36f88 100644 --- a/haystack/pipelines/config.py +++ b/haystack/pipelines/config.py @@ -179,7 +179,12 @@ def build_component_dependency_graph( return graph -def validate_yaml(path: Path, strict_version_check: bool = False, overwrite_with_env_variables: bool = True): +def validate_yaml( + path: Path, + strict_version_check: bool = False, + overwrite_with_env_variables: bool = True, + extras: Optional[str] = None, +): """ Ensures that the given YAML file can be loaded without issues. @@ -197,16 +202,20 @@ def validate_yaml(path: Path, strict_version_check: bool = False, overwrite_with to change index name param for an ElasticsearchDocumentStore, an env variable 'MYDOCSTORE_PARAMS_INDEX=documents-2021' can be set. Note that an `_` sign must be used to specify nested hierarchical properties. + :param extras: which values are allowed in the `extras` field (for example, `ray`). If None, does not allow the `extras` field at all. :return: None if validation is successful :raise: `PipelineConfigError` in case of issues. """ pipeline_config = read_pipeline_config_from_yaml(path) - validate_config(pipeline_config=pipeline_config, strict_version_check=strict_version_check) + validate_config(pipeline_config=pipeline_config, strict_version_check=strict_version_check, extras=extras) logging.debug(f"'{path}' contains valid Haystack pipelines.") def validate_config( - pipeline_config: Dict[str, Any], strict_version_check: bool = False, overwrite_with_env_variables: bool = True + pipeline_config: Dict[str, Any], + strict_version_check: bool = False, + overwrite_with_env_variables: bool = True, + extras: Optional[str] = None, ): """ Ensures that the given YAML file can be loaded without issues. @@ -225,10 +234,11 @@ def validate_config( to change index name param for an ElasticsearchDocumentStore, an env variable 'MYDOCSTORE_PARAMS_INDEX=documents-2021' can be set. Note that an `_` sign must be used to specify nested hierarchical properties. + :param extras: which values are allowed in the `extras` field (for example, `ray`). If None, does not allow the `extras` field at all. :return: None if validation is successful :raise: `PipelineConfigError` in case of issues. """ - validate_schema(pipeline_config=pipeline_config, strict_version_check=strict_version_check) + validate_schema(pipeline_config=pipeline_config, strict_version_check=strict_version_check, extras=extras) for pipeline_definition in pipeline_config["pipelines"]: component_definitions = get_component_definitions( @@ -237,7 +247,7 @@ def validate_config( validate_pipeline_graph(pipeline_definition=pipeline_definition, component_definitions=component_definitions) -def validate_schema(pipeline_config: Dict, strict_version_check: bool = False) -> None: +def validate_schema(pipeline_config: Dict, strict_version_check: bool = False, extras: Optional[str] = None) -> None: """ Check that the YAML abides the JSON schema, so that every block of the pipeline configuration file contains all required information @@ -248,11 +258,20 @@ def validate_schema(pipeline_config: Dict, strict_version_check: bool = False) - :param pipeline_config: the configuration to validate :param strict_version_check: whether to fail in case of a version mismatch (throws a warning otherwise) + :param extras: which values are allowed in the `extras` field (for example, `ray`). If None, does not allow the `extras` field at all. :return: None if validation is successful :raise: `PipelineConfigError` in case of issues. """ validate_config_strings(pipeline_config) + # Check that the extras are respected + extras_in_config = pipeline_config.get("extras", None) + if (not extras and extras_in_config) or (extras and extras_in_config not in extras): + raise PipelineConfigError( + f"Cannot use this class to load a YAML with 'extras: {extras_in_config}'. " + "Use the proper class, for example 'RayPipeline'." + ) + # Check for the version manually (to avoid validation errors) pipeline_version = pipeline_config.get("version", None) diff --git a/haystack/pipelines/ray.py b/haystack/pipelines/ray.py index e326bb8af..a45c64d4d 100644 --- a/haystack/pipelines/ray.py +++ b/haystack/pipelines/ray.py @@ -32,7 +32,7 @@ class RayPipeline(Pipeline): Pipeline can be independently scaled. For instance, an extractive QA Pipeline deployment can have three replicas of the Reader and a single replica for the Retriever. This way, you can use your resources more efficiently by horizontally scaling Components. - To set the number of replicas, add `replicas` in the YAML configuration for the node in a pipeline: + To set the number of replicas, add `num_replicas` in the YAML configuration for the node in a pipeline: ```yaml | components: @@ -43,8 +43,9 @@ class RayPipeline(Pipeline): | type: RayPipeline | nodes: | - name: ESRetriever - | replicas: 2 # number of replicas to create on the Ray cluster | inputs: [ Query ] + | serve_deployment_kwargs: + | num_replicas: 2 # number of replicas to create on the Ray cluster ``` A Ray Pipeline can only be created with a YAML Pipeline configuration. @@ -81,7 +82,7 @@ class RayPipeline(Pipeline): address: Optional[str] = None, ray_args: Optional[Dict[str, Any]] = None, ): - validate_config(pipeline_config, strict_version_check=strict_version_check) + validate_config(pipeline_config, strict_version_check=strict_version_check, extras="ray") pipeline_definition = get_pipeline_definition(pipeline_config=pipeline_config, pipeline_name=pipeline_name) component_definitions = get_component_definitions( @@ -101,8 +102,12 @@ class RayPipeline(Pipeline): name = node_config["name"] component_type = component_definitions[name]["type"] component_class = BaseComponent.get_subclass(component_type) - replicas = next(node for node in pipeline_definition["nodes"] if node["name"] == name).get("replicas", 1) - handle = cls._create_ray_deployment(component_name=name, pipeline_config=pipeline_config, replicas=replicas) + serve_deployment_kwargs = next(node for node in pipeline_definition["nodes"] if node["name"] == name).get( + "serve_deployment_kwargs", {} + ) + handle = cls._create_ray_deployment( + component_name=name, pipeline_config=pipeline_config, serve_deployment_kwargs=serve_deployment_kwargs + ) pipeline._add_ray_deployment_in_graph( handle=handle, name=name, @@ -154,7 +159,8 @@ class RayPipeline(Pipeline): | nodes: | - name: MyESRetriever | inputs: [Query] - | replicas: 2 # number of replicas to create on the Ray cluster + | serve_deployment_kwargs: + | num_replicas: 2 # number of replicas to create on the Ray cluster | - name: MyReader | inputs: [MyESRetriever] ``` @@ -182,16 +188,23 @@ class RayPipeline(Pipeline): ) @classmethod - def _create_ray_deployment(cls, component_name: str, pipeline_config: dict, replicas: int = 1): + def _create_ray_deployment( + cls, component_name: str, pipeline_config: dict, serve_deployment_kwargs: Optional[Dict[str, Any]] = {} + ): """ Create a Ray Deployment for the Component. :param component_name: Class name of the Haystack Component. :param pipeline_config: The Pipeline config YAML parsed as a dict. - :param replicas: By default, a single replica of the component is created. It can be - configured by setting `replicas` parameter in the Pipeline YAML. + :param serve_deployment_kwargs: An optional dictionary of arguments to be supplied to the + `ray.serve.deployment()` method, like `num_replicas`, `ray_actor_options`, + `max_concurrent_queries`, etc. See potential values in the + Ray Serve API docs (https://docs.ray.io/en/latest/serve/package-ref.html) + under the `ray.serve.deployment()` method """ - RayDeployment = serve.deployment(_RayDeploymentWrapper, name=component_name, num_replicas=replicas) # type: ignore + RayDeployment = serve.deployment( + _RayDeploymentWrapper, name=component_name, **serve_deployment_kwargs # type: ignore + ) RayDeployment.deploy(pipeline_config, component_name) handle = RayDeployment.get_handle() return handle diff --git a/test/pipelines/test_pipeline_yaml.py b/test/pipelines/test_pipeline_yaml.py index 353ec48dc..64c9bcaa0 100644 --- a/test/pipelines/test_pipeline_yaml.py +++ b/test/pipelines/test_pipeline_yaml.py @@ -1021,3 +1021,10 @@ def test_save_yaml_overwrite(tmp_path): with open(tmp_path / "saved_pipeline.yml", "r") as saved_yaml: content = saved_yaml.read() assert content != "" + + +def test_load_yaml_ray_args_in_pipeline(tmp_path): + with pytest.raises(PipelineConfigError) as e: + pipeline = Pipeline.load_from_yaml( + SAMPLES_PATH / "pipeline" / "ray.haystack-pipeline.yml", pipeline_name="ray_query_pipeline" + ) diff --git a/test/pipelines/test_ray.py b/test/pipelines/test_ray.py index 5a4171f90..80943a7b8 100644 --- a/test/pipelines/test_ray.py +++ b/test/pipelines/test_ray.py @@ -31,5 +31,7 @@ def test_load_pipeline(document_store_with_docs): assert ray.serve.get_deployment(name="ESRetriever").num_replicas == 2 assert ray.serve.get_deployment(name="Reader").num_replicas == 1 + assert ray.serve.get_deployment(name="ESRetriever").max_concurrent_queries == 17 + assert ray.serve.get_deployment(name="ESRetriever").ray_actor_options["num_cpus"] == 0.5 assert prediction["query"] == "Who lives in Berlin?" assert prediction["answers"][0].answer == "Carla" diff --git a/test/samples/pipeline/ray.haystack-pipeline.yml b/test/samples/pipeline/ray.haystack-pipeline.yml index 8dd4489af..e89ec98de 100644 --- a/test/samples/pipeline/ray.haystack-pipeline.yml +++ b/test/samples/pipeline/ray.haystack-pipeline.yml @@ -40,7 +40,13 @@ pipelines: - name: ray_query_pipeline nodes: - name: ESRetriever - replicas: 2 inputs: [ Query ] + serve_deployment_kwargs: + num_replicas: 2 + version: Twenty + ray_actor_options: + # num_gpus: 0.25 # we have no GPU to test this + num_cpus: 0.5 + max_concurrent_queries: 17 - name: Reader inputs: [ ESRetriever ]