List all pipeline(_configs) on Deepset Cloud (#2102)

* add list_pipelines_on_deepset_cloud()

* Apply Black

* refactor auto paging and throw DeepsetCloudErrors

* Apply Black

* fix mypy findings

* Update documentation

* Fix merge error on pipelines.md

* Update Documentation & Code Style

Co-authored-by: Sara Zan <sara.zanzottera@deepset.ai>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
This commit is contained in:
tstadel 2022-02-08 20:35:25 +01:00 committed by GitHub
parent ffbba90323
commit 1e3edef803
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 276 additions and 28 deletions

View File

@ -82,7 +82,7 @@ variable 'MYDOCSTORE_PARAMS_INDEX=documents-2021' can be set. Note that an
```python
@classmethod
def load_from_deepset_cloud(cls, pipeline_config_name: str, pipeline_name: str = "query", workspace: str = "default", api_key: Optional[str] = None, api_endpoint: Optional[str] = None, overwrite_with_env_variables: bool = False)
```
Load Pipeline from Deepset Cloud defining the individual components and how they're tied together to form
@ -90,9 +90,13 @@ Load Pipeline from Deepset Cloud defining the individual components and how they
a Pipeline. A single config can declare multiple Pipelines, in which case an explicit `pipeline_name` must
be passed.
In order to get a list of all available pipeline_config_names, call `list_pipelines_on_deepset_cloud()`.
Use the returned `name` as `pipeline_config_name`.
**Arguments**:
- `pipeline_config_name`: name of the config file inside the Deepset Cloud workspace.
To get a list of all available pipeline_config_names, call `list_pipelines_on_deepset_cloud()`.
- `pipeline_name`: specifies which pipeline to load from config.
Deepset Cloud typically provides a 'query' and an 'index' pipeline per config.
- `workspace`: workspace in Deepset Cloud
@ -105,6 +109,42 @@ to change return_no_answer param for a FARMReader, an env
variable 'READER_PARAMS_RETURN_NO_ANSWER=False' can be set. Note that an
`_` sign must be used to specify nested hierarchical properties.
<a id="base.BasePipeline.list_pipelines_on_deepset_cloud"></a>
#### list\_pipelines\_on\_deepset\_cloud
```python
@classmethod
def list_pipelines_on_deepset_cloud(cls, workspace: str = "default", api_key: Optional[str] = None, api_endpoint: Optional[str] = None) -> List[dict]
```
Lists all pipeline configs available on Deepset Cloud.
**Arguments**:
- `workspace`: workspace in Deepset Cloud
- `api_key`: Secret value of the API key.
If not specified, will be read from DEEPSET_CLOUD_API_KEY environment variable.
- `api_endpoint`: The URL of the Deepset Cloud API.
If not specified, will be read from DEEPSET_CLOUD_API_ENDPOINT environment variable.
Returns:
list of dictionaries: List[dict]
each dictionary: {
"name": str -> `pipeline_config_name` to be used in `load_from_deepset_cloud()`,
"..." -> additional pipeline meta information
}
example:
[{'name': 'my_super_nice_pipeline_config',
'pipeline_id': '2184e0c1-c6ec-40a1-9b28-5d2768e5efa2',
'status': 'DEPLOYED',
'created_at': '2022-02-01T09:57:03.803991+00:00',
'deleted': False,
'is_default': False,
'indexing': {'status': 'IN_PROGRESS',
'pending_file_count': 3,
'total_file_count': 31}}]
<a id="base.Pipeline"></a>
## Pipeline

View File

@ -126,7 +126,7 @@ class BasePipeline:
cls,
pipeline_config_name: str,
pipeline_name: str = "query",
workspace: Optional[str] = "default",
workspace: str = "default",
api_key: Optional[str] = None,
api_endpoint: Optional[str] = None,
overwrite_with_env_variables: bool = False,
@ -136,7 +136,11 @@ class BasePipeline:
a Pipeline. A single config can declare multiple Pipelines, in which case an explicit `pipeline_name` must
be passed.
In order to get a list of all available pipeline_config_names, call `list_pipelines_on_deepset_cloud()`.
Use the returned `name` as `pipeline_config_name`.
:param pipeline_config_name: name of the config file inside the Deepset Cloud workspace.
To get a list of all available pipeline_config_names, call `list_pipelines_on_deepset_cloud()`.
:param pipeline_name: specifies which pipeline to load from config.
Deepset Cloud typically provides a 'query' and a 'index' pipeline per config.
:param workspace: workspace in Deepset Cloud
@ -175,6 +179,43 @@ class BasePipeline:
)
return pipeline
@classmethod
def list_pipelines_on_deepset_cloud(
    cls,
    workspace: str = "default",
    api_key: Optional[str] = None,
    api_endpoint: Optional[str] = None,
) -> List[dict]:
    """
    List all pipeline configs available on Deepset Cloud.

    :param workspace: workspace in Deepset Cloud
    :param api_key: Secret value of the API key.
                    If not specified, will be read from DEEPSET_CLOUD_API_KEY environment variable.
    :param api_endpoint: The URL of the Deepset Cloud API.
                         If not specified, will be read from DEEPSET_CLOUD_API_ENDPOINT environment variable.

    Returns:
        A list of dicts, one per pipeline config. Each dict carries the
        "name" to be used as `pipeline_config_name` in `load_from_deepset_cloud()`
        plus additional pipeline meta information, e.g.:

        [{'name': 'my_super_nice_pipeline_config',
          'pipeline_id': '2184e0c1-c6ec-40a1-9b28-5d2768e5efa2',
          'status': 'DEPLOYED',
          'created_at': '2022-02-01T09:57:03.803991+00:00',
          'deleted': False,
          'is_default': False,
          'indexing': {'status': 'IN_PROGRESS',
                       'pending_file_count': 3,
                       'total_file_count': 31}}]
    """
    pipeline_client = DeepsetCloud.get_pipeline_client(api_key=api_key, api_endpoint=api_endpoint, workspace=workspace)
    # Materialize the auto-paging generator into a plain list for the caller.
    return list(pipeline_client.list_pipeline_configs())
