Upload files to deepset Cloud (#2570)

* added upload_files

* Update Documentation & Code Style

* expose file client via DeepsetCloud facade

* Update Documentation & Code Style

* tests added

* Update Documentation & Code Style

* always read file in binary mode and guess mimetype

* add delete and list functions

* fix method literals

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
This commit is contained in:
tstadel 2022-05-23 17:05:56 +02:00 committed by GitHub
parent 0e83535108
commit 3ab4dac58d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 246 additions and 3 deletions

View File

@ -1,3 +1,6 @@
import json
from mimetypes import guess_type
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Tuple, Union from typing import Any, Dict, Generator, List, Optional, Tuple, Union
try: try:
@ -131,6 +134,7 @@ class DeepsetCloudClient:
query_params: dict = None, query_params: dict = None,
headers: dict = None, headers: dict = None,
stream: bool = False, stream: bool = False,
files: Any = None,
raise_on_error: bool = True, raise_on_error: bool = True,
): ):
return self._execute_request( return self._execute_request(
@ -140,6 +144,7 @@ class DeepsetCloudClient:
json=json, json=json,
data=data, data=data,
stream=stream, stream=stream,
files=files,
headers=headers, headers=headers,
raise_on_error=raise_on_error, raise_on_error=raise_on_error,
) )
@ -211,9 +216,26 @@ class DeepsetCloudClient:
auto_paging_page_size=auto_paging_page_size, auto_paging_page_size=auto_paging_page_size,
) )
def delete(
    self,
    url: str,
    query_params: dict = None,
    headers: dict = None,
    stream: bool = False,
    raise_on_error: bool = True,
):
    """
    Execute a DELETE request against the deepset Cloud API.

    :param url: URL to call.
    :param query_params: query parameters to append to the URL.
    :param headers: additional HTTP headers to send.
    :param stream: whether to stream the response body.
    :param raise_on_error: whether to raise on HTTP error status codes.
    """
    # Thin wrapper: delegate to the shared request executor with method DELETE.
    return self._execute_request(
        url=url,
        method="DELETE",
        query_params=query_params,
        stream=stream,
        headers=headers,
        raise_on_error=raise_on_error,
    )
