diff --git a/docs/_src/api/api/pipelines.md b/docs/_src/api/api/pipelines.md
index 305e18fdc..b8199b99a 100644
--- a/docs/_src/api/api/pipelines.md
+++ b/docs/_src/api/api/pipelines.md
@@ -82,7 +82,7 @@ variable 'MYDOCSTORE_PARAMS_INDEX=documents-2021' can be set. Note that an
```python
@classmethod
-def load_from_deepset_cloud(cls, pipeline_config_name: str, pipeline_name: str = "query", workspace: Optional[str] = "default", api_key: Optional[str] = None, api_endpoint: Optional[str] = None, overwrite_with_env_variables: bool = False)
+def load_from_deepset_cloud(cls, pipeline_config_name: str, pipeline_name: str = "query", workspace: str = "default", api_key: Optional[str] = None, api_endpoint: Optional[str] = None, overwrite_with_env_variables: bool = False)
```
Load Pipeline from Deepset Cloud defining the individual components and how they're tied together to form
@@ -90,9 +90,13 @@ Load Pipeline from Deepset Cloud defining the individual components and how they
a Pipeline. A single config can declare multiple Pipelines, in which case an explicit `pipeline_name` must
be passed.
+In order to get a list of all available pipeline_config_names, call `list_pipelines_on_deepset_cloud()`.
+Use the returned `name` as `pipeline_config_name`.
+
**Arguments**:
- `pipeline_config_name`: name of the config file inside the Deepset Cloud workspace.
+To get a list of all available pipeline_config_names, call `list_pipelines_on_deepset_cloud()`.
- `pipeline_name`: specifies which pipeline to load from config.
Deepset Cloud typically provides a 'query' and a 'index' pipeline per config.
- `workspace`: workspace in Deepset Cloud
@@ -105,6 +109,42 @@ to change return_no_answer param for a FARMReader, an env
variable 'READER_PARAMS_RETURN_NO_ANSWER=False' can be set. Note that an
`_` sign must be used to specify nested hierarchical properties.
+
+
+#### list\_pipelines\_on\_deepset\_cloud
+
+```python
+@classmethod
+def list_pipelines_on_deepset_cloud(cls, workspace: str = "default", api_key: Optional[str] = None, api_endpoint: Optional[str] = None) -> List[dict]
+```
+
+Lists all pipeline configs available on Deepset Cloud.
+
+**Arguments**:
+
+- `workspace`: Name of the Deepset Cloud workspace whose pipeline configs are listed (defaults to "default").
+- `api_key`: Secret value of the API key.
+If not specified, it is read from the DEEPSET_CLOUD_API_KEY environment variable.
+- `api_endpoint`: The URL of the Deepset Cloud API.
+If not specified, it is read from the DEEPSET_CLOUD_API_ENDPOINT environment variable.
+
+**Returns**:
+ List of dictionaries (List[dict]), one per pipeline config.
+ Each dictionary has the following shape: {
+ "name": str -> `pipeline_config_name` to be used in `load_from_deepset_cloud()`,
+ "..." -> additional pipeline meta information
+ }
+ example:
+ [{'name': 'my_super_nice_pipeline_config',
+ 'pipeline_id': '2184e0c1-c6ec-40a1-9b28-5d2768e5efa2',
+ 'status': 'DEPLOYED',
+ 'created_at': '2022-02-01T09:57:03.803991+00:00',
+ 'deleted': False,
+ 'is_default': False,
+ 'indexing': {'status': 'IN_PROGRESS',
+ 'pending_file_count': 3,
+ 'total_file_count': 31}}]
+
## Pipeline
diff --git a/haystack/pipelines/base.py b/haystack/pipelines/base.py
index cb01db2cc..97191d57d 100644
--- a/haystack/pipelines/base.py
+++ b/haystack/pipelines/base.py
@@ -126,7 +126,7 @@ class BasePipeline:
cls,
pipeline_config_name: str,
pipeline_name: str = "query",
- workspace: Optional[str] = "default",
+ workspace: str = "default",
api_key: Optional[str] = None,
api_endpoint: Optional[str] = None,
overwrite_with_env_variables: bool = False,
@@ -136,7 +136,11 @@ class BasePipeline:
a Pipeline. A single config can declare multiple Pipelines, in which case an explicit `pipeline_name` must
be passed.
+ In order to get a list of all available pipeline_config_names, call `list_pipelines_on_deepset_cloud()`.
+ Use the returned `name` as `pipeline_config_name`.
+
:param pipeline_config_name: name of the config file inside the Deepset Cloud workspace.
+ To get a list of all available pipeline_config_names, call `list_pipelines_on_deepset_cloud()`.
:param pipeline_name: specifies which pipeline to load from config.
Deepset Cloud typically provides a 'query' and a 'index' pipeline per config.
:param workspace: workspace in Deepset Cloud
@@ -175,6 +179,43 @@ class BasePipeline:
)
return pipeline
+ @classmethod
+ def list_pipelines_on_deepset_cloud(
+ cls,
+ workspace: str = "default",
+ api_key: Optional[str] = None,
+ api_endpoint: Optional[str] = None,
+ ) -> List[dict]:
+ """
+ Lists all pipeline configs available on Deepset Cloud.
+
+ :param workspace: Name of the Deepset Cloud workspace whose pipeline configs are listed (defaults to "default").
+ :param api_key: Secret value of the API key.
+ If not specified, it is read from the DEEPSET_CLOUD_API_KEY environment variable.
+ :param api_endpoint: The URL of the Deepset Cloud API.
+ If not specified, it is read from the DEEPSET_CLOUD_API_ENDPOINT environment variable.
+
+ :return: List of dictionaries (List[dict]),
+ one per pipeline config in the workspace.
+ Each dictionary has the following shape: {
+ "name": str -> `pipeline_config_name` to be used in `load_from_deepset_cloud()`,
+ "..." -> additional pipeline meta information
+ }
+ example:
+ [{'name': 'my_super_nice_pipeline_config',
+ 'pipeline_id': '2184e0c1-c6ec-40a1-9b28-5d2768e5efa2',
+ 'status': 'DEPLOYED',
+ 'created_at': '2022-02-01T09:57:03.803991+00:00',
+ 'deleted': False,
+ 'is_default': False,
+ 'indexing': {'status': 'IN_PROGRESS',
+ 'pending_file_count': 3,
+ 'total_file_count': 31}}]
+ """
+ client = DeepsetCloud.get_pipeline_client(api_key=api_key, api_endpoint=api_endpoint, workspace=workspace)
+ pipeline_config_infos = list(client.list_pipeline_configs())
+ return pipeline_config_infos
+
@classmethod
def _get_pipeline_definition(cls, pipeline_config: Dict, pipeline_name: Optional[str] = None):
"""
diff --git a/haystack/utils/__init__.py b/haystack/utils/__init__.py
index fab857caa..0b0a65df0 100644
--- a/haystack/utils/__init__.py
+++ b/haystack/utils/__init__.py
@@ -10,7 +10,7 @@ from haystack.utils.doc_store import (
stop_opensearch,
stop_service,
)
-from haystack.utils.deepsetcloud import DeepsetCloud
+from haystack.utils.deepsetcloud import DeepsetCloud, DeepsetCloudError
from haystack.utils.export_utils import (
print_answers,
print_documents,
diff --git a/haystack/utils/deepsetcloud.py b/haystack/utils/deepsetcloud.py
index 106426b5d..fc80fcddc 100644
--- a/haystack/utils/deepsetcloud.py
+++ b/haystack/utils/deepsetcloud.py
@@ -1,6 +1,12 @@
import logging
import os
-from typing import Dict, List, Optional
+from typing import Dict, Generator, List, Optional
+
+try:
+ from typing import Literal
+except ImportError:
+ from typing_extensions import Literal # type: ignore
+
import requests
DEFAULT_API_ENDPOINT = f"DC_API_PLACEHOLDER/v1" # TODO
@@ -17,6 +23,10 @@ class BearerAuth(requests.auth.AuthBase):
return r
+class DeepsetCloudError(Exception):
+ """Raised when a request to Deepset Cloud fails or the client is misconfigured (e.g. no API key is available)."""
+
+
class DeepsetCloudClient:
def __init__(self, api_key: str = None, api_endpoint: Optional[str] = None):
"""
@@ -29,26 +39,143 @@ class DeepsetCloudClient:
"""
self.api_key = api_key or os.getenv("DEEPSET_CLOUD_API_KEY")
if self.api_key is None:
- raise ValueError(
+ raise DeepsetCloudError(
"No api_key specified. Please set api_key param or DEEPSET_CLOUD_API_KEY environment variable."
)
self.api_endpoint = api_endpoint or os.getenv("DEEPSET_CLOUD_API_ENDPOINT", DEFAULT_API_ENDPOINT)
- def get(self, url: str, headers: dict = None, query_params: dict = None, raise_on_error: bool = True):
- response = requests.get(url=url, auth=BearerAuth(self.api_key), headers=headers, params=query_params)
- if raise_on_error and response.status_code > 299:
- raise Exception(
- f"GET {url} failed: HTTP {response.status_code} - {response.reason}\n{response.content.decode()}"
- )
- return response
+ def get(
+ self,
+ url: str,
+ query_params: dict = None,
+ headers: dict = None,
+ stream: bool = False,
+ raise_on_error: bool = True,
+ ):
+ return self._execute_request(
+ method="GET",
+ url=url,
+ query_params=query_params,
+ headers=headers,
+ stream=stream,
+ raise_on_error=raise_on_error,
+ )
- def post(self, url: str, json: dict = {}, stream: bool = False, headers: dict = None, raise_on_error: bool = True):
- json = self._remove_null_values(json)
- response = requests.post(url=url, json=json, stream=stream, headers=headers, auth=BearerAuth(self.api_key))
+ def get_with_auto_paging(
+ self,
+ url: str,
+ query_params: dict = None,
+ headers: dict = None,
+ stream: bool = False,
+ raise_on_error: bool = True,
+ auto_paging_page_size: Optional[int] = None,
+ ) -> Generator:
+ return self._execute_auto_paging_request(
+ method="GET",
+ url=url,
+ query_params=query_params,
+ headers=headers,
+ stream=stream,
+ raise_on_error=raise_on_error,
+ auto_paging_page_size=auto_paging_page_size,
+ )
+
+ def post(
+ self,
+ url: str,
+ json: dict = {},
+ query_params: dict = None,
+ headers: dict = None,
+ stream: bool = False,
+ raise_on_error: bool = True,
+ ):
+ return self._execute_request(
+ method="POST",
+ url=url,
+ query_params=query_params,
+ json=json,
+ stream=stream,
+ headers=headers,
+ raise_on_error=raise_on_error,
+ )
+
+ def post_with_auto_paging(
+ self,
+ url: str,
+ json: dict = {},
+ query_params: dict = None,
+ headers: dict = None,
+ stream: bool = False,
+ raise_on_error: bool = True,
+ auto_paging_page_size: Optional[int] = None,
+ ):
+ return self._execute_auto_paging_request(
+ method="POST",
+ url=url,
+ query_params=query_params,
+ json=json,
+ stream=stream,
+ headers=headers,
+ raise_on_error=raise_on_error,
+ auto_paging_page_size=auto_paging_page_size,
+ )
+
+ def _execute_auto_paging_request(
+ self,
+ method: Literal["GET", "POST", "PUT", "HEAD"],
+ url: str,
+ json: dict = None,
+ query_params: dict = None,
+ headers: dict = None,
+ stream: bool = False,
+ raise_on_error: bool = True,
+ auto_paging_page_size: Optional[int] = None,
+ ) -> Generator:
+ query_params = query_params.copy() if query_params is not None else {}
+ if auto_paging_page_size:
+ query_params["limit"] = auto_paging_page_size
+ page_number = 1
+ has_more = True
+ while has_more:
+ query_params["page_number"] = page_number
+ payload = self._execute_request(
+ method=method,
+ url=url,
+ json=json,
+ query_params=query_params,
+ headers=headers,
+ stream=stream,
+ raise_on_error=raise_on_error,
+ ).json()
+ yield from payload["data"]
+ has_more = payload["has_more"]
+ page_number += 1
+
+ def _execute_request(
+ self,
+ method: Literal["GET", "POST", "PUT", "HEAD"],
+ url: str,
+ json: dict = None,
+ query_params: dict = None,
+ headers: dict = None,
+ stream: bool = False,
+ raise_on_error: bool = True,
+ ):
+ if json is not None:
+ json = self._remove_null_values(json)
+ response = requests.request(
+ method=method,
+ url=url,
+ json=json,
+ params=query_params,
+ headers=headers,
+ auth=BearerAuth(self.api_key),
+ stream=stream,
+ )
if raise_on_error and response.status_code > 299:
- raise Exception(
- f"POST {url} failed: HTTP {response.status_code} - {response.reason}\n{response.content.decode()}"
+ raise DeepsetCloudError(
+ f"{method} {url} failed: HTTP {response.status_code} - {response.reason}\n{response.content.decode()}"
)
return response
@@ -81,7 +208,7 @@ class IndexClient:
response = self.client.get(url=index_url, headers=headers)
return response.json()
except Exception as ie:
- raise Exception(f"Could not connect to Deepset Cloud:\n{ie}") from ie
+ raise DeepsetCloudError(f"Could not connect to Deepset Cloud:\n{ie}") from ie
def query(
self,
@@ -189,17 +316,26 @@ class PipelineClient:
) -> dict:
pipeline_url = self._build_pipeline_url(workspace=workspace, pipeline_config_name=pipeline_config_name)
pipeline_config_url = f"{pipeline_url}/json"
- response = self.client.get(url=pipeline_config_url, headers=headers)
- return response.json()
+ response = self.client.get(url=pipeline_config_url, headers=headers).json()
+ return response
+
+ def list_pipeline_configs(self, workspace: Optional[str] = None, headers: dict = None) -> Generator:
+ workspace_url = self._build_workspace_url(workspace)
+ pipelines_url = f"{workspace_url}/pipelines"
+ generator = self.client.get_with_auto_paging(url=pipelines_url, headers=headers)
+ return generator
def _build_pipeline_url(self, workspace: Optional[str] = None, pipeline_config_name: Optional[str] = None):
- if workspace is None:
- workspace = self.workspace
if pipeline_config_name is None:
pipeline_config_name = self.pipeline_config_name
- workspace_url = self.client.build_workspace_url(workspace)
+ workspace_url = self._build_workspace_url(workspace)
return f"{workspace_url}/pipelines/{pipeline_config_name}"
+ def _build_workspace_url(self, workspace: Optional[str] = None):
+ if workspace is None:
+ workspace = self.workspace
+ return self.client.build_workspace_url(workspace)
+
class DeepsetCloud:
"""
@@ -211,7 +347,7 @@ class DeepsetCloud:
cls,
api_key: Optional[str] = None,
api_endpoint: Optional[str] = None,
- workspace: Optional[str] = None,
+ workspace: str = "default",
index: Optional[str] = None,
) -> IndexClient:
"""
@@ -233,7 +369,7 @@ class DeepsetCloud:
cls,
api_key: Optional[str] = None,
api_endpoint: Optional[str] = None,
- workspace: Optional[str] = None,
+ workspace: str = "default",
pipeline_config_name: Optional[str] = None,
) -> PipelineClient:
"""
diff --git a/test/test_document_store.py b/test/test_document_store.py
index 46f8a7478..5eb3b49d6 100644
--- a/test/test_document_store.py
+++ b/test/test_document_store.py
@@ -25,6 +25,7 @@ from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore
from haystack.document_stores.faiss import FAISSDocumentStore
from haystack.nodes import EmbeddingRetriever
from haystack.pipelines import DocumentSearchPipeline
+from haystack.utils import DeepsetCloudError
DOCUMENTS = [
@@ -1254,7 +1255,7 @@ def test_DeepsetCloudDocumentStore_invalid_token():
)
with pytest.raises(
- Exception,
+ DeepsetCloudError,
match=f"Could not connect to Deepset Cloud:\nGET {DC_API_ENDPOINT}/workspaces/default/indexes/{DC_TEST_INDEX} failed: HTTP 500 - Internal Server Error",
):
DeepsetCloudDocumentStore(api_endpoint=DC_API_ENDPOINT, api_key="invalid_token", index=DC_TEST_INDEX)
@@ -1272,7 +1273,7 @@ def test_DeepsetCloudDocumentStore_invalid_api_endpoint():
)
with pytest.raises(
- Exception,
+ DeepsetCloudError,
match=f"Could not connect to Deepset Cloud:\nGET {DC_API_ENDPOINT}00/workspaces/default/indexes/{DC_TEST_INDEX} failed: HTTP 404 - Not Found",
):
DeepsetCloudDocumentStore(api_endpoint=f"{DC_API_ENDPOINT}00", api_key=DC_API_KEY, index=DC_TEST_INDEX)
@@ -1290,7 +1291,7 @@ def test_DeepsetCloudDocumentStore_invalid_index():
)
with pytest.raises(
- Exception,
+ DeepsetCloudError,
match=f"Could not connect to Deepset Cloud:\nGET {DC_API_ENDPOINT}/workspaces/default/indexes/invalid_index failed: HTTP 404 - Not Found",
):
DeepsetCloudDocumentStore(api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY, index="invalid_index")
diff --git a/test/test_pipeline.py b/test/test_pipeline.py
index ebb235510..e128d4558 100644
--- a/test/test_pipeline.py
+++ b/test/test_pipeline.py
@@ -224,6 +224,36 @@ def test_load_from_deepset_cloud_indexing():
indexing_pipeline.run(file_paths=[SAMPLES_PATH / "docs" / "doc_1.txt"])
+@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
+@responses.activate
+def test_list_pipelines_on_deepset_cloud():
+ if MOCK_DC:
+ responses.add(
+ method=responses.GET,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines",
+ json={
+ "data": [
+ {
+ "name": "test_pipeline_config",
+ "pipeline_id": "2184e0c1-c6ec-40a1-9b28-5d2768e5efa2",
+ "status": "DEPLOYED",
+ "created_at": "2022-02-01T09:57:03.803991+00:00",
+ "deleted": False,
+ "is_default": False,
+ "indexing": {"status": "IN_PROGRESS", "pending_file_count": 4, "total_file_count": 33},
+ }
+ ],
+ "has_more": False,
+ "total": 1,
+ },
+ status=200,
+ )
+
+ pipelines = Pipeline.list_pipelines_on_deepset_cloud(api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY)
+ assert len(pipelines) == 1
+ assert pipelines[0]["name"] == "test_pipeline_config"
+
+
# @pytest.mark.slow
# @pytest.mark.elasticsearch
# @pytest.mark.parametrize(