@classmethod
def _get_pipeline_definition(cls, pipeline_config: Dict, pipeline_name: Optional[str] = None):
"""

View File

@ -10,7 +10,7 @@ from haystack.utils.doc_store import (
stop_opensearch,
stop_service,
)
from haystack.utils.deepsetcloud import DeepsetCloud
from haystack.utils.deepsetcloud import DeepsetCloud, DeepsetCloudError
from haystack.utils.export_utils import (
print_answers,
print_documents,

View File

@ -1,6 +1,12 @@
import logging
import os
from typing import Dict, List, Optional
from typing import Dict, Generator, List, Optional
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal # type: ignore
import requests
DEFAULT_API_ENDPOINT = f"DC_API_PLACEHOLDER/v1" # TODO
@ -17,6 +23,10 @@ class BearerAuth(requests.auth.AuthBase):
return r
class DeepsetCloudError(Exception):
    """Raised when there is an error communicating with Deepset Cloud (e.g. failed requests or bad responses)."""
class DeepsetCloudClient:
def __init__(self, api_key: str = None, api_endpoint: Optional[str] = None):
"""
@ -29,26 +39,143 @@ class DeepsetCloudClient:
"""
self.api_key = api_key or os.getenv("DEEPSET_CLOUD_API_KEY")
if self.api_key is None:
raise ValueError(
raise DeepsetCloudError(
"No api_key specified. Please set api_key param or DEEPSET_CLOUD_API_KEY environment variable."
)
self.api_endpoint = api_endpoint or os.getenv("DEEPSET_CLOUD_API_ENDPOINT", DEFAULT_API_ENDPOINT)
def get(
    self,
    url: str,
    query_params: dict = None,
    headers: dict = None,
    stream: bool = False,
    raise_on_error: bool = True,
):
    """
    Perform an authenticated GET request against the Deepset Cloud API.

    :param url: fully qualified URL to request
    :param query_params: optional query parameters to append to the URL
    :param headers: optional HTTP headers to send
    :param stream: whether to stream the response body
    :param raise_on_error: if True, raise DeepsetCloudError for HTTP status codes > 299
    """
    # Delegate to the shared request helper so error handling stays uniform
    # across all HTTP verbs.
    return self._execute_request(
        method="GET",
        url=url,
        query_params=query_params,
        headers=headers,
        stream=stream,
        raise_on_error=raise_on_error,
    )
def get_with_auto_paging(
    self,
    url: str,
    query_params: dict = None,
    headers: dict = None,
    stream: bool = False,
    raise_on_error: bool = True,
    auto_paging_page_size: Optional[int] = None,
) -> Generator:
    """
    Perform paginated GET requests, transparently following pages and yielding
    every item of each page's "data" array.

    :param url: fully qualified URL to request
    :param query_params: optional query parameters (page_number/limit are added automatically)
    :param headers: optional HTTP headers to send
    :param stream: whether to stream the response bodies
    :param raise_on_error: if True, raise DeepsetCloudError for HTTP status codes > 299
    :param auto_paging_page_size: optional page size to request per call
    """
    return self._execute_auto_paging_request(
        method="GET",
        url=url,
        query_params=query_params,
        headers=headers,
        stream=stream,
        raise_on_error=raise_on_error,
        auto_paging_page_size=auto_paging_page_size,
    )
def post(
    self,
    url: str,
    json: dict = {},
    query_params: dict = None,
    headers: dict = None,
    stream: bool = False,
    raise_on_error: bool = True,
):
    """
    Perform an authenticated POST request against the Deepset Cloud API.

    :param url: fully qualified URL to request
    :param json: JSON payload to send in the request body
    :param query_params: optional query parameters to append to the URL
    :param headers: optional HTTP headers to send
    :param stream: whether to stream the response body
    :param raise_on_error: if True, raise DeepsetCloudError for HTTP status codes > 299
    """
    # Collect everything into one kwargs dict and hand it to the shared helper.
    request_kwargs = dict(
        method="POST",
        url=url,
        query_params=query_params,
        json=json,
        stream=stream,
        headers=headers,
        raise_on_error=raise_on_error,
    )
    return self._execute_request(**request_kwargs)
def post_with_auto_paging(
    self,
    url: str,
    json: dict = {},
    query_params: dict = None,
    headers: dict = None,
    stream: bool = False,
    raise_on_error: bool = True,
    auto_paging_page_size: Optional[int] = None,
):
    """
    Perform paginated POST requests, transparently following pages and yielding
    every item of each page's "data" array.

    :param url: fully qualified URL to request
    :param json: JSON payload to send in each request body
    :param query_params: optional query parameters (page_number/limit are added automatically)
    :param headers: optional HTTP headers to send
    :param stream: whether to stream the response bodies
    :param raise_on_error: if True, raise DeepsetCloudError for HTTP status codes > 299
    :param auto_paging_page_size: optional page size to request per call
    """
    paging_kwargs = dict(
        method="POST",
        url=url,
        query_params=query_params,
        json=json,
        stream=stream,
        headers=headers,
        raise_on_error=raise_on_error,
        auto_paging_page_size=auto_paging_page_size,
    )
    return self._execute_auto_paging_request(**paging_kwargs)
def _execute_auto_paging_request(
    self,
    method: Literal["GET", "POST", "PUT", "HEAD"],
    url: str,
    json: dict = None,
    query_params: dict = None,
    headers: dict = None,
    stream: bool = False,
    raise_on_error: bool = True,
    auto_paging_page_size: Optional[int] = None,
) -> Generator:
    """
    Generator that repeatedly issues the given request with an increasing
    `page_number` query parameter, yielding each element of every page's
    "data" array until the API reports no more pages via "has_more".

    :param method: HTTP method to use for each page request
    :param url: fully qualified URL to request
    :param json: optional JSON payload to send with each request
    :param query_params: optional base query parameters (copied, not mutated)
    :param headers: optional HTTP headers to send
    :param stream: whether to stream the response bodies
    :param raise_on_error: if True, raise DeepsetCloudError for HTTP status codes > 299
    :param auto_paging_page_size: if set, sent as the `limit` query parameter
    """
    # Work on a copy so the caller's dict is never mutated.
    params = dict(query_params) if query_params is not None else {}
    if auto_paging_page_size:
        params["limit"] = auto_paging_page_size
    page = 1
    while True:
        params["page_number"] = page
        response = self._execute_request(
            method=method,
            url=url,
            json=json,
            query_params=params,
            headers=headers,
            stream=stream,
            raise_on_error=raise_on_error,
        )
        payload = response.json()
        for item in payload["data"]:
            yield item
        if not payload["has_more"]:
            break
        page += 1
def _execute_request(
    self,
    method: Literal["GET", "POST", "PUT", "HEAD"],
    url: str,
    json: dict = None,
    query_params: dict = None,
    headers: dict = None,
    stream: bool = False,
    raise_on_error: bool = True,
):
    """
    Send a single HTTP request with bearer-token authentication.

    :param method: HTTP method to use
    :param url: fully qualified URL to request
    :param json: optional JSON payload; null-valued entries are stripped before sending
    :param query_params: optional query parameters to append to the URL
    :param headers: optional HTTP headers to send
    :param stream: whether to stream the response body
    :param raise_on_error: if True, raise DeepsetCloudError for HTTP status codes > 299
    :raises DeepsetCloudError: on HTTP status codes > 299 when raise_on_error is True
    """
    if json is not None:
        json = self._remove_null_values(json)
    response = requests.request(
        method=method,
        url=url,
        json=json,
        params=query_params,
        headers=headers,
        auth=BearerAuth(self.api_key),
        stream=stream,
    )
    if raise_on_error and response.status_code > 299:
        raise DeepsetCloudError(
            f"{method} {url} failed: HTTP {response.status_code} - {response.reason}\n{response.content.decode()}"
        )
    return response
@ -81,7 +208,7 @@ class IndexClient:
response = self.client.get(url=index_url, headers=headers)
return response.json()
except Exception as ie:
raise Exception(f"Could not connect to Deepset Cloud:\n{ie}") from ie
raise DeepsetCloudError(f"Could not connect to Deepset Cloud:\n{ie}") from ie
def query(
self,
@ -189,17 +316,26 @@ class PipelineClient:
) -> dict:
pipeline_url = self._build_pipeline_url(workspace=workspace, pipeline_config_name=pipeline_config_name)
pipeline_config_url = f"{pipeline_url}/json"
response = self.client.get(url=pipeline_config_url, headers=headers)
return response.json()
response = self.client.get(url=pipeline_config_url, headers=headers).json()
return response
def list_pipeline_configs(self, workspace: Optional[str] = None, headers: dict = None) -> Generator:
    """
    Yield all pipeline configs in the given workspace, transparently following pagination.

    :param workspace: workspace in Deepset Cloud; defaults to the client's configured workspace
    :param headers: optional HTTP headers to send
    """
    pipelines_url = f"{self._build_workspace_url(workspace)}/pipelines"
    return self.client.get_with_auto_paging(url=pipelines_url, headers=headers)
def _build_pipeline_url(self, workspace: Optional[str] = None, pipeline_config_name: Optional[str] = None):
    """
    Build the API URL for a specific pipeline config.

    :param workspace: workspace in Deepset Cloud; defaults to the client's configured workspace
    :param pipeline_config_name: defaults to the client's configured pipeline_config_name
    """
    if workspace is None:
        workspace = self.workspace
    if pipeline_config_name is None:
        pipeline_config_name = self.pipeline_config_name
    # Reuse the workspace-URL helper so workspace defaulting lives in one place.
    workspace_url = self._build_workspace_url(workspace)
    return f"{workspace_url}/pipelines/{pipeline_config_name}"
def _build_workspace_url(self, workspace: Optional[str] = None):
    """
    Build the API URL for a workspace, falling back to the client's configured
    workspace when none is given.

    :param workspace: workspace in Deepset Cloud; defaults to the client's configured workspace
    """
    target_workspace = self.workspace if workspace is None else workspace
    return self.client.build_workspace_url(target_workspace)
class DeepsetCloud:
"""
@ -211,7 +347,7 @@ class DeepsetCloud:
cls,
api_key: Optional[str] = None,
api_endpoint: Optional[str] = None,
workspace: Optional[str] = None,
workspace: str = "default",
index: Optional[str] = None,
) -> IndexClient:
"""
@ -233,7 +369,7 @@ class DeepsetCloud:
cls,
api_key: Optional[str] = None,
api_endpoint: Optional[str] = None,
workspace: Optional[str] = None,
workspace: str = "default",
pipeline_config_name: Optional[str] = None,
) -> PipelineClient:
"""

