From 4b46f2047b66c25b88f0abaf82d1915970b8144c Mon Sep 17 00:00:00 2001 From: tstadel <60758086+tstadel@users.noreply.github.com> Date: Mon, 7 Mar 2022 22:35:15 +0100 Subject: [PATCH] save_to_deepset_cloud: automatically convert document stores (#2283) * automatically convert to DeepsetCloudDocumentStore * shorten info text. * fix typo * the -> this * add test * ensure request body has only DeepsetCloudDocumentStores * mark test as elasticsearch to fix milvus1 ci --- haystack/pipelines/base.py | 13 +++++++- test/test_pipeline.py | 63 +++++++++++++++++++++++++++++++++++++- 2 files changed, 74 insertions(+), 2 deletions(-) diff --git a/haystack/pipelines/base.py b/haystack/pipelines/base.py index 39066c64c..92d3072fa 100644 --- a/haystack/pipelines/base.py +++ b/haystack/pipelines/base.py @@ -373,7 +373,18 @@ class BasePipeline: pipelines = query_config["pipelines"] + index_config["pipelines"] all_components = query_config["components"] + index_config["components"] distinct_components = [c for c in {component["name"]: component for component in all_components}.values()] - config = {"components": distinct_components, "pipelines": pipelines, "version": "0.9"} + document_stores = [c for c in distinct_components if c["type"].endswith("DocumentStore")] + for document_store in document_stores: + if document_store["type"] != "DeepsetCloudDocumentStore": + logger.info( + f"In order to be used on Deepset Cloud, component '{document_store['name']}' of type '{document_store['type']}' " + f"has been automatically converted to type DeepsetCloudDocumentStore. " + f"Usually this replacement will result in equivalent pipeline quality. " + f"However depending on chosen settings of '{document_store['name']}' differences might occur." 
+ ) + document_store["type"] = "DeepsetCloudDocumentStore" + document_store["params"] = {} + config = {"components": distinct_components, "pipelines": pipelines, "version": __version__} client = DeepsetCloud.get_pipeline_client(api_key=api_key, api_endpoint=api_endpoint, workspace=workspace) pipeline_config_info = client.get_pipeline_config_info(pipeline_config_name=pipeline_config_name) diff --git a/test/test_pipeline.py b/test/test_pipeline.py index ad3619753..6f938db90 100644 --- a/test/test_pipeline.py +++ b/test/test_pipeline.py @@ -2,11 +2,14 @@ from pathlib import Path import os import json +from typing import Tuple from unittest.mock import Mock import pandas as pd import pytest +from requests import PreparedRequest import responses +import yaml from haystack import __version__, Document, Answer, JoinAnswers from haystack.document_stores.base import BaseDocumentStore @@ -19,7 +22,7 @@ from haystack.nodes.retriever.sparse import ElasticsearchRetriever from haystack.pipelines import Pipeline, DocumentSearchPipeline, RootNode, ExtractiveQAPipeline from haystack.pipelines.config import _validate_user_input, validate_config from haystack.pipelines.utils import generate_code -from haystack.nodes import DensePassageRetriever, EmbeddingRetriever, RouteDocuments +from haystack.nodes import DensePassageRetriever, EmbeddingRetriever, RouteDocuments, PreProcessor, TextConverter from conftest import MOCK_DC, DC_API_ENDPOINT, DC_API_KEY, DC_TEST_INDEX, SAMPLES_PATH, deepset_cloud_fixture @@ -746,6 +749,64 @@ def test_save_to_deepset_cloud(): ) +@pytest.mark.elasticsearch +@pytest.mark.usefixtures(deepset_cloud_fixture.__name__) +@responses.activate +def test_save_nonexisting_pipeline_to_deepset_cloud(): + if MOCK_DC: + + def dc_document_store_matcher(request: PreparedRequest) -> Tuple[bool, str]: + matches = False + reason = "No DeepsetCloudDocumentStore found." 
+ request_body = request.body or "" + json_body = yaml.safe_load(request_body) + components = json_body["components"] + for component in components: + if component["type"].endswith("DocumentStore"): + if component["type"] == "DeepsetCloudDocumentStore": + matches = True + else: + matches = False + reason = f"Component {component['name']} is of type {component['type']} and not DeepsetCloudDocumentStore" + break + return matches, reason + + responses.add( + method=responses.GET, + url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline", + json={"errors": ["Pipeline with the name test_new_non_existing_pipeline does not exists."]}, + status=404, + ) + + responses.add( + method=responses.POST, + url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines", + json={"name": "test_new_non_existing_pipeline"}, + status=201, + match=[dc_document_store_matcher], + ) + + es_document_store = ElasticsearchDocumentStore() + es_retriever = ElasticsearchRetriever(document_store=es_document_store) + file_converter = TextConverter() + preprocessor = PreProcessor() + + query_pipeline = Pipeline() + query_pipeline.add_node(component=es_retriever, name="Retriever", inputs=["Query"]) + index_pipeline = Pipeline() + index_pipeline.add_node(component=file_converter, name="FileConverter", inputs=["File"]) + index_pipeline.add_node(component=preprocessor, name="Preprocessor", inputs=["FileConverter"]) + index_pipeline.add_node(component=es_document_store, name="DocumentStore", inputs=["Preprocessor"]) + + Pipeline.save_to_deepset_cloud( + query_pipeline=query_pipeline, + index_pipeline=index_pipeline, + pipeline_config_name="test_new_non_existing_pipeline", + api_endpoint=DC_API_ENDPOINT, + api_key=DC_API_KEY, + ) + + # @pytest.mark.slow # @pytest.mark.elasticsearch # @pytest.mark.parametrize(