haystack/rest_api/pipeline/__init__.py
Sara Zan 96a538b182
Pylint (import related warnings) and REST API improvements (#2326)
* remove duplicate imports

* fix ungrouped-imports

* Fix wrong-import-position

* Fix unused-import

* pyproject.toml

* Working on wrong-import-order

* Solve wrong-import-order

* fix Pool import

* Move open_search_index_to_document_store and elasticsearch_index_to_document_store in elasticsearch.py

* remove Converter from modeling

* Fix mypy issues on adaptive_model.py

* create es_converter.py

* remove converter import

* change import path in tests

* Restructure REST API to not rely on global vars from search.py and improve tests

* Fix openapi generator

* Move variable initialization

* Change type of FilterRequest.filters

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
2022-04-12 16:41:05 +02:00

65 lines
2.3 KiB
Python

from typing import Any, Dict
import os
import logging
from pathlib import Path
from haystack.pipelines.base import Pipeline
from haystack.document_stores import FAISSDocumentStore, InMemoryDocumentStore
from haystack.errors import PipelineConfigError
from rest_api.controller.utils import RequestLimiter
# Module-level logger for this package.
logger = logging.getLogger(__name__)
# Since each instance of FAISSDocumentStore creates an in-memory FAISS index, the Indexing & Query Pipelines would
# end up with different indices. The same applies for InMemoryDocumentStore.
# Document store classes the indexing pipeline refuses to run with (checked via isinstance below).
UNSUPPORTED_DOC_STORES = (FAISSDocumentStore, InMemoryDocumentStore)
def setup_pipelines() -> Dict[str, Any]:
    """
    Load the query pipeline (and, if available, the indexing pipeline) from the YAML
    file referenced by the REST API configuration.

    :return: dict with keys:
        - "query_pipeline": the loaded query `Pipeline`
        - "document_store": the document store used by the query pipeline
        - "concurrency_limiter": a `RequestLimiter` bounding concurrent requests
        - "indexing_pipeline": the indexing `Pipeline`, or `None` if it could not
          be loaded or uses an unsupported document store
    """
    # Re-import the configuration variables
    from rest_api import config  # pylint: disable=reimported

    pipelines = {}

    # Load query pipeline
    query_pipeline = Pipeline.load_from_yaml(Path(config.PIPELINE_YAML_PATH), pipeline_name=config.QUERY_PIPELINE_NAME)
    logger.info("Loaded pipeline nodes: %s", query_pipeline.graph.nodes.keys())
    pipelines["query_pipeline"] = query_pipeline

    # Find document store
    document_store = query_pipeline.get_document_store()
    logger.info("Loaded docstore: %s", document_store)
    pipelines["document_store"] = document_store

    # Setup concurrency limiter
    concurrency_limiter = RequestLimiter(config.CONCURRENT_REQUEST_PER_WORKER)
    # Bug fix: the original logged the literal "{CONCURRENT_REQUEST_PER_WORKER}"
    # (missing f-prefix, and the name only exists on `config`).
    logger.info("Concurrent requests per worker: %s", config.CONCURRENT_REQUEST_PER_WORKER)
    pipelines["concurrency_limiter"] = concurrency_limiter

    # Load indexing pipeline (if available). Initialized up front so the `finally`
    # block cannot hit a NameError if load_from_yaml raises an unexpected exception.
    indexing_pipeline = None
    try:
        indexing_pipeline = Pipeline.load_from_yaml(
            Path(config.PIPELINE_YAML_PATH), pipeline_name=config.INDEXING_PIPELINE_NAME
        )
        docstore = indexing_pipeline.get_document_store()
        if isinstance(docstore, UNSUPPORTED_DOC_STORES):
            # These stores keep their index in process memory, so the indexing and
            # query workers would each see a different, disjoint index.
            indexing_pipeline = None
            raise PipelineConfigError(
                "Indexing pipelines with FAISSDocumentStore or InMemoryDocumentStore are not supported by the REST APIs."
            )
    except PipelineConfigError as e:
        # Missing/invalid indexing pipeline is not fatal: the API runs query-only.
        indexing_pipeline = None
        logger.error("%s\nFile Upload API will not be available.", e.message)
    finally:
        pipelines["indexing_pipeline"] = indexing_pipeline

    # Create directory for uploaded files
    os.makedirs(config.FILE_UPLOAD_PATH, exist_ok=True)

    return pipelines