fix: Allowing InMemStore and FAISSDocStore for indexing using single worker (#3868)

* Allowing InMemStore and FAISSDocStore for indexing using single worker YAML config

* unified pipeline & doc store loading

* fix pylint warning

* separated tests

* removed unnecessary caplog
This commit is contained in:
Mayank Jobanputra 2023-01-19 14:06:00 +05:30 committed by GitHub
parent 12e057837b
commit dad7b12874
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 92 additions and 30 deletions

View File

@ -13,9 +13,32 @@ from rest_api.controller.utils import RequestLimiter
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Since each instance of FAISSDocumentStore creates an in-memory FAISS index, the Indexing & Query Pipelines would # Each instance of FAISSDocumentStore creates an in-memory FAISS index,
# end up with different indices. The same applies for InMemoryDocumentStore. # the Indexing & Query Pipelines will end up with different indices for each worker.
UNSUPPORTED_DOC_STORES = (FAISSDocumentStore, InMemoryDocumentStore) # The same applies for InMemoryDocumentStore.
SINGLE_PROCESS_DOC_STORES = (FAISSDocumentStore, InMemoryDocumentStore)
def _load_pipeline(pipeline_yaml_path, indexing_pipeline_name):
    """Load a pipeline (and its document store) from a YAML definition.

    :param pipeline_yaml_path: Path to the pipeline YAML file.
    :param indexing_pipeline_name: Name of the pipeline inside the YAML file
        (``None`` selects the default pipeline).
    :return: Tuple ``(pipeline, document_store)``, or ``(None, None)`` when
        the pipeline cannot be loaded from the YAML file.
    """
    # Load pipeline (if available)
    try:
        pipeline = Pipeline.load_from_yaml(Path(pipeline_yaml_path), pipeline_name=indexing_pipeline_name)
        # Use the module-level logger (was logging.info, i.e. the root logger),
        # consistent with the logger.error call in the except branch below.
        logger.info("Loaded pipeline nodes: %s", pipeline.graph.nodes.keys())
        document_store = _get_pipeline_doc_store(pipeline)
    except PipelineConfigError as e:
        # Signal "not available" to the caller instead of propagating the error.
        pipeline, document_store = None, None
        logger.error(
            "Error loading %s pipeline from %s. \n %s\n", indexing_pipeline_name, pipeline_yaml_path, e.message
        )
    return pipeline, document_store
def _get_pipeline_doc_store(pipeline):
    """Return the pipeline's document store, warning on single-process stores.

    FAISSDocumentStore and InMemoryDocumentStore keep state per process, so
    running them behind multiple workers would give each worker its own,
    diverging index — hence the warning.

    :param pipeline: A loaded ``Pipeline`` instance.
    :return: The document store attached to the pipeline (may be ``None`` if
        the pipeline has no document store).
    """
    document_store = pipeline.get_document_store()
    # Use the module-level logger (was logging.info, i.e. the root logger),
    # consistent with the logger.warning call below.
    logger.info("Loaded docstore: %s", document_store)
    if isinstance(document_store, SINGLE_PROCESS_DOC_STORES):
        logger.warning("FAISSDocumentStore or InMemoryDocumentStore should only be used with 1 worker.")
    return document_store
def setup_pipelines() -> Dict[str, Any]: def setup_pipelines() -> Dict[str, Any]:
@ -24,14 +47,9 @@ def setup_pipelines() -> Dict[str, Any]:
pipelines = {} pipelines = {}
# Load query pipeline # Load query pipeline & document store
query_pipeline = Pipeline.load_from_yaml(Path(config.PIPELINE_YAML_PATH), pipeline_name=config.QUERY_PIPELINE_NAME) query_pipeline, document_store = _load_pipeline(config.PIPELINE_YAML_PATH, config.QUERY_PIPELINE_NAME)
logging.info("Loaded pipeline nodes: %s", query_pipeline.graph.nodes.keys())
pipelines["query_pipeline"] = query_pipeline pipelines["query_pipeline"] = query_pipeline
# Find document store
document_store = query_pipeline.get_document_store()
logging.info("Loaded docstore: %s", document_store)
pipelines["document_store"] = document_store pipelines["document_store"] = document_store
# Setup concurrency limiter # Setup concurrency limiter
@ -39,24 +57,11 @@ def setup_pipelines() -> Dict[str, Any]:
logging.info("Concurrent requests per worker: %s", config.CONCURRENT_REQUEST_PER_WORKER) logging.info("Concurrent requests per worker: %s", config.CONCURRENT_REQUEST_PER_WORKER)
pipelines["concurrency_limiter"] = concurrency_limiter pipelines["concurrency_limiter"] = concurrency_limiter
# Load indexing pipeline (if available) # Load indexing pipeline
try: index_pipeline, _ = _load_pipeline(config.PIPELINE_YAML_PATH, config.INDEXING_PIPELINE_NAME)
indexing_pipeline = Pipeline.load_from_yaml( if not index_pipeline:
Path(config.PIPELINE_YAML_PATH), pipeline_name=config.INDEXING_PIPELINE_NAME logger.warning("Indexing Pipeline is not setup. File Upload API will not be available.")
) pipelines["indexing_pipeline"] = index_pipeline
docstore = indexing_pipeline.get_document_store()
if isinstance(docstore, UNSUPPORTED_DOC_STORES):
indexing_pipeline = None
raise PipelineConfigError(
"Indexing pipelines with FAISSDocumentStore or InMemoryDocumentStore are not supported by the REST APIs."
)
except PipelineConfigError as e:
indexing_pipeline = None
logger.error("%s\nFile Upload API will not be available.", e.message)
finally:
pipelines["indexing_pipeline"] = indexing_pipeline
# Create directory for uploaded files # Create directory for uploaded files
os.makedirs(config.FILE_UPLOAD_PATH, exist_ok=True) os.makedirs(config.FILE_UPLOAD_PATH, exist_ok=True)

View File

@ -0,0 +1,17 @@
# Test fixture: a pipeline YAML that references a non-existent document store
# type ("MyOwnDocumentStore"), used to verify that loading it raises
# PipelineSchemaError. NOTE(review): indentation reconstructed to conventional
# YAML nesting — the rendered diff lost the original whitespace; verify
# against the committed file.
version: 'ignore'
components:
  - name: InMemoryDocumentStore
    params: {}
    # Wrong DocumentStore Type
    type: MyOwnDocumentStore
  - name: tfidf_ret
    params:
      document_store: InMemoryDocumentStore
    type: TfidfRetriever
pipelines:
  - name: query
    nodes:
      - inputs:
          - Query
        name: tfidf_ret

View File

@ -0,0 +1,16 @@
# Test fixture: a valid query pipeline backed by InMemoryDocumentStore, used
# to verify the single-worker warning and pipeline-not-found handling.
# NOTE(review): indentation reconstructed to conventional YAML nesting — the
# rendered diff lost the original whitespace; verify against the committed file.
version: 'ignore'
components:
  - name: InMemoryDocumentStore
    params: {}
    type: InMemoryDocumentStore
  - name: tfidf_ret
    params:
      document_store: InMemoryDocumentStore
    type: TfidfRetriever
pipelines:
  - name: query
    nodes:
      - inputs:
          - Query
        name: tfidf_ret

View File

@ -5,25 +5,49 @@ from pathlib import Path
from textwrap import dedent from textwrap import dedent
from unittest import mock from unittest import mock
from unittest.mock import MagicMock, Mock from unittest.mock import MagicMock, Mock
import functools
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import pytest import pytest
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from haystack import Document, Answer from haystack import Document, Answer, Pipeline
import haystack import haystack
from haystack.nodes import BaseReader, BaseRetriever from haystack.nodes import BaseReader, BaseRetriever
from haystack.document_stores import BaseDocumentStore from haystack.document_stores import BaseDocumentStore
from haystack.errors import PipelineSchemaError
from haystack.schema import Label, FilterType from haystack.schema import Label, FilterType
from haystack.nodes.file_converter import BaseConverter from haystack.nodes.file_converter import BaseConverter
from rest_api.pipeline import _load_pipeline
from rest_api.utils import get_app from rest_api.utils import get_app
TEST_QUERY = "Who made the PDF specification?" TEST_QUERY = "Who made the PDF specification?"
def test_single_worker_warning_for_indexing_pipelines(caplog):
    """A pipeline backed by an in-memory store loads fine but logs a single-worker warning."""
    sample_yaml = Path(__file__).parent.resolve() / "samples" / "test.in-memory-haystack-pipeline.yml"
    loaded_pipeline, _ = _load_pipeline(sample_yaml, None)
    assert isinstance(loaded_pipeline, Pipeline)
    assert "used with 1 worker" in caplog.text
def test_check_error_for_pipeline_not_found():
    """Requesting a pipeline name that is absent from the YAML yields no pipeline."""
    sample_yaml = Path(__file__).parent.resolve() / "samples" / "test.in-memory-haystack-pipeline.yml"
    missing_pipeline, _ = _load_pipeline(sample_yaml, "ThisPipelineDoesntExist")
    assert missing_pipeline is None
def test_bad_yaml_pipeline_configuration_error():
    """An unknown component type in the YAML raises PipelineSchemaError naming that type."""
    sample_yaml = Path(__file__).parent.resolve() / "samples" / "test.bogus_pipeline.yml"
    with pytest.raises(PipelineSchemaError) as excinfo:
        _load_pipeline(sample_yaml, None)
    assert "MyOwnDocumentStore" in str(excinfo.value)
class MockReader(BaseReader): class MockReader(BaseReader):
outgoing_edges = 1 outgoing_edges = 1