def _execute_auto_paging_request( def _execute_auto_paging_request(
self, self,
method: Literal["GET", "POST", "PUT", "HEAD"], method: Literal["GET", "POST", "PUT", "HEAD", "DELETE"],
url: str, url: str,
json: dict = None, json: dict = None,
data: Any = None, data: Any = None,
@ -246,13 +268,14 @@ class DeepsetCloudClient:
def _execute_request( def _execute_request(
self, self,
method: Literal["GET", "POST", "PUT", "HEAD"], method: Literal["GET", "POST", "PUT", "HEAD", "DELETE"],
url: str, url: str,
json: dict = None, json: dict = None,
data: Any = None, data: Any = None,
query_params: dict = None, query_params: dict = None,
headers: dict = None, headers: dict = None,
stream: bool = False, stream: bool = False,
files: Any = None,
raise_on_error: bool = True, raise_on_error: bool = True,
): ):
if json is not None: if json is not None:
@ -266,6 +289,7 @@ class DeepsetCloudClient:
headers=headers, headers=headers,
auth=BearerAuth(self.api_key), auth=BearerAuth(self.api_key),
stream=stream, stream=stream,
files=files,
) )
if raise_on_error and response.status_code > 299: if raise_on_error and response.status_code > 299:
raise DeepsetCloudError( raise DeepsetCloudError(
@ -789,6 +813,73 @@ class EvaluationSetClient:
return self.client.build_workspace_url(workspace) return self.client.build_workspace_url(workspace)
class FileClient:
    def __init__(self, client: DeepsetCloudClient, workspace: Optional[str] = None):
        """
        A client to manage files on deepset Cloud.

        :param client: deepset Cloud client
        :param workspace: workspace in deepset Cloud
        """
        self.client = client
        self.workspace = workspace

    def upload_files(
        self,
        file_paths: List[Path],
        metas: Optional[List[Dict]] = None,
        workspace: Optional[str] = None,
        headers: Optional[dict] = None,
    ):
        """
        Upload files to a workspace on deepset Cloud.

        Files that fail to upload are logged and skipped; the upload continues
        with the remaining files.

        :param file_paths: paths of the files to upload
        :param metas: metadata dicts, one per file; must match ``file_paths`` in length.
                      Defaults to an empty dict per file.
        :param workspace: workspace to upload to; falls back to the client's default workspace
        :param headers: additional HTTP headers to send with each request
        :raises ValueError: if ``metas`` is given but its length differs from ``file_paths``
        """
        workspace_url = self._build_workspace_url(workspace)
        files_url = f"{workspace_url}/files"
        if metas is None:
            metas = [{} for _ in file_paths]
        elif len(metas) != len(file_paths):
            # Previously a length mismatch silently dropped files via zip(); fail loudly instead.
            raise ValueError("Number of metas does not match number of file_paths.")

        file_ids = []
        for file_path, meta in zip(file_paths, metas):
            try:
                # guess_type returns a (type, encoding) tuple; only the first element
                # is the MIME type expected in the multipart file triple.
                mime_type = guess_type(str(file_path))[0]
                with open(file_path, "rb") as file:
                    response_file_upload = self.client.post(
                        url=files_url,
                        files={"file": (file_path.name, file, mime_type)},
                        data={"meta": json.dumps(meta)},
                        headers=headers,
                    )
                file_id = response_file_upload.json().get("file_id")
                file_ids.append(file_id)
            except Exception:
                # Best effort: log the failure (with traceback) and continue with the next file.
                logger.exception("Error uploading file %s", file_path)

        logger.info("Successfully uploaded %s files.", len(file_ids))

    def delete_file(self, file_id: str, workspace: Optional[str] = None, headers: Optional[dict] = None):
        """
        Delete a file from a workspace on deepset Cloud.

        :param file_id: id of the file to delete
        :param workspace: workspace to delete from; falls back to the client's default workspace
        :param headers: additional HTTP headers to send
        """
        workspace_url = self._build_workspace_url(workspace)
        file_url = f"{workspace_url}/files/{file_id}"
        self.client.delete(url=file_url, headers=headers)

    def list_files(
        self,
        name: Optional[str] = None,
        meta_key: Optional[str] = None,
        meta_value: Optional[str] = None,
        workspace: Optional[str] = None,
        headers: Optional[dict] = None,
    ) -> Generator:
        """
        List files in a workspace on deepset Cloud, transparently following pagination.

        :param name: filter by file name
        :param meta_key: filter by metadata key
        :param meta_value: filter by metadata value
        :param workspace: workspace to list from; falls back to the client's default workspace
        :param headers: additional HTTP headers to send
        :return: generator over the file records returned by the API
        """
        workspace_url = self._build_workspace_url(workspace)
        files_url = f"{workspace_url}/files"
        query_params = {"name": name, "meta_key": meta_key, "meta_value": meta_value}
        return self.client.get_with_auto_paging(url=files_url, headers=headers, query_params=query_params)

    def _build_workspace_url(self, workspace: Optional[str] = None):
        # Fall back to the workspace configured on this client when none is given.
        if workspace is None:
            workspace = self.workspace
        return self.client.build_workspace_url(workspace)
class DeepsetCloud: class DeepsetCloud:
""" """
A facade to communicate with deepset Cloud. A facade to communicate with deepset Cloud.
@ -859,3 +950,20 @@ class DeepsetCloud:
""" """
client = DeepsetCloudClient(api_key=api_key, api_endpoint=api_endpoint) client = DeepsetCloudClient(api_key=api_key, api_endpoint=api_endpoint)
return EvaluationSetClient(client=client, workspace=workspace, evaluation_set=evaluation_set) return EvaluationSetClient(client=client, workspace=workspace, evaluation_set=evaluation_set)
@classmethod
def get_file_client(
    cls, api_key: Optional[str] = None, api_endpoint: Optional[str] = None, workspace: str = "default"
) -> FileClient:
    """
    Creates a client to manage files on deepset Cloud.

    :param api_key: Secret value of the API key.
                    If not specified, will be read from DEEPSET_CLOUD_API_KEY environment variable.
    :param api_endpoint: The URL of the deepset Cloud API.
                         If not specified, will be read from DEEPSET_CLOUD_API_ENDPOINT environment variable.
    :param workspace: workspace in deepset Cloud
    """
    cloud_client = DeepsetCloudClient(api_key=api_key, api_endpoint=api_endpoint)
    return FileClient(client=cloud_client, workspace=workspace)

View File

@ -1,15 +1,19 @@
import logging
import numpy as np import numpy as np
import pytest import pytest
import pandas as pd import pandas as pd
from pathlib import Path from pathlib import Path
import responses
from haystack.utils.deepsetcloud import DeepsetCloud
from haystack.utils.preprocessing import convert_files_to_docs, tika_convert_files_to_docs from haystack.utils.preprocessing import convert_files_to_docs, tika_convert_files_to_docs
from haystack.utils.cleaning import clean_wiki_text from haystack.utils.cleaning import clean_wiki_text
from haystack.utils.augment_squad import augment_squad from haystack.utils.augment_squad import augment_squad
from haystack.utils.squad_data import SquadData from haystack.utils.squad_data import SquadData
from haystack.utils.context_matching import calculate_context_similarity, match_context, match_contexts from haystack.utils.context_matching import calculate_context_similarity, match_context, match_contexts
from ..conftest import SAMPLES_PATH from ..conftest import DC_API_ENDPOINT, DC_API_KEY, MOCK_DC, SAMPLES_PATH, deepset_cloud_fixture
TEST_CONTEXT = context = """Der Merkantilismus förderte Handel und Verkehr mit teils marktkonformen, teils dirigistischen Maßnahmen. TEST_CONTEXT = context = """Der Merkantilismus förderte Handel und Verkehr mit teils marktkonformen, teils dirigistischen Maßnahmen.
An der Schwelle zum 19. Jahrhundert entstand ein neuer Typus des Nationalstaats, der die Säkularisation durchsetzte, An der Schwelle zum 19. Jahrhundert entstand ein neuer Typus des Nationalstaats, der die Säkularisation durchsetzte,
@ -256,3 +260,134 @@ def _insert_noise(input: str, ratio):
for idx, char in zip(insert_idxs, insert_chars): for idx, char in zip(insert_idxs, insert_chars):
input = input[:idx] + char + input[idx:] input = input[:idx] + char + input[idx:]
return input return input
@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
@responses.activate
def test_upload_file_to_deepset_cloud(caplog):
    # Register one mocked POST response per file to be uploaded.
    if MOCK_DC:
        for mocked_file_id in ["abc", "def", "def"]:
            responses.add(
                method=responses.POST,
                url=f"{DC_API_ENDPOINT}/workspaces/default/files",
                json={"file_id": mocked_file_id},
                status=200,
            )

    client = DeepsetCloud.get_file_client(api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY)

    file_paths = [
        SAMPLES_PATH / "docx/sample_docx.docx",
        SAMPLES_PATH / "pdf/sample_pdf_1.pdf",
        SAMPLES_PATH / "docs/doc_1.txt",
    ]
    metas = [{"file_id": path.name} for path in file_paths]

    # All uploads succeed, so the success log must report the full count.
    with caplog.at_level(logging.INFO):
        client.upload_files(file_paths=file_paths, metas=metas)
        assert f"Successfully uploaded {len(file_paths)} files." in caplog.text
@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
@responses.activate
def test_upload_file_to_deepset_cloud_file_fails(caplog):
    # Second of three mocked uploads fails with a 500; the client must log the
    # error, skip that file, and still upload the other two.
    if MOCK_DC:
        mocked_responses = [
            ({"file_id": "abc"}, 200),
            ({"error": "my-error"}, 500),
            ({"file_id": "def"}, 200),
        ]
        for response_json, status in mocked_responses:
            responses.add(
                method=responses.POST,
                url=f"{DC_API_ENDPOINT}/workspaces/default/files",
                json=response_json,
                status=status,
            )

    client = DeepsetCloud.get_file_client(api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY)

    file_paths = [
        SAMPLES_PATH / "docx/sample_docx.docx",
        SAMPLES_PATH / "pdf/sample_pdf_1.pdf",
        SAMPLES_PATH / "docs/doc_1.txt",
    ]
    metas = [{"file_id": "sample_docx.docx"}, {"file_id": "sample_pdf_1.pdf"}, {"file_id": "doc_1.txt"}]

    with caplog.at_level(logging.INFO):
        client.upload_files(file_paths=file_paths, metas=metas)
        # Plain string literals: the original used f-strings with no placeholders (F541).
        assert "Successfully uploaded 2 files." in caplog.text
        assert "Error uploading file" in caplog.text
        assert "my-error" in caplog.text
@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
@responses.activate
def test_delete_file_to_deepset_cloud():
    # Mock the DELETE endpoint for the file and verify the call goes through.
    file_id = "abc"
    if MOCK_DC:
        delete_url = f"{DC_API_ENDPOINT}/workspaces/default/files/{file_id}"
        responses.add(method=responses.DELETE, url=delete_url, status=200)

    client = DeepsetCloud.get_file_client(api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY)
    client.delete_file(file_id=file_id)
@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
@responses.activate
def test_list_files_on_deepset_cloud():
    # Mock a single-page listing response containing two files.
    if MOCK_DC:
        responses.add(
            method=responses.GET,
            url=f"{DC_API_ENDPOINT}/workspaces/default/files",
            json={
                "data": [
                    {
                        "characters": -1,
                        "created_at": "2022-05-19T15:40:07.538162+00:00",
                        "file_id": "b6cdd48b-3db5-488b-a44d-4240c12a96d5",
                        "languages": [],
                        "meta": {},
                        "name": "sample_pdf_1.pdf",
                        "params": {"id_hash_keys": ["content", "meta"]},
                        "size": 44524,
                        "url": "/api/v1/workspaces/e282219f-19b2-41ff-927e-bda4e6e67418/files/b6cdd48b-3db5-488b-a44d-4240c12a96d5",
                    },
                    {
                        "characters": -1,
                        "created_at": "2022-05-23T12:39:53.393716+00:00",
                        "file_id": "51e9c2af-5676-453d-9b71-db9a560ae266",
                        "languages": [],
                        "meta": {"file_id": "sample_pdf_2.pdf"},
                        "name": "sample_pdf_2.pdf",
                        "params": {"id_hash_keys": ["content", "meta"]},
                        "size": 26093,
                        "url": "/api/v1/workspaces/e282219f-19b2-41ff-927e-bda4e6e67418/files/51e9c2af-5676-453d-9b71-db9a560ae266",
                    },
                ],
                "has_more": False,
                "total": 2,
            },
            status=200,
        )

    client = DeepsetCloud.get_file_client(api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY)
    # list(...) instead of an identity comprehension (ruff C416).
    files = list(client.list_files())
    assert len(files) == 2
    assert files[0]["name"] == "sample_pdf_1.pdf"
    assert files[1]["name"] == "sample_pdf_2.pdf"