From 1e3edef80354559d824daf79d726e69032ab41b3 Mon Sep 17 00:00:00 2001 From: tstadel <60758086+tstadel@users.noreply.github.com> Date: Tue, 8 Feb 2022 20:35:25 +0100 Subject: [PATCH] List all pipeline(_configs) on Deepset Cloud (#2102) * add list_pipelines_on_deepset_cloud() * Apply Black * refactor auto paging and throw DeepsetCloudErrors * Apply Black * fix mypy findings * Update documentation * Fix merge error on pipelines.md * Update Documentation & Code Style Co-authored-by: Sara Zan Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- docs/_src/api/api/pipelines.md | 42 +++++++- haystack/pipelines/base.py | 43 +++++++- haystack/utils/__init__.py | 2 +- haystack/utils/deepsetcloud.py | 180 +++++++++++++++++++++++++++++---- test/test_document_store.py | 7 +- test/test_pipeline.py | 30 ++++++ 6 files changed, 276 insertions(+), 28 deletions(-) diff --git a/docs/_src/api/api/pipelines.md b/docs/_src/api/api/pipelines.md index 305e18fdc..b8199b99a 100644 --- a/docs/_src/api/api/pipelines.md +++ b/docs/_src/api/api/pipelines.md @@ -82,7 +82,7 @@ variable 'MYDOCSTORE_PARAMS_INDEX=documents-2021' can be set. Note that an ```python @classmethod -def load_from_deepset_cloud(cls, pipeline_config_name: str, pipeline_name: str = "query", workspace: Optional[str] = "default", api_key: Optional[str] = None, api_endpoint: Optional[str] = None, overwrite_with_env_variables: bool = False) +def load_from_deepset_cloud(cls, pipeline_config_name: str, pipeline_name: str = "query", workspace: str = "default", api_key: Optional[str] = None, api_endpoint: Optional[str] = None, overwrite_with_env_variables: bool = False) ``` Load Pipeline from Deepset Cloud defining the individual components and how they're tied together to form @@ -90,9 +90,13 @@ Load Pipeline from Deepset Cloud defining the individual components and how they a Pipeline. A single config can declare multiple Pipelines, in which case an explicit `pipeline_name` must be passed. +In order to get a list of all available pipeline_config_names, call `list_pipelines_on_deepset_cloud()`. +Use the returned `name` as `pipeline_config_name`. + **Arguments**: - `pipeline_config_name`: name of the config file inside the Deepset Cloud workspace. +To get a list of all available pipeline_config_names, call `list_pipelines_on_deepset_cloud()`. - `pipeline_name`: specifies which pipeline to load from config. Deepset Cloud typically provides a 'query' and a 'index' pipeline per config. - `workspace`: workspace in Deepset Cloud @@ -105,6 +109,42 @@ to change return_no_answer param for a FARMReader, an env variable 'READER_PARAMS_RETURN_NO_ANSWER=False' can be set. Note that an `_` sign must be used to specify nested hierarchical properties. + + +#### list\_pipelines\_on\_deepset\_cloud + +```python +@classmethod +def list_pipelines_on_deepset_cloud(cls, workspace: str = "default", api_key: Optional[str] = None, api_endpoint: Optional[str] = None) -> List[dict] +``` + +Lists all pipeline configs available on Deepset Cloud. + +**Arguments**: + +- `workspace`: workspace in Deepset Cloud +- `api_key`: Secret value of the API key. +If not specified, will be read from DEEPSET_CLOUD_API_KEY environment variable. +- `api_endpoint`: The URL of the Deepset Cloud API. +If not specified, will be read from DEEPSET_CLOUD_API_ENDPOINT environment variable. + +Returns: + list of dictionaries: List[dict] + each dictionary: { + "name": str -> `pipeline_config_name` to be used in `load_from_deepset_cloud()`, + "..." -> additional pipeline meta information + } + example: + [{'name': 'my_super_nice_pipeline_config', + 'pipeline_id': '2184e0c1-c6ec-40a1-9b28-5d2768e5efa2', + 'status': 'DEPLOYED', + 'created_at': '2022-02-01T09:57:03.803991+00:00', + 'deleted': False, + 'is_default': False, + 'indexing': {'status': 'IN_PROGRESS', + 'pending_file_count': 3, + 'total_file_count': 31}}] + ## Pipeline diff --git a/haystack/pipelines/base.py b/haystack/pipelines/base.py index cb01db2cc..97191d57d 100644 --- a/haystack/pipelines/base.py +++ b/haystack/pipelines/base.py @@ -126,7 +126,7 @@ class BasePipeline: cls, pipeline_config_name: str, pipeline_name: str = "query", - workspace: Optional[str] = "default", + workspace: str = "default", api_key: Optional[str] = None, api_endpoint: Optional[str] = None, overwrite_with_env_variables: bool = False, @@ -136,7 +136,11 @@ class BasePipeline: a Pipeline. A single config can declare multiple Pipelines, in which case an explicit `pipeline_name` must be passed. + In order to get a list of all available pipeline_config_names, call `list_pipelines_on_deepset_cloud()`. + Use the returned `name` as `pipeline_config_name`. + :param pipeline_config_name: name of the config file inside the Deepset Cloud workspace. + To get a list of all available pipeline_config_names, call `list_pipelines_on_deepset_cloud()`. :param pipeline_name: specifies which pipeline to load from config. Deepset Cloud typically provides a 'query' and a 'index' pipeline per config. :param workspace: workspace in Deepset Cloud @@ -175,6 +179,43 @@ class BasePipeline: ) return pipeline + @classmethod + def list_pipelines_on_deepset_cloud( + cls, + workspace: str = "default", + api_key: Optional[str] = None, + api_endpoint: Optional[str] = None, + ) -> List[dict]: + """ + Lists all pipeline configs available on Deepset Cloud. + + :param workspace: workspace in Deepset Cloud + :param api_key: Secret value of the API key. + If not specified, will be read from DEEPSET_CLOUD_API_KEY environment variable. + :param api_endpoint: The URL of the Deepset Cloud API. + If not specified, will be read from DEEPSET_CLOUD_API_ENDPOINT environment variable. + + Returns: + list of dictionaries: List[dict] + each dictionary: { + "name": str -> `pipeline_config_name` to be used in `load_from_deepset_cloud()`, + "..." -> additional pipeline meta information + } + example: + [{'name': 'my_super_nice_pipeline_config', + 'pipeline_id': '2184e0c1-c6ec-40a1-9b28-5d2768e5efa2', + 'status': 'DEPLOYED', + 'created_at': '2022-02-01T09:57:03.803991+00:00', + 'deleted': False, + 'is_default': False, + 'indexing': {'status': 'IN_PROGRESS', + 'pending_file_count': 3, + 'total_file_count': 31}}] + """ + client = DeepsetCloud.get_pipeline_client(api_key=api_key, api_endpoint=api_endpoint, workspace=workspace) + pipeline_config_infos = list(client.list_pipeline_configs()) + return pipeline_config_infos + @classmethod def _get_pipeline_definition(cls, pipeline_config: Dict, pipeline_name: Optional[str] = None): """ diff --git a/haystack/utils/__init__.py b/haystack/utils/__init__.py index fab857caa..0b0a65df0 100644 --- a/haystack/utils/__init__.py +++ b/haystack/utils/__init__.py @@ -10,7 +10,7 @@ from haystack.utils.doc_store import ( stop_opensearch, stop_service, ) -from haystack.utils.deepsetcloud import DeepsetCloud +from haystack.utils.deepsetcloud import DeepsetCloud, DeepsetCloudError from haystack.utils.export_utils import ( print_answers, print_documents, diff --git a/haystack/utils/deepsetcloud.py b/haystack/utils/deepsetcloud.py index 106426b5d..fc80fcddc 100644 --- a/haystack/utils/deepsetcloud.py +++ b/haystack/utils/deepsetcloud.py @@ -1,6 +1,12 @@ import logging import os -from typing import Dict, List, Optional +from typing import Dict, Generator, List, Optional + +try: + from typing import Literal +except ImportError: + from typing_extensions import Literal # type: ignore + import requests DEFAULT_API_ENDPOINT = f"DC_API_PLACEHOLDER/v1" # TODO @@ -17,6 +23,10 @@ class BearerAuth(requests.auth.AuthBase): return r +class DeepsetCloudError(Exception): + """Raised when there is an error communicating with Deepset Cloud""" + + class DeepsetCloudClient: def __init__(self, api_key: str = None, api_endpoint: Optional[str] = None): """ @@ -29,26 +39,143 @@ class DeepsetCloudClient: """ self.api_key = api_key or os.getenv("DEEPSET_CLOUD_API_KEY") if self.api_key is None: - raise ValueError( + raise DeepsetCloudError( "No api_key specified. Please set api_key param or DEEPSET_CLOUD_API_KEY environment variable." ) self.api_endpoint = api_endpoint or os.getenv("DEEPSET_CLOUD_API_ENDPOINT", DEFAULT_API_ENDPOINT) - def get(self, url: str, headers: dict = None, query_params: dict = None, raise_on_error: bool = True): - response = requests.get(url=url, auth=BearerAuth(self.api_key), headers=headers, params=query_params) - if raise_on_error and response.status_code > 299: - raise Exception( - f"GET {url} failed: HTTP {response.status_code} - {response.reason}\n{response.content.decode()}" - ) - return response + def get( + self, + url: str, + query_params: dict = None, + headers: dict = None, + stream: bool = False, + raise_on_error: bool = True, + ): + return self._execute_request( + method="GET", + url=url, + query_params=query_params, + headers=headers, + stream=stream, + raise_on_error=raise_on_error, + ) - def post(self, url: str, json: dict = {}, stream: bool = False, headers: dict = None, raise_on_error: bool = True): - json = self._remove_null_values(json) - response = requests.post(url=url, json=json, stream=stream, headers=headers, auth=BearerAuth(self.api_key)) + def get_with_auto_paging( + self, + url: str, + query_params: dict = None, + headers: dict = None, + stream: bool = False, + raise_on_error: bool = True, + auto_paging_page_size: Optional[int] = None, + ) -> Generator: + return self._execute_auto_paging_request( + method="GET", + url=url, + query_params=query_params, + headers=headers, + stream=stream, + raise_on_error=raise_on_error, + auto_paging_page_size=auto_paging_page_size, + ) + + def post( + self, + url: str, + json: dict = {}, + query_params: dict = None, + headers: dict = None, + stream: bool = False, + raise_on_error: bool = True, + ): + return self._execute_request( + method="POST", + url=url, + query_params=query_params, + json=json, + stream=stream, + headers=headers, + raise_on_error=raise_on_error, + ) + + def post_with_auto_paging( + self, + url: str, + json: dict = {}, + query_params: dict = None, + headers: dict = None, + stream: bool = False, + raise_on_error: bool = True, + auto_paging_page_size: Optional[int] = None, + ): + return self._execute_auto_paging_request( + method="POST", + url=url, + query_params=query_params, + json=json, + stream=stream, + headers=headers, + raise_on_error=raise_on_error, + auto_paging_page_size=auto_paging_page_size, + ) + + def _execute_auto_paging_request( + self, + method: Literal["GET", "POST", "PUT", "HEAD"], + url: str, + json: dict = None, + query_params: dict = None, + headers: dict = None, + stream: bool = False, + raise_on_error: bool = True, + auto_paging_page_size: Optional[int] = None, + ) -> Generator: + query_params = query_params.copy() if query_params is not None else {} + if auto_paging_page_size: + query_params["limit"] = auto_paging_page_size + page_number = 1 + has_more = True + while has_more: + query_params["page_number"] = page_number + payload = self._execute_request( + method=method, + url=url, + json=json, + query_params=query_params, + headers=headers, + stream=stream, + raise_on_error=raise_on_error, + ).json() + yield from payload["data"] + has_more = payload["has_more"] + page_number += 1 + + def _execute_request( + self, + method: Literal["GET", "POST", "PUT", "HEAD"], + url: str, + json: dict = None, + query_params: dict = None, + headers: dict = None, + stream: bool = False, + raise_on_error: bool = True, + ): + if json is not None: + json = self._remove_null_values(json) + response = requests.request( + method=method, + url=url, + json=json, + params=query_params, + headers=headers, + auth=BearerAuth(self.api_key), + stream=stream, + ) if raise_on_error and response.status_code > 299: - raise Exception( - f"POST {url} failed: HTTP {response.status_code} - {response.reason}\n{response.content.decode()}" + raise DeepsetCloudError( + f"{method} {url} failed: HTTP {response.status_code} - {response.reason}\n{response.content.decode()}" ) return response @@ -81,7 +208,7 @@ class IndexClient: response = self.client.get(url=index_url, headers=headers) return response.json() except Exception as ie: - raise Exception(f"Could not connect to Deepset Cloud:\n{ie}") from ie + raise DeepsetCloudError(f"Could not connect to Deepset Cloud:\n{ie}") from ie def query( self, @@ -189,17 +316,26 @@ class PipelineClient: ) -> dict: pipeline_url = self._build_pipeline_url(workspace=workspace, pipeline_config_name=pipeline_config_name) pipeline_config_url = f"{pipeline_url}/json" - response = self.client.get(url=pipeline_config_url, headers=headers) - return response.json() + response = self.client.get(url=pipeline_config_url, headers=headers).json() + return response + + def list_pipeline_configs(self, workspace: Optional[str] = None, headers: dict = None) -> Generator: + workspace_url = self._build_workspace_url(workspace) + pipelines_url = f"{workspace_url}/pipelines" + generator = self.client.get_with_auto_paging(url=pipelines_url, headers=headers) + return generator def _build_pipeline_url(self, workspace: Optional[str] = None, pipeline_config_name: Optional[str] = None): - if workspace is None: - workspace = self.workspace if pipeline_config_name is None: pipeline_config_name = self.pipeline_config_name - workspace_url = self.client.build_workspace_url(workspace) + workspace_url = self._build_workspace_url(workspace) return f"{workspace_url}/pipelines/{pipeline_config_name}" + def _build_workspace_url(self, workspace: Optional[str] = None): + if workspace is None: + workspace = self.workspace + return self.client.build_workspace_url(workspace) + class DeepsetCloud: """ @@ -211,7 +347,7 @@ class DeepsetCloud: cls, api_key: Optional[str] = None, api_endpoint: Optional[str] = None, - workspace: Optional[str] = None, + workspace: str = "default", index: Optional[str] = None, ) -> IndexClient: """ @@ -233,7 +369,7 @@ class DeepsetCloud: cls, api_key: Optional[str] = None, api_endpoint: Optional[str] = None, - workspace: Optional[str] = None, + workspace: str = "default", pipeline_config_name: Optional[str] = None, ) -> PipelineClient: """ diff --git a/test/test_document_store.py b/test/test_document_store.py index 46f8a7478..5eb3b49d6 100644 --- a/test/test_document_store.py +++ b/test/test_document_store.py @@ -25,6 +25,7 @@ from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore from haystack.document_stores.faiss import FAISSDocumentStore from haystack.nodes import EmbeddingRetriever from haystack.pipelines import DocumentSearchPipeline +from haystack.utils import DeepsetCloudError DOCUMENTS = [ @@ -1254,7 +1255,7 @@ def test_DeepsetCloudDocumentStore_invalid_token(): ) with pytest.raises( - Exception, + DeepsetCloudError, match=f"Could not connect to Deepset Cloud:\nGET {DC_API_ENDPOINT}/workspaces/default/indexes/{DC_TEST_INDEX} failed: HTTP 500 - Internal Server Error", ): DeepsetCloudDocumentStore(api_endpoint=DC_API_ENDPOINT, api_key="invalid_token", index=DC_TEST_INDEX) @@ -1272,7 +1273,7 @@ def test_DeepsetCloudDocumentStore_invalid_api_endpoint(): ) with pytest.raises( - Exception, + DeepsetCloudError, match=f"Could not connect to Deepset Cloud:\nGET {DC_API_ENDPOINT}00/workspaces/default/indexes/{DC_TEST_INDEX} failed: HTTP 404 - Not Found", ): DeepsetCloudDocumentStore(api_endpoint=f"{DC_API_ENDPOINT}00", api_key=DC_API_KEY, index=DC_TEST_INDEX) @@ -1290,7 +1291,7 @@ def test_DeepsetCloudDocumentStore_invalid_index(): ) with pytest.raises( - Exception, + DeepsetCloudError, match=f"Could not connect to Deepset Cloud:\nGET {DC_API_ENDPOINT}/workspaces/default/indexes/invalid_index failed: HTTP 404 - Not Found", ): DeepsetCloudDocumentStore(api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY, index="invalid_index") diff --git a/test/test_pipeline.py b/test/test_pipeline.py index ebb235510..e128d4558 100644 --- a/test/test_pipeline.py +++ b/test/test_pipeline.py @@ -224,6 +224,36 @@ def test_load_from_deepset_cloud_indexing(): indexing_pipeline.run(file_paths=[SAMPLES_PATH / "docs" / "doc_1.txt"]) +@pytest.mark.usefixtures(deepset_cloud_fixture.__name__) +@responses.activate +def test_list_pipelines_on_deepset_cloud(): + if MOCK_DC: + responses.add( + method=responses.GET, + url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines", + json={ + "data": [ + { + "name": "test_pipeline_config", + "pipeline_id": "2184e0c1-c6ec-40a1-9b28-5d2768e5efa2", + "status": "DEPLOYED", + "created_at": "2022-02-01T09:57:03.803991+00:00", + "deleted": False, + "is_default": False, + "indexing": {"status": "IN_PROGRESS", "pending_file_count": 4, "total_file_count": 33}, + } + ], + "has_more": False, + "total": 1, + }, + status=200, + ) + + pipelines = Pipeline.list_pipelines_on_deepset_cloud(api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY) + assert len(pipelines) == 1 + assert pipelines[0]["name"] == "test_pipeline_config" + + # @pytest.mark.slow # @pytest.mark.elasticsearch # @pytest.mark.parametrize(