From dad7b12874c24e36e977efae066a4969a042094b Mon Sep 17 00:00:00 2001 From: Mayank Jobanputra Date: Thu, 19 Jan 2023 14:06:00 +0530 Subject: [PATCH] fix: Allowing InMemStore and FAISSDocStore for indexing using single worker (#3868) * Allowing InMemStore and FAISSDocStore for indexing using single worker YAML config * unified pipeline & doc store loading * fix pylint warning * separated tests * removed unnecessary caplog --- rest_api/rest_api/pipeline/__init__.py | 61 ++++++++++--------- rest_api/test/samples/test.bogus_pipeline.yml | 17 ++++++ .../test.in-memory-haystack-pipeline.yml | 16 +++++ rest_api/test/test_rest_api.py | 28 ++++++++- 4 files changed, 92 insertions(+), 30 deletions(-) create mode 100644 rest_api/test/samples/test.bogus_pipeline.yml create mode 100644 rest_api/test/samples/test.in-memory-haystack-pipeline.yml diff --git a/rest_api/rest_api/pipeline/__init__.py b/rest_api/rest_api/pipeline/__init__.py index b99805e23..ae400d4e3 100644 --- a/rest_api/rest_api/pipeline/__init__.py +++ b/rest_api/rest_api/pipeline/__init__.py @@ -13,9 +13,32 @@ from rest_api.controller.utils import RequestLimiter logger = logging.getLogger(__name__) -# Since each instance of FAISSDocumentStore creates an in-memory FAISS index, the Indexing & Query Pipelines would -# end up with different indices. The same applies for InMemoryDocumentStore. -UNSUPPORTED_DOC_STORES = (FAISSDocumentStore, InMemoryDocumentStore) +# Each instance of FAISSDocumentStore creates an in-memory FAISS index, +# the Indexing & Query Pipelines will end up with different indices for each worker. +# The same applies for InMemoryDocumentStore. 
+SINGLE_PROCESS_DOC_STORES = (FAISSDocumentStore, InMemoryDocumentStore) + + +def _load_pipeline(pipeline_yaml_path, indexing_pipeline_name): + # Load pipeline (if available) + try: + pipeline = Pipeline.load_from_yaml(Path(pipeline_yaml_path), pipeline_name=indexing_pipeline_name) + logging.info("Loaded pipeline nodes: %s", pipeline.graph.nodes.keys()) + document_store = _get_pipeline_doc_store(pipeline) + except PipelineConfigError as e: + pipeline, document_store = None, None + logger.error( + "Error loading %s pipeline from %s. \n %s\n", indexing_pipeline_name, pipeline_yaml_path, e.message + ) + return pipeline, document_store + + +def _get_pipeline_doc_store(pipeline): + document_store = pipeline.get_document_store() + logging.info("Loaded docstore: %s", document_store) + if isinstance(document_store, SINGLE_PROCESS_DOC_STORES): + logger.warning("FAISSDocumentStore or InMemoryDocumentStore should only be used with 1 worker.") + return document_store def setup_pipelines() -> Dict[str, Any]: @@ -24,14 +47,9 @@ def setup_pipelines() -> Dict[str, Any]: pipelines = {} - # Load query pipeline - query_pipeline = Pipeline.load_from_yaml(Path(config.PIPELINE_YAML_PATH), pipeline_name=config.QUERY_PIPELINE_NAME) - logging.info("Loaded pipeline nodes: %s", query_pipeline.graph.nodes.keys()) + # Load query pipeline & document store + query_pipeline, document_store = _load_pipeline(config.PIPELINE_YAML_PATH, config.QUERY_PIPELINE_NAME) pipelines["query_pipeline"] = query_pipeline - - # Find document store - document_store = query_pipeline.get_document_store() - logging.info("Loaded docstore: %s", document_store) pipelines["document_store"] = document_store # Setup concurrency limiter @@ -39,24 +57,11 @@ def setup_pipelines() -> Dict[str, Any]: logging.info("Concurrent requests per worker: %s", config.CONCURRENT_REQUEST_PER_WORKER) pipelines["concurrency_limiter"] = concurrency_limiter - # Load indexing pipeline (if available) - try: - indexing_pipeline = 
Pipeline.load_from_yaml( - Path(config.PIPELINE_YAML_PATH), pipeline_name=config.INDEXING_PIPELINE_NAME - ) - docstore = indexing_pipeline.get_document_store() - if isinstance(docstore, UNSUPPORTED_DOC_STORES): - indexing_pipeline = None - raise PipelineConfigError( - "Indexing pipelines with FAISSDocumentStore or InMemoryDocumentStore are not supported by the REST APIs." - ) - - except PipelineConfigError as e: - indexing_pipeline = None - logger.error("%s\nFile Upload API will not be available.", e.message) - - finally: - pipelines["indexing_pipeline"] = indexing_pipeline + # Load indexing pipeline + index_pipeline, _ = _load_pipeline(config.PIPELINE_YAML_PATH, config.INDEXING_PIPELINE_NAME) + if not index_pipeline: + logger.warning("Indexing Pipeline is not setup. File Upload API will not be available.") + pipelines["indexing_pipeline"] = index_pipeline # Create directory for uploaded files os.makedirs(config.FILE_UPLOAD_PATH, exist_ok=True) diff --git a/rest_api/test/samples/test.bogus_pipeline.yml b/rest_api/test/samples/test.bogus_pipeline.yml new file mode 100644 index 000000000..22120c975 --- /dev/null +++ b/rest_api/test/samples/test.bogus_pipeline.yml @@ -0,0 +1,17 @@ +version: 'ignore' + +components: + - name: InMemoryDocumentStore + params: {} + # Wrong DocumentStore Type + type: MyOwnDocumentStore + - name: tfidf_ret + params: + document_store: InMemoryDocumentStore + type: TfidfRetriever +pipelines: + - name: query + nodes: + - inputs: + - Query + name: tfidf_ret diff --git a/rest_api/test/samples/test.in-memory-haystack-pipeline.yml b/rest_api/test/samples/test.in-memory-haystack-pipeline.yml new file mode 100644 index 000000000..c6c9a5d29 --- /dev/null +++ b/rest_api/test/samples/test.in-memory-haystack-pipeline.yml @@ -0,0 +1,16 @@ +version: 'ignore' + +components: + - name: InMemoryDocumentStore + params: {} + type: InMemoryDocumentStore + - name: tfidf_ret + params: + document_store: InMemoryDocumentStore + type: TfidfRetriever +pipelines: + - 
name: query + nodes: + - inputs: + - Query + name: tfidf_ret diff --git a/rest_api/test/test_rest_api.py b/rest_api/test/test_rest_api.py index 832cbc5ec..f3ff50b6c 100644 --- a/rest_api/test/test_rest_api.py +++ b/rest_api/test/test_rest_api.py @@ -5,25 +5,49 @@ from pathlib import Path from textwrap import dedent from unittest import mock from unittest.mock import MagicMock, Mock -import functools import numpy as np import pandas as pd import pytest from fastapi.testclient import TestClient -from haystack import Document, Answer +from haystack import Document, Answer, Pipeline import haystack from haystack.nodes import BaseReader, BaseRetriever from haystack.document_stores import BaseDocumentStore +from haystack.errors import PipelineSchemaError from haystack.schema import Label, FilterType from haystack.nodes.file_converter import BaseConverter +from rest_api.pipeline import _load_pipeline from rest_api.utils import get_app TEST_QUERY = "Who made the PDF specification?" +def test_single_worker_warning_for_indexing_pipelines(caplog): + yaml_pipeline_path = Path(__file__).parent.resolve() / "samples" / "test.in-memory-haystack-pipeline.yml" + p, _ = _load_pipeline(yaml_pipeline_path, None) + + assert isinstance(p, Pipeline) + assert "used with 1 worker" in caplog.text + + +def test_check_error_for_pipeline_not_found(): + + yaml_pipeline_path = Path(__file__).parent.resolve() / "samples" / "test.in-memory-haystack-pipeline.yml" + p, _ = _load_pipeline(yaml_pipeline_path, "ThisPipelineDoesntExist") + assert p is None + + +def test_bad_yaml_pipeline_configuration_error(): + + yaml_pipeline_path = Path(__file__).parent.resolve() / "samples" / "test.bogus_pipeline.yml" + with pytest.raises(PipelineSchemaError) as excinfo: + _load_pipeline(yaml_pipeline_path, None) + assert "MyOwnDocumentStore" in str(excinfo.value) + + class MockReader(BaseReader): outgoing_edges = 1