View File

@ -25,6 +25,7 @@ from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore
from haystack.document_stores.faiss import FAISSDocumentStore
from haystack.nodes import EmbeddingRetriever
from haystack.pipelines import DocumentSearchPipeline
from haystack.utils import DeepsetCloudError
DOCUMENTS = [
@ -1254,7 +1255,7 @@ def test_DeepsetCloudDocumentStore_invalid_token():
)
with pytest.raises(
Exception,
DeepsetCloudError,
match=f"Could not connect to Deepset Cloud:\nGET {DC_API_ENDPOINT}/workspaces/default/indexes/{DC_TEST_INDEX} failed: HTTP 500 - Internal Server Error",
):
DeepsetCloudDocumentStore(api_endpoint=DC_API_ENDPOINT, api_key="invalid_token", index=DC_TEST_INDEX)
@ -1272,7 +1273,7 @@ def test_DeepsetCloudDocumentStore_invalid_api_endpoint():
)
with pytest.raises(
Exception,
DeepsetCloudError,
match=f"Could not connect to Deepset Cloud:\nGET {DC_API_ENDPOINT}00/workspaces/default/indexes/{DC_TEST_INDEX} failed: HTTP 404 - Not Found",
):
DeepsetCloudDocumentStore(api_endpoint=f"{DC_API_ENDPOINT}00", api_key=DC_API_KEY, index=DC_TEST_INDEX)
@ -1290,7 +1291,7 @@ def test_DeepsetCloudDocumentStore_invalid_index():
)
with pytest.raises(
Exception,
DeepsetCloudError,
match=f"Could not connect to Deepset Cloud:\nGET {DC_API_ENDPOINT}/workspaces/default/indexes/invalid_index failed: HTTP 404 - Not Found",
):
DeepsetCloudDocumentStore(api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY, index="invalid_index")

