diff --git a/haystack/utils/deepsetcloud.py b/haystack/utils/deepsetcloud.py index 459207ea6..5ff96e81a 100644 --- a/haystack/utils/deepsetcloud.py +++ b/haystack/utils/deepsetcloud.py @@ -1,3 +1,6 @@ +import json +from mimetypes import guess_type +from pathlib import Path from typing import Any, Dict, Generator, List, Optional, Tuple, Union try: @@ -131,6 +134,7 @@ class DeepsetCloudClient: query_params: dict = None, headers: dict = None, stream: bool = False, + files: Any = None, raise_on_error: bool = True, ): return self._execute_request( @@ -140,6 +144,7 @@ class DeepsetCloudClient: json=json, data=data, stream=stream, + files=files, headers=headers, raise_on_error=raise_on_error, ) @@ -211,9 +216,26 @@ class DeepsetCloudClient: auto_paging_page_size=auto_paging_page_size, ) + def delete( + self, + url: str, + query_params: dict = None, + headers: dict = None, + stream: bool = False, + raise_on_error: bool = True, + ): + return self._execute_request( + method="DELETE", + url=url, + query_params=query_params, + headers=headers, + stream=stream, + raise_on_error=raise_on_error, + ) + def _execute_auto_paging_request( self, - method: Literal["GET", "POST", "PUT", "HEAD"], + method: Literal["GET", "POST", "PUT", "HEAD", "DELETE"], url: str, json: dict = None, data: Any = None, @@ -246,13 +268,14 @@ class DeepsetCloudClient: def _execute_request( self, - method: Literal["GET", "POST", "PUT", "HEAD"], + method: Literal["GET", "POST", "PUT", "HEAD", "DELETE"], url: str, json: dict = None, data: Any = None, query_params: dict = None, headers: dict = None, stream: bool = False, + files: Any = None, raise_on_error: bool = True, ): if json is not None: @@ -266,6 +289,7 @@ class DeepsetCloudClient: headers=headers, auth=BearerAuth(self.api_key), stream=stream, + files=files, ) if raise_on_error and response.status_code > 299: raise DeepsetCloudError( @@ -789,6 +813,73 @@ class EvaluationSetClient: return self.client.build_workspace_url(workspace) +class FileClient: + def 
__init__(self, client: DeepsetCloudClient, workspace: Optional[str] = None): + """ + A client to manage files on deepset Cloud. + + :param client: deepset Cloud client + :param workspace: workspace in deepset Cloud + + """ + self.client = client + self.workspace = workspace + + def upload_files( + self, + file_paths: List[Path], + metas: Optional[List[Dict]] = None, + workspace: Optional[str] = None, + headers: dict = None, + ): + workspace_url = self._build_workspace_url(workspace) + files_url = f"{workspace_url}/files" + if metas is None: + metas = [{} for _ in file_paths] + + file_ids = [] + for file_path, meta in zip(file_paths, metas): + try: + mime_type = guess_type(str(file_path))[0] + with open(file_path, "rb") as file: + response_file_upload = self.client.post( + url=files_url, + files={"file": (file_path.name, file, mime_type)}, + data={"meta": json.dumps(meta)}, + headers=headers, + ) + file_id = response_file_upload.json().get("file_id") + file_ids.append(file_id) + except Exception: + logger.exception(f"Error uploading file {file_path}") + + logger.info(f"Successfully uploaded {len(file_ids)} files.") + + def delete_file(self, file_id: str, workspace: Optional[str] = None, headers: dict = None): + workspace_url = self._build_workspace_url(workspace) + file_url = f"{workspace_url}/files/{file_id}" + self.client.delete(url=file_url, headers=headers) + + def list_files( + self, + name: Optional[str] = None, + meta_key: Optional[str] = None, + meta_value: Optional[str] = None, + workspace: Optional[str] = None, + headers: dict = None, + ) -> Generator: + workspace_url = self._build_workspace_url(workspace) + files_url = f"{workspace_url}/files" + query_params = {"name": name, "meta_key": meta_key, "meta_value": meta_value} + generator = self.client.get_with_auto_paging(url=files_url, headers=headers, query_params=query_params) + return generator + + def _build_workspace_url(self, workspace: Optional[str] = None): + if workspace is None: + workspace = 
self.workspace + return self.client.build_workspace_url(workspace) + + class DeepsetCloud: """ A facade to communicate with deepset Cloud. @@ -859,3 +950,20 @@ class DeepsetCloud: """ client = DeepsetCloudClient(api_key=api_key, api_endpoint=api_endpoint) return EvaluationSetClient(client=client, workspace=workspace, evaluation_set=evaluation_set) + + @classmethod + def get_file_client( + cls, api_key: Optional[str] = None, api_endpoint: Optional[str] = None, workspace: str = "default" + ) -> FileClient: + """ + Creates a client to manage files on deepset Cloud. + + :param api_key: Secret value of the API key. + If not specified, will be read from DEEPSET_CLOUD_API_KEY environment variable. + :param api_endpoint: The URL of the deepset Cloud API. + If not specified, will be read from DEEPSET_CLOUD_API_ENDPOINT environment variable. + :param workspace: workspace in deepset Cloud + + """ + client = DeepsetCloudClient(api_key=api_key, api_endpoint=api_endpoint) + return FileClient(client=client, workspace=workspace) diff --git a/test/others/test_utils.py b/test/others/test_utils.py index 217599da3..9b37dd608 100644 --- a/test/others/test_utils.py +++ b/test/others/test_utils.py @@ -1,15 +1,19 @@ +import logging import numpy as np import pytest import pandas as pd from pathlib import Path +import responses +from haystack.utils.deepsetcloud import DeepsetCloud + from haystack.utils.preprocessing import convert_files_to_docs, tika_convert_files_to_docs from haystack.utils.cleaning import clean_wiki_text from haystack.utils.augment_squad import augment_squad from haystack.utils.squad_data import SquadData from haystack.utils.context_matching import calculate_context_similarity, match_context, match_contexts -from ..conftest import SAMPLES_PATH +from ..conftest import DC_API_ENDPOINT, DC_API_KEY, MOCK_DC, SAMPLES_PATH, deepset_cloud_fixture TEST_CONTEXT = context = """Der Merkantilismus förderte Handel und Verkehr mit teils marktkonformen, teils dirigistischen Maßnahmen. 
An der Schwelle zum 19. Jahrhundert entstand ein neuer Typus des Nationalstaats, der die Säkularisation durchsetzte, @@ -256,3 +260,134 @@ def _insert_noise(input: str, ratio): for idx, char in zip(insert_idxs, insert_chars): input = input[:idx] + char + input[idx:] return input + + +@pytest.mark.usefixtures(deepset_cloud_fixture.__name__) +@responses.activate +def test_upload_file_to_deepset_cloud(caplog): + if MOCK_DC: + responses.add( + method=responses.POST, + url=f"{DC_API_ENDPOINT}/workspaces/default/files", + json={"file_id": "abc"}, + status=200, + ) + + responses.add( + method=responses.POST, + url=f"{DC_API_ENDPOINT}/workspaces/default/files", + json={"file_id": "def"}, + status=200, + ) + + responses.add( + method=responses.POST, + url=f"{DC_API_ENDPOINT}/workspaces/default/files", + json={"file_id": "def"}, + status=200, + ) + + client = DeepsetCloud.get_file_client(api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY) + file_paths = [ + SAMPLES_PATH / "docx/sample_docx.docx", + SAMPLES_PATH / "pdf/sample_pdf_1.pdf", + SAMPLES_PATH / "docs/doc_1.txt", + ] + metas = [{"file_id": "sample_docx.docx"}, {"file_id": "sample_pdf_1.pdf"}, {"file_id": "doc_1.txt"}] + with caplog.at_level(logging.INFO): + client.upload_files(file_paths=file_paths, metas=metas) + assert f"Successfully uploaded {len(file_paths)} files." 
in caplog.text + + +@pytest.mark.usefixtures(deepset_cloud_fixture.__name__) +@responses.activate +def test_upload_file_to_deepset_cloud_file_fails(caplog): + if MOCK_DC: + responses.add( + method=responses.POST, + url=f"{DC_API_ENDPOINT}/workspaces/default/files", + json={"file_id": "abc"}, + status=200, + ) + + responses.add( + method=responses.POST, + url=f"{DC_API_ENDPOINT}/workspaces/default/files", + json={"error": "my-error"}, + status=500, + ) + + responses.add( + method=responses.POST, + url=f"{DC_API_ENDPOINT}/workspaces/default/files", + json={"file_id": "def"}, + status=200, + ) + + client = DeepsetCloud.get_file_client(api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY) + file_paths = [ + SAMPLES_PATH / "docx/sample_docx.docx", + SAMPLES_PATH / "pdf/sample_pdf_1.pdf", + SAMPLES_PATH / "docs/doc_1.txt", + ] + metas = [{"file_id": "sample_docx.docx"}, {"file_id": "sample_pdf_1.pdf"}, {"file_id": "doc_1.txt"}] + with caplog.at_level(logging.INFO): + client.upload_files(file_paths=file_paths, metas=metas) + assert f"Successfully uploaded 2 files." 
in caplog.text + assert f"Error uploading file" in caplog.text + assert f"my-error" in caplog.text + + +@pytest.mark.usefixtures(deepset_cloud_fixture.__name__) +@responses.activate +def test_delete_file_to_deepset_cloud(): + if MOCK_DC: + responses.add(method=responses.DELETE, url=f"{DC_API_ENDPOINT}/workspaces/default/files/abc", status=200) + + client = DeepsetCloud.get_file_client(api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY) + client.delete_file(file_id="abc") + + +@pytest.mark.usefixtures(deepset_cloud_fixture.__name__) +@responses.activate +def test_list_files_on_deepset_cloud(): + if MOCK_DC: + responses.add( + method=responses.GET, + url=f"{DC_API_ENDPOINT}/workspaces/default/files", + json={ + "data": [ + { + "characters": -1, + "created_at": "2022-05-19T15:40:07.538162+00:00", + "file_id": "b6cdd48b-3db5-488b-a44d-4240c12a96d5", + "languages": [], + "meta": {}, + "name": "sample_pdf_1.pdf", + "params": {"id_hash_keys": ["content", "meta"]}, + "size": 44524, + "url": "/api/v1/workspaces/e282219f-19b2-41ff-927e-bda4e6e67418/files/b6cdd48b-3db5-488b-a44d-4240c12a96d5", + }, + { + "characters": -1, + "created_at": "2022-05-23T12:39:53.393716+00:00", + "file_id": "51e9c2af-5676-453d-9b71-db9a560ae266", + "languages": [], + "meta": {"file_id": "sample_pdf_2.pdf"}, + "name": "sample_pdf_2.pdf", + "params": {"id_hash_keys": ["content", "meta"]}, + "size": 26093, + "url": "/api/v1/workspaces/e282219f-19b2-41ff-927e-bda4e6e67418/files/51e9c2af-5676-453d-9b71-db9a560ae266", + }, + ], + "has_more": False, + "total": 2, + }, + status=200, + ) + + client = DeepsetCloud.get_file_client(api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY) + files = [f for f in client.list_files()] + assert len(files) == 2 + assert files[0]["name"] == "sample_pdf_1.pdf" + assert files[1]["name"] == "sample_pdf_2.pdf"