| 
									
										
										
										
											2021-08-02 14:51:24 +02:00
										 |  |  | from pathlib import Path | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import pytest | 
					
						
							| 
									
										
										
										
											2021-08-31 10:14:55 +02:00
										 |  |  | import ray | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-26 18:12:55 +01:00
										 |  |  | from haystack.pipelines import RayPipeline | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-17 10:55:53 +02:00
										 |  |  | from ..conftest import SAMPLES_PATH | 
					
						
							| 
									
										
										
										
											2021-08-02 14:51:24 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-03-15 11:17:26 +01:00
										 |  |  | @pytest.fixture(autouse=True) | 
					
						
							|  |  |  | def shutdown_ray(): | 
					
						
							|  |  |  |     yield | 
					
						
							|  |  |  |     try: | 
					
						
							|  |  |  |         import ray | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-08-03 12:49:03 -04:00
										 |  |  |         ray.serve.shutdown() | 
					
						
							| 
									
										
										
										
											2022-03-15 11:17:26 +01:00
										 |  |  |         ray.shutdown() | 
					
						
							|  |  |  |     except: | 
					
						
							|  |  |  |         pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @pytest.mark.integration | 
					
						
							| 
									
										
										
										
											2021-08-02 14:51:24 +02:00
										 |  |  | @pytest.mark.parametrize("document_store_with_docs", ["elasticsearch"], indirect=True) | 
					
						
							| 
									
										
										
										
											2022-08-03 12:49:03 -04:00
										 |  |  | @pytest.mark.parametrize("serve_detached", [True, False]) | 
					
						
							|  |  |  | def test_load_pipeline(document_store_with_docs, serve_detached): | 
					
						
							| 
									
										
										
										
											2021-08-02 14:51:24 +02:00
										 |  |  |     pipeline = RayPipeline.load_from_yaml( | 
					
						
							| 
									
										
										
										
											2022-08-11 03:50:14 -04:00
										 |  |  |         SAMPLES_PATH / "pipeline" / "ray.simple.haystack-pipeline.yml", | 
					
						
							| 
									
										
										
										
											2022-05-04 17:39:06 +02:00
										 |  |  |         pipeline_name="ray_query_pipeline", | 
					
						
							|  |  |  |         ray_args={"num_cpus": 8}, | 
					
						
							| 
									
										
										
										
											2022-08-03 12:49:03 -04:00
										 |  |  |         serve_args={"detached": serve_detached}, | 
					
						
							| 
									
										
										
										
											2021-08-02 14:51:24 +02:00
										 |  |  |     ) | 
					
						
							| 
									
										
										
										
											2022-08-11 03:50:14 -04:00
										 |  |  |     prediction = pipeline.run( | 
					
						
							|  |  |  |         query="Who lives in Berlin?", params={"ESRetriever": {"top_k": 10}, "Reader": {"top_k": 3}} | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2021-08-31 10:14:55 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-08-03 12:49:03 -04:00
										 |  |  |     assert pipeline._serve_controller_client._detached == serve_detached | 
					
						
							| 
									
										
										
										
											2021-08-31 10:14:55 +02:00
										 |  |  |     assert ray.serve.get_deployment(name="ESRetriever").num_replicas == 2 | 
					
						
							|  |  |  |     assert ray.serve.get_deployment(name="Reader").num_replicas == 1 | 
					
						
							| 
									
										
										
										
											2022-08-03 10:38:22 -04:00
										 |  |  |     assert ray.serve.get_deployment(name="ESRetriever").max_concurrent_queries == 17 | 
					
						
							|  |  |  |     assert ray.serve.get_deployment(name="ESRetriever").ray_actor_options["num_cpus"] == 0.5 | 
					
						
							| 
									
										
										
										
											2021-08-02 14:51:24 +02:00
										 |  |  |     assert prediction["query"] == "Who lives in Berlin?" | 
					
						
							| 
									
										
										
										
											2021-10-13 14:23:23 +02:00
										 |  |  |     assert prediction["answers"][0].answer == "Carla" | 
					
						
							| 
									
										
										
										
											2022-08-11 03:50:14 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @pytest.mark.integration | 
					
						
							|  |  |  | @pytest.mark.parametrize("document_store_with_docs", ["elasticsearch"], indirect=True) | 
					
						
							|  |  |  | def test_load_advanced_pipeline(document_store_with_docs): | 
					
						
							|  |  |  |     pipeline = RayPipeline.load_from_yaml( | 
					
						
							|  |  |  |         SAMPLES_PATH / "pipeline" / "ray.advanced.haystack-pipeline.yml", | 
					
						
							|  |  |  |         pipeline_name="ray_query_pipeline", | 
					
						
							|  |  |  |         ray_args={"num_cpus": 8}, | 
					
						
							|  |  |  |         serve_args={"detached": True}, | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     prediction = pipeline.run( | 
					
						
							|  |  |  |         query="Who lives in Berlin?", | 
					
						
							|  |  |  |         params={"ESRetriever1": {"top_k": 1}, "ESRetriever2": {"top_k": 2}, "Reader": {"top_k": 3}}, | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     assert pipeline._serve_controller_client._detached is True | 
					
						
							|  |  |  |     assert ray.serve.get_deployment(name="ESRetriever1").num_replicas == 2 | 
					
						
							|  |  |  |     assert ray.serve.get_deployment(name="ESRetriever2").num_replicas == 2 | 
					
						
							|  |  |  |     assert ray.serve.get_deployment(name="Reader").num_replicas == 1 | 
					
						
							|  |  |  |     assert ray.serve.get_deployment(name="ESRetriever1").max_concurrent_queries == 17 | 
					
						
							|  |  |  |     assert ray.serve.get_deployment(name="ESRetriever2").max_concurrent_queries == 15 | 
					
						
							|  |  |  |     assert ray.serve.get_deployment(name="ESRetriever1").ray_actor_options["num_cpus"] == 0.25 | 
					
						
							|  |  |  |     assert ray.serve.get_deployment(name="ESRetriever2").ray_actor_options["num_cpus"] == 0.25 | 
					
						
							|  |  |  |     assert prediction["query"] == "Who lives in Berlin?" | 
					
						
							|  |  |  |     assert prediction["answers"][0].answer == "Carla" | 
					
						
							|  |  |  |     assert len(prediction["answers"]) > 1 |