View File

@ -224,6 +224,36 @@ def test_load_from_deepset_cloud_indexing():
indexing_pipeline.run(file_paths=[SAMPLES_PATH / "docs" / "doc_1.txt"])
@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
@responses.activate
def test_list_pipelines_on_deepset_cloud():
    # list_pipelines_on_deepset_cloud() should return the pipeline configs
    # reported by the (mocked) Deepset Cloud API.
    if MOCK_DC:
        mock_payload = {
            "data": [
                {
                    "name": "test_pipeline_config",
                    "pipeline_id": "2184e0c1-c6ec-40a1-9b28-5d2768e5efa2",
                    "status": "DEPLOYED",
                    "created_at": "2022-02-01T09:57:03.803991+00:00",
                    "deleted": False,
                    "is_default": False,
                    "indexing": {"status": "IN_PROGRESS", "pending_file_count": 4, "total_file_count": 33},
                }
            ],
            "has_more": False,
            "total": 1,
        }
        responses.add(
            method=responses.GET,
            url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines",
            json=mock_payload,
            status=200,
        )

    pipelines = Pipeline.list_pipelines_on_deepset_cloud(api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY)

    assert len(pipelines) == 1
    assert pipelines[0]["name"] == "test_pipeline_config"
# @pytest.mark.slow
# @pytest.mark.elasticsearch
# @pytest.mark.parametrize(