diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0fdc4f7df..5e13f4aa5 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -289,6 +289,44 @@ jobs: channel: '#haystack' if: failure() && github.repository_owner == 'deepset-ai' && github.ref == 'refs/heads/main' + opensearch-tests-linux: + needs: + - mypy + - pylint + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Setup Opensearch + run: | + docker run -d -p 9201:9200 -p 9600:9600 -e "discovery.type=single-node" opensearchproject/opensearch:1.3.5 + + # TODO Let's try to remove this one from the unit tests + - name: Install pdftotext + run: wget --no-check-certificate https://dl.xpdfreader.com/xpdf-tools-linux-4.04.tar.gz && tar -xvf xpdf-tools-linux-4.04.tar.gz && sudo cp xpdf-tools-linux-4.04/bin64/pdftotext /usr/local/bin + + - name: Setup Python + uses: ./.github/actions/python_cache/ + + - name: Install Haystack + run: pip install . + + - name: Run tests + env: + TOKENIZERS_PARALLELISM: 'false' + run: | + pytest ${{ env.PYTEST_PARAMS }} -m "opensearch and not integration" test/document_stores/ --document_store_type=opensearch + + - name: Dump docker logs on failure + if: failure() + uses: jwalton/gh-docker-logs@v1 + + - uses: act10ns/slack@v1 + with: + status: ${{ job.status }} + channel: '#haystack' + if: failure() && github.repository_owner == 'deepset-ai' && github.ref == 'refs/heads/main' + faiss-tests-linux: needs: - mypy @@ -627,7 +665,7 @@ jobs: - "pipelines" - "modeling" - "others" - - "document_stores/test_opensearch.py" + - "document_stores" runs-on: ubuntu-latest diff --git a/docs/_src/api/api/document_store.md b/docs/_src/api/api/document_store.md index fa78eb7f8..edf11fb58 100644 --- a/docs/_src/api/api/document_store.md +++ b/docs/_src/api/api/document_store.md @@ -678,32 +678,32 @@ def write_documents(documents: Union[List[dict], List[Document]], Indexes documents for later queries in Elasticsearch. -Behaviour if a document with the same ID already exists in ElasticSearch: +If a document with the same ID already exists in Elasticsearch: a) (Default) Throw Elastic's standard error message for duplicate IDs. b) If `self.update_existing_documents=True` for DocumentStore: Overwrite existing documents. (This is only relevant if you pass your own ID when initializing a `Document`. -If don't set custom IDs for your Documents or just pass a list of dictionaries here, -they will automatically get UUIDs assigned. See the `Document` class for details) +If you don't set custom IDs for your Documents or just pass a list of dictionaries here, +they automatically get UUIDs assigned. See the `Document` class for details.) **Arguments**: -- `documents`: a list of Python dictionaries or a list of Haystack Document objects. +- `documents`: A list of Python dictionaries or a list of Haystack Document objects. For documents as dictionaries, the format is {"content": ""}. Optionally: Include meta data via {"content": "", "meta":{"name": ", "author": "somebody", ...}} -It can be used for filtering and is accessible in the responses of the Finder. -Advanced: If you are using your own Elasticsearch mapping, the key names in the dictionary -should be changed to what you have set for self.content_field and self.name_field. -- `index`: Elasticsearch index where the documents should be indexed. If not supplied, self.index will be used. +You can use it for filtering and you can access it in the responses of the Finder. 
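
To make the `documents` format described above concrete, here is a minimal usage sketch (hypothetical content and meta values; assumes a reachable Elasticsearch instance on localhost:9200):

```python
from haystack.document_stores import ElasticsearchDocumentStore
from haystack.schema import Document

document_store = ElasticsearchDocumentStore()  # assumes Elasticsearch on localhost:9200

document_store.write_documents(
    [
        # dictionary form, with optional meta that can be used for filtering later
        {
            "content": "Haystack is a framework for building search systems.",
            "meta": {"name": "intro.txt", "author": "somebody"},
        },
        # Document form; without a custom ID it gets a UUID assigned automatically
        Document(content="OpenSearch support is tested against a local container."),
    ],
    duplicate_documents="overwrite",  # one of 'skip', 'overwrite', 'fail'
)
```
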
+Advanced: If you are using your own Elasticsearch mapping, change the key names in the dictionary +to what you have set for self.content_field and self.name_field. +- `index`: Elasticsearch index where the documents should be indexed. If you don't specify it, self.index is used. - `batch_size`: Number of documents that are passed to Elasticsearch's bulk function at a time. -- `duplicate_documents`: Handle duplicates document based on parameter options. -Parameter options : ( 'skip','overwrite','fail') -skip: Ignore the duplicates documents +- `duplicate_documents`: Handle duplicate documents based on parameter options. +Parameter options: ( 'skip','overwrite','fail') +skip: Ignore the duplicate documents overwrite: Update any existing documents with the same ID when adding documents. -fail: an error is raised if the document ID of the document being added already +fail: Raises an error if the document ID of the document being added already exists. -- `headers`: Custom HTTP headers to pass to elasticsearch client (e.g. {'Authorization': 'Basic YWRtaW46cm9vdA=='}) -Check out https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html for more information. +- `headers`: Custom HTTP headers to pass to Elasticsearch client (for example {'Authorization': 'Basic YWRtaW46cm9vdA=='}) +For more information, see [HTTP/REST clients and security](https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html). **Raises**: @@ -1742,6 +1742,55 @@ More info at https://www.elastic.co/guide/en/elasticsearch/reference/current/ana - `knn_engine`: The engine you want to use for the nearest neighbor search by OpenSearch's KNN plug-in. Possible values: "nmslib" or "faiss". Defaults to "nmslib". For more information, see [k-NN Index](https://opensearch.org/docs/latest/search-plugins/knn/knn-index/). + + +#### OpenSearchDocumentStore.write\_documents + +```python +def write_documents(documents: Union[List[dict], List[Document]], + index: Optional[str] = None, + batch_size: int = 10_000, + duplicate_documents: Optional[str] = None, + headers: Optional[Dict[str, str]] = None) +``` + +Indexes documents for later queries in OpenSearch. + +If a document with the same ID already exists in OpenSearch: +a) (Default) Throw Elastic's standard error message for duplicate IDs. +b) If `self.update_existing_documents=True` for DocumentStore: Overwrite existing documents. +(This is only relevant if you pass your own ID when initializing a `Document`. +If you don't set custom IDs for your Documents or just pass a list of dictionaries here, +they automatically get UUIDs assigned. See the `Document` class for details.) + +**Arguments**: + +- `documents`: A list of Python dictionaries or a list of Haystack Document objects. +For documents as dictionaries, the format is {"content": ""}. +Optionally: Include meta data via {"content": "", +"meta":{"name": ", "author": "somebody", ...}} +You can use it for filtering and you can access it in the responses of the Finder. +Advanced: If you are using your own OpenSearch mapping, change the key names in the dictionary +to what you have set for self.content_field and self.name_field. +- `index`: OpenSearch index where the documents should be indexed. If you don't specify it, self.index is used. +- `batch_size`: Number of documents that are passed to OpenSearch's bulk function at a time. +- `duplicate_documents`: Handle duplicate documents based on parameter options. 
+Parameter options: ( 'skip','overwrite','fail') +skip: Ignore the duplicate documents +overwrite: Update any existing documents with the same ID when adding documents. +fail: Raises an error if the document ID of the document being added already +exists. +- `headers`: Custom HTTP headers to pass to OpenSearch client (for example {'Authorization': 'Basic YWRtaW46cm9vdA=='}) +For more information, see [HTTP/REST clients and security](https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html). + +**Raises**: + +- `DuplicateDocumentError`: Exception trigger on duplicate document + +**Returns**: + +None + #### OpenSearchDocumentStore.query\_by\_embedding diff --git a/haystack/document_stores/base.py b/haystack/document_stores/base.py index 5325a05ea..e60b283af 100644 --- a/haystack/document_stores/base.py +++ b/haystack/document_stores/base.py @@ -349,7 +349,8 @@ class BaseDocumentStore(BaseComponent): ) -> int: pass - def normalize_embedding(self, emb: np.ndarray) -> None: + @staticmethod + def normalize_embedding(emb: np.ndarray) -> None: """ Performs L2 normalization of embeddings vector inplace. Input can be a single vector (1D array) or a matrix (2D array). @@ -358,10 +359,10 @@ class BaseDocumentStore(BaseComponent): # Single vec if len(emb.shape) == 1: - self._normalize_embedding_1D(emb) + BaseDocumentStore._normalize_embedding_1D(emb) # 2D matrix else: - self._normalize_embedding_2D(emb) + BaseDocumentStore._normalize_embedding_2D(emb) @staticmethod @njit # (fastmath=True) diff --git a/haystack/document_stores/elasticsearch.py b/haystack/document_stores/elasticsearch.py index 4df4db2f0..d82fc475f 100644 --- a/haystack/document_stores/elasticsearch.py +++ b/haystack/document_stores/elasticsearch.py @@ -433,30 +433,30 @@ class BaseElasticsearchDocumentStore(KeywordDocumentStore): """ Indexes documents for later queries in Elasticsearch. - Behaviour if a document with the same ID already exists in ElasticSearch: + If a document with the same ID already exists in Elasticsearch: a) (Default) Throw Elastic's standard error message for duplicate IDs. b) If `self.update_existing_documents=True` for DocumentStore: Overwrite existing documents. (This is only relevant if you pass your own ID when initializing a `Document`. - If don't set custom IDs for your Documents or just pass a list of dictionaries here, - they will automatically get UUIDs assigned. See the `Document` class for details) + If you don't set custom IDs for your Documents or just pass a list of dictionaries here, + they automatically get UUIDs assigned. See the `Document` class for details.) - :param documents: a list of Python dictionaries or a list of Haystack Document objects. + :param documents: A list of Python dictionaries or a list of Haystack Document objects. For documents as dictionaries, the format is {"content": ""}. Optionally: Include meta data via {"content": "", "meta":{"name": ", "author": "somebody", ...}} - It can be used for filtering and is accessible in the responses of the Finder. - Advanced: If you are using your own Elasticsearch mapping, the key names in the dictionary - should be changed to what you have set for self.content_field and self.name_field. - :param index: Elasticsearch index where the documents should be indexed. If not supplied, self.index will be used. + You can use it for filtering and you can access it in the responses of the Finder. 
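
A side effect of the `base.py` hunk above: `normalize_embedding` becomes callable without a store instance, which the new `test_normalize_embeddings_diff_shapes` test later in this patch relies on. A quick sketch of the in-place behaviour:

```python
import numpy as np
from haystack.document_stores import BaseDocumentStore

vec = np.random.rand(768).astype(np.float32)     # 1D case
mat = np.random.rand(4, 768).astype(np.float32)  # 2D case

BaseDocumentStore.normalize_embedding(vec)  # L2-normalizes in place, returns None
BaseDocumentStore.normalize_embedding(mat)

assert np.isclose(np.linalg.norm(vec), 1.0, atol=1e-5)
assert np.allclose(np.linalg.norm(mat, axis=1), 1.0, atol=1e-5)
```
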
+ Advanced: If you are using your own Elasticsearch mapping, change the key names in the dictionary + to what you have set for self.content_field and self.name_field. + :param index: Elasticsearch index where the documents should be indexed. If you don't specify it, self.index is used. :param batch_size: Number of documents that are passed to Elasticsearch's bulk function at a time. - :param duplicate_documents: Handle duplicates document based on parameter options. - Parameter options : ( 'skip','overwrite','fail') - skip: Ignore the duplicates documents + :param duplicate_documents: Handle duplicate documents based on parameter options. + Parameter options: ( 'skip','overwrite','fail') + skip: Ignore the duplicate documents overwrite: Update any existing documents with the same ID when adding documents. - fail: an error is raised if the document ID of the document being added already + fail: Raises an error if the document ID of the document being added already exists. - :param headers: Custom HTTP headers to pass to elasticsearch client (e.g. {'Authorization': 'Basic YWRtaW46cm9vdA=='}) - Check out https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html for more information. + :param headers: Custom HTTP headers to pass to Elasticsearch client (for example {'Authorization': 'Basic YWRtaW46cm9vdA=='}) + For more information, see [HTTP/REST clients and security](https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html). :raises DuplicateDocumentError: Exception trigger on duplicate document :return: None """ @@ -1483,10 +1483,7 @@ class BaseElasticsearchDocumentStore(KeywordDocumentStore): with tqdm(total=document_count, position=0, unit=" Docs", desc="Updating embeddings") as progress_bar: for result_batch in get_batches_from_generator(result, batch_size): document_batch = [self._convert_es_hit_to_document(hit, return_embedding=False) for hit in result_batch] - embeddings = retriever.embed_documents(document_batch) - self._validate_embeddings_shape( - embeddings=embeddings, num_documents=len(document_batch), embedding_dim=self.embedding_dim - ) + embeddings = self._embed_documents(document_batch, retriever) doc_updates = [] for doc, emb in zip(document_batch, embeddings): @@ -1501,6 +1498,20 @@ class BaseElasticsearchDocumentStore(KeywordDocumentStore): self._bulk(documents=doc_updates, request_timeout=300, refresh=self.refresh_type, headers=headers) progress_bar.update(batch_size) + def _embed_documents(self, documents: List[Document], retriever: DenseRetriever) -> np.ndarray: + """ + Embed a list of documents using a Retriever. + :param documents: List of documents to embed. + :param retriever: Retriever to use for embedding. + :return: embeddings of documents. 
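
Extracting the retriever call plus shape validation into `_embed_documents` gives subclasses a single override point for post-processing embeddings; the OpenSearch store below uses exactly this hook to normalize vectors for its faiss engine. Schematically, with a hypothetical subclass:

```python
from haystack.document_stores.elasticsearch import BaseElasticsearchDocumentStore


# a sketch of the override pattern enabled by this refactoring, not code from the patch
class MyNormalizingStore(BaseElasticsearchDocumentStore):
    def _embed_documents(self, documents, retriever):
        embeddings = super()._embed_documents(documents, retriever)  # embed + validate shape
        self.normalize_embedding(embeddings)  # engine-specific post-processing, in place
        return embeddings
```
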
+ """ + embeddings = retriever.embed_documents(documents) + self._validate_embeddings_shape( + embeddings=embeddings, num_documents=len(documents), embedding_dim=self.embedding_dim + ) + + return embeddings + def delete_all_documents( self, index: Optional[str] = None, diff --git a/haystack/document_stores/opensearch.py b/haystack/document_stores/opensearch.py index 4d7bc39d6..ad4582b4c 100644 --- a/haystack/document_stores/opensearch.py +++ b/haystack/document_stores/opensearch.py @@ -19,12 +19,22 @@ from haystack.schema import Document from haystack.document_stores.base import get_batches_from_generator from haystack.document_stores.filter_utils import LogicalFilterClause from haystack.errors import DocumentStoreError +from haystack.nodes.retriever import DenseRetriever from .elasticsearch import BaseElasticsearchDocumentStore, prepare_hosts logger = logging.getLogger(__name__) +SIMILARITY_SPACE_TYPE_MAPPINGS = { + "nmslib": {"cosine": "cosinesimil", "dot_product": "innerproduct", "l2": "l2"}, + "faiss": {"cosine": "innerproduct", "dot_product": "innerproduct", "l2": "l2"}, +} +SPACE_TYPE_SIMILARITY_MAPPINGS = { + engine: {v: k for k, v in mapping.items()} for engine, mapping in SIMILARITY_SPACE_TYPE_MAPPINGS.items() +} + + class OpenSearchDocumentStore(BaseElasticsearchDocumentStore): def __init__( self, @@ -171,15 +181,8 @@ class OpenSearchDocumentStore(BaseElasticsearchDocumentStore): if knn_engine not in {"nmslib", "faiss"}: raise ValueError(f"knn_engine must be either 'nmslib' or 'faiss' but was {knn_engine}") - if knn_engine == "faiss" and similarity not in {"dot_product", "l2"}: - raise ValueError( - f"knn_engine=`faiss` was set to similarity {similarity}. Currently, we only support 'dot_product' and 'l2' similarities. Set the similarity to one of the supported values." - ) - self.knn_engine = knn_engine self.embeddings_field_supports_similarity = False - self.similarity_to_space_type = {"cosine": "cosinesimil", "dot_product": "innerproduct", "l2": "l2"} - self.space_type_to_similarity = {v: k for k, v in self.similarity_to_space_type.items()} super().__init__( client=client, index=index, @@ -262,6 +265,72 @@ class OpenSearchDocumentStore(BaseElasticsearchDocumentStore): return client + def write_documents( + self, + documents: Union[List[dict], List[Document]], + index: Optional[str] = None, + batch_size: int = 10_000, + duplicate_documents: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + ): + """ + Indexes documents for later queries in OpenSearch. + + If a document with the same ID already exists in OpenSearch: + a) (Default) Throw Elastic's standard error message for duplicate IDs. + b) If `self.update_existing_documents=True` for DocumentStore: Overwrite existing documents. + (This is only relevant if you pass your own ID when initializing a `Document`. + If you don't set custom IDs for your Documents or just pass a list of dictionaries here, + they automatically get UUIDs assigned. See the `Document` class for details.) + + :param documents: A list of Python dictionaries or a list of Haystack Document objects. + For documents as dictionaries, the format is {"content": ""}. + Optionally: Include meta data via {"content": "", + "meta":{"name": ", "author": "somebody", ...}} + You can use it for filtering and you can access it in the responses of the Finder. + Advanced: If you are using your own OpenSearch mapping, change the key names in the dictionary + to what you have set for self.content_field and self.name_field. 
+        :param index: OpenSearch index where the documents should be indexed. If you don't specify it, self.index is used.
+        :param batch_size: Number of documents that are passed to OpenSearch's bulk function at a time.
+        :param duplicate_documents: Handle duplicate documents based on parameter options.
+                                    Parameter options: 'skip', 'overwrite', 'fail'
+                                    skip: Ignore the duplicate documents
+                                    overwrite: Update any existing documents with the same ID when adding documents.
+                                    fail: Raises an error if the document ID of the document being added already
+                                    exists.
+        :param headers: Custom HTTP headers to pass to OpenSearch client (for example {'Authorization': 'Basic YWRtaW46cm9vdA=='})
+                For more information, see [HTTP/REST clients and security](https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html).
+        :raises DuplicateDocumentError: Exception triggered on duplicate document
+        :return: None
+        """
+        if self.knn_engine == "faiss" and self.similarity == "cosine":
+            field_map = self._create_document_field_map()
+            documents = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents]
+            embeddings_to_index = np.array([d.embedding for d in documents], dtype="float32")
+            self.normalize_embedding(embeddings_to_index)
+            for document, embedding in zip(documents, embeddings_to_index):
+                document.embedding = None if np.isnan(embedding).any() else embedding
+
+        super().write_documents(
+            documents=documents,
+            index=index,
+            batch_size=batch_size,
+            duplicate_documents=duplicate_documents,
+            headers=headers,
+        )
+
+    def _embed_documents(self, documents: List[Document], retriever: DenseRetriever) -> np.ndarray:
+        """
+        Embed a list of documents using a Retriever.
+        :param documents: List of documents to embed.
+        :param retriever: Retriever to use for embedding.
+        :return: embeddings of documents.
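
The normalization in `write_documents` above is what makes cosine similarity work on the faiss engine: faiss indexes use the `innerproduct` space type, and for L2-normalized vectors the inner product equals the cosine. Together with the score conversion in `_get_raw_similarity_score` further down, this can be sanity-checked in plain NumPy:

```python
import numpy as np

a = np.random.rand(768).astype(np.float32)
b = np.random.rand(768).astype(np.float32)
cosine = float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# inner product of unit-length vectors equals their cosine similarity
a_n, b_n = a / np.linalg.norm(a), b / np.linalg.norm(b)
assert np.isclose(float(np.dot(a_n, b_n)), cosine, atol=1e-5)

# per the OpenSearch scoring docs linked below, cosinesimil scores come back as
# 1 / (1 + d) with d = 1 - cosine, so -(1 / score - 2) recovers the raw cosine
opensearch_score = 1 / (1 + (1 - cosine))
assert np.isclose(-(1 / opensearch_score - 2), cosine, atol=1e-9)
```
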
+ """ + embeddings = super()._embed_documents(documents, retriever) + if self.knn_engine == "faiss" and self.similarity == "cosine": + self.normalize_embedding(embeddings) + return embeddings + def query_by_embedding( self, query_emb: np.ndarray, @@ -444,7 +513,9 @@ class OpenSearchDocumentStore(BaseElasticsearchDocumentStore): ] # Check if desired index settings are equal to settings in existing index - embedding_field_similarity = self.space_type_to_similarity[embedding_field_space_type] + embedding_field_similarity = SPACE_TYPE_SIMILARITY_MAPPINGS[self.knn_engine][ + embedding_field_space_type + ] if embedding_field_similarity != self.similarity: self.embeddings_field_supports_similarity = False logger.warning( @@ -515,7 +586,7 @@ class OpenSearchDocumentStore(BaseElasticsearchDocumentStore): if self.embedding_field: index_definition["settings"]["index"] = {"knn": True} # global ef_search setting affects only nmslib, for faiss it is set in the field mapping - if self.index_type == "hnsw": + if self.knn_engine == "nmslib" and self.index_type == "hnsw": index_definition["settings"]["index"]["knn.algo_param.ef_search"] = 20 index_definition["mappings"]["properties"][self.embedding_field] = self._get_embedding_field_mapping( similarity=self.similarity @@ -533,7 +604,7 @@ class OpenSearchDocumentStore(BaseElasticsearchDocumentStore): raise e def _get_embedding_field_mapping(self, similarity: str): - space_type = self.similarity_to_space_type[similarity] + space_type = SIMILARITY_SPACE_TYPE_MAPPINGS[self.knn_engine][similarity] method: dict = {"space_type": space_type, "name": "hnsw", "engine": self.knn_engine} if self.index_type == "flat": @@ -589,6 +660,9 @@ class OpenSearchDocumentStore(BaseElasticsearchDocumentStore): Generate Elasticsearch query for vector similarity. 
""" if self.embeddings_field_supports_similarity: + if self.knn_engine == "faiss" and self.similarity == "cosine": + self.normalize_embedding(query_emb) + query: dict = { "bool": {"must": [{"knn": {self.embedding_field: {"vector": query_emb.tolist(), "k": top_k}}}]} } @@ -603,7 +677,7 @@ class OpenSearchDocumentStore(BaseElasticsearchDocumentStore): "params": { "field": self.embedding_field, "query_value": query_emb.tolist(), - "space_type": self.similarity_to_space_type[self.similarity], + "space_type": SIMILARITY_SPACE_TYPE_MAPPINGS["nmslib"][self.similarity], }, }, } @@ -613,14 +687,17 @@ class OpenSearchDocumentStore(BaseElasticsearchDocumentStore): def _get_raw_similarity_score(self, score): # adjust scores according to https://opensearch.org/docs/latest/search-plugins/knn/approximate-knn # and https://opensearch.org/docs/latest/search-plugins/knn/knn-score-script/ - if self.similarity == "dot_product": + + # space type is required as criterion as there is no consistent similarity-to-space-type mapping accross knn engines + space_type = SIMILARITY_SPACE_TYPE_MAPPINGS[self.knn_engine][self.similarity] + if space_type == "innerproduct": if score > 1: score = score - 1 else: score = -(1 / score - 1) - elif self.similarity == "l2": + elif space_type == "l2": score = 1 / score - 1 - elif self.similarity == "cosine": + elif space_type == "cosinesimil": if self.embeddings_field_supports_similarity: score = -(1 / score - 2) else: diff --git a/test/conftest.py b/test/conftest.py index b01f6e3e6..1006b7121 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -23,34 +23,20 @@ import psutil import pytest import requests -from haystack import Answer -from haystack.nodes.base import BaseComponent - -try: - from milvus import Milvus - - milvus1 = True -except ImportError: - milvus1 = False - from pymilvus import utility - -try: - from elasticsearch import Elasticsearch - from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore - import weaviate - from haystack.document_stores.weaviate import WeaviateDocumentStore - from haystack.document_stores import MilvusDocumentStore, PineconeDocumentStore - from haystack.document_stores.graphdb import GraphDBKnowledgeGraph - from haystack.document_stores.faiss import FAISSDocumentStore - from haystack.document_stores.sql import SQLDocumentStore - -except (ImportError, ModuleNotFoundError) as ie: - from haystack.utils.import_utils import _optional_component_not_installed - - _optional_component_not_installed("test", "test", ie) - -from haystack.document_stores import BaseDocumentStore, DeepsetCloudDocumentStore, InMemoryDocumentStore - +from haystack import Answer, BaseComponent +from haystack.document_stores import ( + BaseDocumentStore, + DeepsetCloudDocumentStore, + InMemoryDocumentStore, + ElasticsearchDocumentStore, + WeaviateDocumentStore, + MilvusDocumentStore, + PineconeDocumentStore, + OpenSearchDocumentStore, + GraphDBKnowledgeGraph, + FAISSDocumentStore, + SQLDocumentStore, +) from haystack.nodes import ( BaseReader, BaseRetriever, @@ -59,28 +45,42 @@ from haystack.nodes import ( BaseSummarizer, BaseTranslator, DenseRetriever, -) -from haystack.nodes.answer_generator.transformers import Seq2SeqGenerator -from haystack.nodes.answer_generator.transformers import RAGenerator -from haystack.nodes.ranker import SentenceTransformersRanker -from haystack.nodes.document_classifier.transformers import TransformersDocumentClassifier -from haystack.nodes.retriever.sparse import FilterRetriever, BM25Retriever, TfidfRetriever -from 
haystack.nodes.retriever.dense import ( + Seq2SeqGenerator, + RAGenerator, + SentenceTransformersRanker, + TransformersDocumentClassifier, + FilterRetriever, + BM25Retriever, + TfidfRetriever, DensePassageRetriever, EmbeddingRetriever, MultihopEmbeddingRetriever, TableTextRetriever, + FARMReader, + TransformersReader, + TableReader, + RCIReader, + TransformersSummarizer, + TransformersTranslator, + QuestionGenerator, ) -from haystack.nodes.reader.farm import FARMReader -from haystack.nodes.reader.transformers import TransformersReader -from haystack.nodes.reader.table import TableReader, RCIReader -from haystack.nodes.summarizer.transformers import TransformersSummarizer -from haystack.nodes.translator import TransformersTranslator -from haystack.nodes.question_generator import QuestionGenerator - from haystack.modeling.infer import Inferencer, QAInferencer - from haystack.schema import Document +from haystack.utils.import_utils import _optional_component_not_installed + +try: + from elasticsearch import Elasticsearch + import weaviate +except (ImportError, ModuleNotFoundError) as ie: + _optional_component_not_installed("test", "test", ie) + +try: + from milvus import Milvus + + milvus1 = True +except ImportError: + milvus1 = False + from pymilvus import utility from .mocks import pinecone as pinecone_mock @@ -152,6 +152,7 @@ def pytest_collection_modifyitems(config, items): "pinecone": [pytest.mark.pinecone], # FIXME GraphDB can't be treated as a regular docstore, it fails most of their tests "graphdb": [pytest.mark.integration], + "opensearch": [pytest.mark.opensearch], } for item in items: for name, markers in name_to_markers.items(): @@ -1005,6 +1006,7 @@ def get_document_store( similarity: str = "cosine", recreate_index: bool = True, ): # cosine is default similarity as dot product is not supported by Weaviate + document_store: BaseDocumentStore if document_store_type == "sql": document_store = SQLDocumentStore(url=get_sql_url(tmp_path), index=index, isolation_level="AUTOCOMMIT") @@ -1078,6 +1080,30 @@ def get_document_store( metadata_config={"indexed": META_FIELDS}, ) + elif document_store_type == "opensearch_faiss": + document_store = OpenSearchDocumentStore( + index=index, + return_embedding=True, + embedding_dim=embedding_dim, + embedding_field=embedding_field, + similarity=similarity, + recreate_index=recreate_index, + port=9201, + knn_engine="faiss", + ) + + elif document_store_type == "opensearch": + document_store = OpenSearchDocumentStore( + index=index, + return_embedding=True, + embedding_dim=embedding_dim, + embedding_field=embedding_field, + similarity=similarity, + recreate_index=recreate_index, + port=9201, + knn_engine="nmslib", + ) + else: raise Exception(f"No document store fixture for '{document_store_type}'") diff --git a/test/document_stores/test_document_store.py b/test/document_stores/test_document_store.py index 71d7134d6..5f039820e 100644 --- a/test/document_stores/test_document_store.py +++ b/test/document_stores/test_document_store.py @@ -1,6 +1,7 @@ +from copy import deepcopy import logging +import math import sys -from typing import List from uuid import uuid4 import numpy as np @@ -16,19 +17,27 @@ from elasticsearch.exceptions import RequestError from ..conftest import ( deepset_cloud_fixture, get_document_store, + ensure_ids_are_correct_uuids, MOCK_DC, DC_API_ENDPOINT, DC_API_KEY, DC_TEST_INDEX, SAMPLES_PATH, ) -from haystack.document_stores import WeaviateDocumentStore, DeepsetCloudDocumentStore, InMemoryDocumentStore +from haystack.document_stores 
import (
+    WeaviateDocumentStore,
+    DeepsetCloudDocumentStore,
+    InMemoryDocumentStore,
+    MilvusDocumentStore,
+    FAISSDocumentStore,
+    ElasticsearchDocumentStore,
+    OpenSearchDocumentStore,
+)
+
 from haystack.document_stores.base import BaseDocumentStore
 from haystack.document_stores.es_converter import elasticsearch_index_to_document_store
 from haystack.errors import DuplicateDocumentError
 from haystack.schema import Document, Label, Answer, Span
-from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore
-from haystack.document_stores.faiss import FAISSDocumentStore
 from haystack.nodes import EmbeddingRetriever, PreProcessor
 from haystack.pipelines import DocumentSearchPipeline
 from haystack.utils import DeepsetCloudError
@@ -2108,3 +2117,151 @@ def test_elasticsearch_brownfield_support(document_store_with_docs):
     assert all("numeric_field" in doc.meta for doc in transferred_documents)
     # Check if number of transferred_documents is equal to number of unique words.
     assert len(transferred_documents) == len(set(" ".join(original_content).split()))
+
+
+@pytest.mark.parametrize(
+    "document_store",
+    ["faiss", "milvus1", "milvus", "weaviate", "opensearch_faiss", "opensearch", "elasticsearch", "memory"],
+    indirect=True,
+)
+def test_cosine_similarity(document_store: BaseDocumentStore):
+    # below we will write documents to the store and then query it to see if vectors were normalized or not
+    ensure_ids_are_correct_uuids(docs=DOCUMENTS, document_store=document_store)
+    document_store.write_documents(documents=DOCUMENTS)
+
+    query = np.random.rand(768).astype(np.float32)
+    query_results = document_store.query_by_embedding(
+        query_emb=query, top_k=len(DOCUMENTS), return_embedding=True, scale_score=False
+    )
+
+    # check if search with cosine similarity returns the correct number of results
+    assert len(query_results) == len(DOCUMENTS)
+
+    original_embeddings = {doc["content"]: doc["embedding"] for doc in DOCUMENTS}
+
+    for doc in query_results:
+        result_emb = doc.embedding
+        original_emb = original_embeddings[doc.content]
+
+        expected_emb = original_emb
+        # embeddings of document stores which only support dot product out of the box must be normalized
+        if (
+            isinstance(document_store, (FAISSDocumentStore, MilvusDocumentStore, WeaviateDocumentStore))
+            or type(document_store).__name__ == "Milvus1DocumentStore"
+            or isinstance(document_store, OpenSearchDocumentStore)
+            and document_store.knn_engine == "faiss"
+        ):
+            expected_emb = original_emb / np.linalg.norm(original_emb)
+
+        # check if the stored embedding was normalized or not
+        np.testing.assert_allclose(
+            expected_emb, result_emb, rtol=0.2, atol=5e-07
+        )  # high tolerance necessary for Milvus 2
+
+        # check if the score is plausible for cosine similarity
+        cosine_score = np.dot(result_emb, query) / (np.linalg.norm(result_emb) * np.linalg.norm(query))
+        assert cosine_score == pytest.approx(doc.score, 0.01)
+
+
+@pytest.mark.parametrize(
+    "document_store",
+    ["faiss", "milvus1", "milvus", "weaviate", "opensearch_faiss", "opensearch", "elasticsearch", "memory"],
+    indirect=True,
+)
+def test_update_embeddings_cosine_similarity(document_store: BaseDocumentStore):
+    # below we will write documents to the store and then query it to see if vectors were normalized
+    ensure_ids_are_correct_uuids(docs=DOCUMENTS, document_store=document_store)
+    # clear embeddings
+    docs = deepcopy(DOCUMENTS)
+    for doc in docs:
+        doc.pop("embedding")
+
+    document_store.write_documents(documents=docs)
+    original_embeddings = {}
+
+    # now check if vectors are normalized when updating embeddings
+    class MockRetriever:
+        def embed_documents(self, docs):
+            embeddings = []
+            for doc in docs:
+                embedding = np.random.rand(768).astype(np.float32)
+                original_embeddings[doc.content] = embedding
+                embeddings.append(embedding)
+            return np.stack(embeddings)
+
+    retriever = MockRetriever()
+    document_store.update_embeddings(retriever=retriever)
+
+    query = np.random.rand(768).astype(np.float32)
+    query_results = document_store.query_by_embedding(
+        query_emb=query, top_k=len(DOCUMENTS), return_embedding=True, scale_score=False
+    )
+
+    # check if search with cosine similarity returns the correct number of results
+    assert len(query_results) == len(DOCUMENTS)
+
+    for doc in query_results:
+        result_emb = doc.embedding
+        original_emb = original_embeddings[doc.content]
+
+        expected_emb = original_emb
+        # embeddings of document stores which only support dot product out of the box must be normalized
+        if (
+            isinstance(document_store, (FAISSDocumentStore, MilvusDocumentStore, WeaviateDocumentStore))
+            or type(document_store).__name__ == "Milvus1DocumentStore"
+            or isinstance(document_store, OpenSearchDocumentStore)
+            and document_store.knn_engine == "faiss"
+        ):
+            expected_emb = original_emb / np.linalg.norm(original_emb)
+
+        # check if the stored embedding was normalized or not
+        np.testing.assert_allclose(
+            expected_emb, result_emb, rtol=0.2, atol=5e-07
+        )  # high tolerance necessary for Milvus 2
+
+        # check if the score is plausible for cosine similarity
+        cosine_score = np.dot(result_emb, query) / (np.linalg.norm(result_emb) * np.linalg.norm(query))
+        assert cosine_score == pytest.approx(doc.score, 0.01)
+
+
+@pytest.mark.parametrize(
+    "document_store_small",
+    ["faiss", "milvus1", "milvus", "weaviate", "memory", "elasticsearch", "opensearch", "opensearch_faiss"],
+    indirect=True,
+)
+def test_cosine_sanity_check(document_store_small):
+    VEC_1 = np.array([0.1, 0.2, 0.3], dtype="float32")
+    VEC_2 = np.array([0.4, 0.5, 0.6], dtype="float32")
+
+    # This is the cosine similarity of VEC_1 and VEC_2 calculated using sklearn.metrics.pairwise.cosine_similarity
+    # The score is normalized to yield a value between 0 and 1.
+    KNOWN_COSINE = 0.9746317
+    KNOWN_SCALED_COSINE = (KNOWN_COSINE + 1) / 2
+
+    docs = [{"name": "vec_1", "text": "vec_1", "content": "vec_1", "embedding": VEC_1}]
+    ensure_ids_are_correct_uuids(docs=docs, document_store=document_store_small)
+    document_store_small.write_documents(documents=docs)
+
+    query_results = document_store_small.query_by_embedding(
+        query_emb=VEC_2, top_k=1, return_embedding=True, scale_score=True
+    )
+
+    # check if faiss returns the same cosine similarity. Manual testing with faiss yielded 0.9746318
+    assert math.isclose(query_results[0].score, KNOWN_SCALED_COSINE, abs_tol=0.0002)
+
+    query_results = document_store_small.query_by_embedding(
+        query_emb=VEC_2, top_k=1, return_embedding=True, scale_score=False
+    )
+
+    # check if faiss returns the same cosine similarity.
Manual testing with faiss yielded 0.9746318 + assert math.isclose(query_results[0].score, KNOWN_COSINE, abs_tol=0.0002) + + +def test_normalize_embeddings_diff_shapes(): + VEC_1 = np.array([0.1, 0.2, 0.3], dtype="float32") + BaseDocumentStore.normalize_embedding(VEC_1) + assert np.linalg.norm(VEC_1) - 1 < 0.01 + + VEC_1 = np.array([0.1, 0.2, 0.3], dtype="float32").reshape(1, -1) + BaseDocumentStore.normalize_embedding(VEC_1) + assert np.linalg.norm(VEC_1) - 1 < 0.01 diff --git a/test/document_stores/test_faiss.py b/test/document_stores/test_faiss.py new file mode 100644 index 000000000..87302af3c --- /dev/null +++ b/test/document_stores/test_faiss.py @@ -0,0 +1,285 @@ +import sys + +import yaml +import faiss +import pytest +import numpy as np + +from haystack.schema import Document +from haystack.document_stores.faiss import FAISSDocumentStore + +from haystack.pipelines import Pipeline +from haystack.nodes.retriever.dense import EmbeddingRetriever + +from ..conftest import MockDenseRetriever + + +DOCUMENTS = [ + { + "meta": {"name": "name_1", "year": "2020", "month": "01"}, + "content": "text_1", + "embedding": np.random.rand(768).astype(np.float32), + }, + { + "meta": {"name": "name_2", "year": "2020", "month": "02"}, + "content": "text_2", + "embedding": np.random.rand(768).astype(np.float32), + }, + { + "meta": {"name": "name_3", "year": "2020", "month": "03"}, + "content": "text_3", + "embedding": np.random.rand(768).astype(np.float64), + }, + { + "meta": {"name": "name_4", "year": "2021", "month": "01"}, + "content": "text_4", + "embedding": np.random.rand(768).astype(np.float32), + }, + { + "meta": {"name": "name_5", "year": "2021", "month": "02"}, + "content": "text_5", + "embedding": np.random.rand(768).astype(np.float32), + }, + { + "meta": {"name": "name_6", "year": "2021", "month": "03"}, + "content": "text_6", + "embedding": np.random.rand(768).astype(np.float64), + }, +] + + +@pytest.mark.skipif(sys.platform in ["win32", "cygwin"], reason="Test with tmp_path not working on windows runner") +def test_faiss_index_save_and_load(tmp_path, sql_url): + document_store = FAISSDocumentStore( + sql_url=sql_url, + index="haystack_test", + progress_bar=False, # Just to check if the init parameters are kept + isolation_level="AUTOCOMMIT", + ) + document_store.write_documents(DOCUMENTS) + + # test saving the index + document_store.save(tmp_path / "haystack_test_faiss") + + # clear existing faiss_index + document_store.faiss_indexes[document_store.index].reset() + + # test faiss index is cleared + assert document_store.faiss_indexes[document_store.index].ntotal == 0 + + # test loading the index + new_document_store = FAISSDocumentStore.load(tmp_path / "haystack_test_faiss") + + # check faiss index is restored + assert new_document_store.faiss_indexes[document_store.index].ntotal == len(DOCUMENTS) + # check if documents are restored + assert len(new_document_store.get_all_documents()) == len(DOCUMENTS) + # Check if the init parameters are kept + assert not new_document_store.progress_bar + + # test saving and loading the loaded faiss index + new_document_store.save(tmp_path / "haystack_test_faiss") + reloaded_document_store = FAISSDocumentStore.load(tmp_path / "haystack_test_faiss") + + # check faiss index is restored + assert reloaded_document_store.faiss_indexes[document_store.index].ntotal == len(DOCUMENTS) + # check if documents are restored + assert len(reloaded_document_store.get_all_documents()) == len(DOCUMENTS) + # Check if the init parameters are kept + assert not 
reloaded_document_store.progress_bar + + # test loading the index via init + new_document_store = FAISSDocumentStore(faiss_index_path=tmp_path / "haystack_test_faiss") + + # check faiss index is restored + assert new_document_store.faiss_indexes[document_store.index].ntotal == len(DOCUMENTS) + # check if documents are restored + assert len(new_document_store.get_all_documents()) == len(DOCUMENTS) + # Check if the init parameters are kept + assert not new_document_store.progress_bar + + +@pytest.mark.skipif(sys.platform in ["win32", "cygwin"], reason="Test with tmp_path not working on windows runner") +def test_faiss_index_save_and_load_custom_path(tmp_path, sql_url): + document_store = FAISSDocumentStore( + sql_url=sql_url, + index="haystack_test", + progress_bar=False, # Just to check if the init parameters are kept + isolation_level="AUTOCOMMIT", + ) + document_store.write_documents(DOCUMENTS) + + # test saving the index + document_store.save(index_path=tmp_path / "haystack_test_faiss", config_path=tmp_path / "custom_path.json") + + # clear existing faiss_index + document_store.faiss_indexes[document_store.index].reset() + + # test faiss index is cleared + assert document_store.faiss_indexes[document_store.index].ntotal == 0 + + # test loading the index + new_document_store = FAISSDocumentStore.load( + index_path=tmp_path / "haystack_test_faiss", config_path=tmp_path / "custom_path.json" + ) + + # check faiss index is restored + assert new_document_store.faiss_indexes[document_store.index].ntotal == len(DOCUMENTS) + # check if documents are restored + assert len(new_document_store.get_all_documents()) == len(DOCUMENTS) + # Check if the init parameters are kept + assert not new_document_store.progress_bar + + # test saving and loading the loaded faiss index + new_document_store.save(tmp_path / "haystack_test_faiss", config_path=tmp_path / "custom_path.json") + reloaded_document_store = FAISSDocumentStore.load( + tmp_path / "haystack_test_faiss", config_path=tmp_path / "custom_path.json" + ) + + # check faiss index is restored + assert reloaded_document_store.faiss_indexes[document_store.index].ntotal == len(DOCUMENTS) + # check if documents are restored + assert len(reloaded_document_store.get_all_documents()) == len(DOCUMENTS) + # Check if the init parameters are kept + assert not reloaded_document_store.progress_bar + + # test loading the index via init + new_document_store = FAISSDocumentStore( + faiss_index_path=tmp_path / "haystack_test_faiss", faiss_config_path=tmp_path / "custom_path.json" + ) + + # check faiss index is restored + assert new_document_store.faiss_indexes[document_store.index].ntotal == len(DOCUMENTS) + # check if documents are restored + assert len(new_document_store.get_all_documents()) == len(DOCUMENTS) + # Check if the init parameters are kept + assert not new_document_store.progress_bar + + +@pytest.mark.skipif(sys.platform in ["win32", "cygwin"], reason="Test with tmp_path not working on windows runner") +def test_faiss_index_mutual_exclusive_args(tmp_path): + with pytest.raises(ValueError): + FAISSDocumentStore( + sql_url=f"sqlite:////{tmp_path/'haystack_test.db'}", + faiss_index_path=f"{tmp_path/'haystack_test'}", + isolation_level="AUTOCOMMIT", + ) + + with pytest.raises(ValueError): + FAISSDocumentStore( + f"sqlite:////{tmp_path/'haystack_test.db'}", + faiss_index_path=f"{tmp_path/'haystack_test'}", + isolation_level="AUTOCOMMIT", + ) + + +@pytest.mark.parametrize("document_store", ["faiss"], indirect=True) +@pytest.mark.parametrize("index_buffer_size", 
[10_000, 2]) +@pytest.mark.parametrize("batch_size", [2]) +def test_faiss_write_docs(document_store, index_buffer_size, batch_size): + document_store.index_buffer_size = index_buffer_size + + # Write in small batches + for i in range(0, len(DOCUMENTS), batch_size): + document_store.write_documents(DOCUMENTS[i : i + batch_size]) + + documents_indexed = document_store.get_all_documents() + assert len(documents_indexed) == len(DOCUMENTS) + + # test if correct vectors are associated with docs + for i, doc in enumerate(documents_indexed): + # we currently don't get the embeddings back when we call document_store.get_all_documents() + original_doc = [d for d in DOCUMENTS if d["content"] == doc.content][0] + stored_emb = document_store.faiss_indexes[document_store.index].reconstruct(int(doc.meta["vector_id"])) + # compare original input vec with stored one (ignore extra dim added by hnsw) + # original input vec is normalized as faiss only stores normalized vectors + assert np.allclose(original_doc["embedding"] / np.linalg.norm(original_doc["embedding"]), stored_emb, rtol=0.01) + + +@pytest.mark.skipif(sys.platform in ["win32", "cygwin"], reason="Test with tmp_path not working on windows runner") +@pytest.mark.parametrize("index_factory", ["Flat", "HNSW", "IVF1,Flat"]) +def test_faiss_retrieving(index_factory, tmp_path): + document_store = FAISSDocumentStore( + sql_url=f"sqlite:////{tmp_path/'test_faiss_retrieving.db'}", + faiss_index_factory_str=index_factory, + isolation_level="AUTOCOMMIT", + ) + + document_store.delete_all_documents(index="document") + if "ivf" in index_factory.lower(): + document_store.train_index(DOCUMENTS) + document_store.write_documents(DOCUMENTS) + + retriever = EmbeddingRetriever( + document_store=document_store, embedding_model="deepset/sentence_bert", use_gpu=False + ) + result = retriever.retrieve(query="How to test this?") + + assert len(result) == len(DOCUMENTS) + assert type(result[0]) == Document + + # Cleanup + document_store.faiss_indexes[document_store.index].reset() + + +@pytest.mark.skipif(sys.platform in ["win32", "cygwin"], reason="Test with tmp_path not working on windows runner") +def test_faiss_passing_index_from_outside(tmp_path): + d = 768 + nlist = 2 + quantizer = faiss.IndexFlatIP(d) + index = "haystack_test_1" + faiss_index = faiss.IndexIVFFlat(quantizer, d, nlist, faiss.METRIC_INNER_PRODUCT) + faiss_index.set_direct_map_type(faiss.DirectMap.Hashtable) + faiss_index.nprobe = 2 + document_store = FAISSDocumentStore( + sql_url=f"sqlite:////{tmp_path/'haystack_test_faiss.db'}", + faiss_index=faiss_index, + index=index, + isolation_level="AUTOCOMMIT", + ) + + document_store.delete_documents() + # as it is a IVF index we need to train it before adding docs + document_store.train_index(DOCUMENTS) + + document_store.write_documents(documents=DOCUMENTS) + documents_indexed = document_store.get_all_documents() + + # test if vectors ids are associated with docs + for doc in documents_indexed: + assert 0 <= int(doc.meta["vector_id"]) <= 7 + + +@pytest.mark.integration +def test_pipeline_with_existing_faiss_docstore(tmp_path): + + document_store: FAISSDocumentStore = FAISSDocumentStore( + sql_url=f'sqlite:///{(tmp_path / "faiss_document_store.db").absolute()}' + ) + retriever = MockDenseRetriever(document_store=document_store) + document_store.write_documents(DOCUMENTS) + document_store.update_embeddings(retriever=retriever, update_existing_embeddings=True) + + document_store.save(tmp_path / "existing_faiss_document_store") + + query_config = f""" +version: 
ignore +components: + - name: DPRRetriever + type: MockDenseRetriever + params: + document_store: ExistingFAISSDocumentStore + - name: ExistingFAISSDocumentStore + type: FAISSDocumentStore + params: + faiss_index_path: '{tmp_path / "existing_faiss_document_store"}' +pipelines: + - name: query_pipeline + nodes: + - name: DPRRetriever + inputs: [Query] + """ + pipeline = Pipeline.load_from_config(yaml.safe_load(query_config)) + existing_document_store = pipeline.get_document_store() + faiss_index = existing_document_store.faiss_indexes["document"] + assert faiss_index.ntotal == len(DOCUMENTS) diff --git a/test/document_stores/test_faiss_and_milvus.py b/test/document_stores/test_faiss_and_milvus.py deleted file mode 100644 index 7ce675151..000000000 --- a/test/document_stores/test_faiss_and_milvus.py +++ /dev/null @@ -1,606 +0,0 @@ -import os -import sys -import math -from pathlib import Path - -import yaml -import faiss -import pytest -import numpy as np - -from haystack.schema import Document -from haystack.pipelines import DocumentSearchPipeline -from haystack.document_stores.base import BaseDocumentStore -from haystack.document_stores.faiss import FAISSDocumentStore - -from haystack.pipelines import Pipeline -from haystack.nodes.retriever.dense import EmbeddingRetriever - -from ..conftest import ensure_ids_are_correct_uuids, SAMPLES_PATH, MockDenseRetriever - - -DOCUMENTS = [ - { - "meta": {"name": "name_1", "year": "2020", "month": "01"}, - "content": "text_1", - "embedding": np.random.rand(768).astype(np.float32), - }, - { - "meta": {"name": "name_2", "year": "2020", "month": "02"}, - "content": "text_2", - "embedding": np.random.rand(768).astype(np.float32), - }, - { - "meta": {"name": "name_3", "year": "2020", "month": "03"}, - "content": "text_3", - "embedding": np.random.rand(768).astype(np.float64), - }, - { - "meta": {"name": "name_4", "year": "2021", "month": "01"}, - "content": "text_4", - "embedding": np.random.rand(768).astype(np.float32), - }, - { - "meta": {"name": "name_5", "year": "2021", "month": "02"}, - "content": "text_5", - "embedding": np.random.rand(768).astype(np.float32), - }, - { - "meta": {"name": "name_6", "year": "2021", "month": "03"}, - "content": "text_6", - "embedding": np.random.rand(768).astype(np.float64), - }, -] - - -@pytest.mark.skipif(sys.platform in ["win32", "cygwin"], reason="Test with tmp_path not working on windows runner") -def test_faiss_index_save_and_load(tmp_path, sql_url): - document_store = FAISSDocumentStore( - sql_url=sql_url, - index="haystack_test", - progress_bar=False, # Just to check if the init parameters are kept - isolation_level="AUTOCOMMIT", - ) - document_store.write_documents(DOCUMENTS) - - # test saving the index - document_store.save(tmp_path / "haystack_test_faiss") - - # clear existing faiss_index - document_store.faiss_indexes[document_store.index].reset() - - # test faiss index is cleared - assert document_store.faiss_indexes[document_store.index].ntotal == 0 - - # test loading the index - new_document_store = FAISSDocumentStore.load(tmp_path / "haystack_test_faiss") - - # check faiss index is restored - assert new_document_store.faiss_indexes[document_store.index].ntotal == len(DOCUMENTS) - # check if documents are restored - assert len(new_document_store.get_all_documents()) == len(DOCUMENTS) - # Check if the init parameters are kept - assert not new_document_store.progress_bar - - # test saving and loading the loaded faiss index - new_document_store.save(tmp_path / "haystack_test_faiss") - reloaded_document_store = 
FAISSDocumentStore.load(tmp_path / "haystack_test_faiss") - - # check faiss index is restored - assert reloaded_document_store.faiss_indexes[document_store.index].ntotal == len(DOCUMENTS) - # check if documents are restored - assert len(reloaded_document_store.get_all_documents()) == len(DOCUMENTS) - # Check if the init parameters are kept - assert not reloaded_document_store.progress_bar - - # test loading the index via init - new_document_store = FAISSDocumentStore(faiss_index_path=tmp_path / "haystack_test_faiss") - - # check faiss index is restored - assert new_document_store.faiss_indexes[document_store.index].ntotal == len(DOCUMENTS) - # check if documents are restored - assert len(new_document_store.get_all_documents()) == len(DOCUMENTS) - # Check if the init parameters are kept - assert not new_document_store.progress_bar - - -@pytest.mark.skipif(sys.platform in ["win32", "cygwin"], reason="Test with tmp_path not working on windows runner") -def test_faiss_index_save_and_load_custom_path(tmp_path, sql_url): - document_store = FAISSDocumentStore( - sql_url=sql_url, - index="haystack_test", - progress_bar=False, # Just to check if the init parameters are kept - isolation_level="AUTOCOMMIT", - ) - document_store.write_documents(DOCUMENTS) - - # test saving the index - document_store.save(index_path=tmp_path / "haystack_test_faiss", config_path=tmp_path / "custom_path.json") - - # clear existing faiss_index - document_store.faiss_indexes[document_store.index].reset() - - # test faiss index is cleared - assert document_store.faiss_indexes[document_store.index].ntotal == 0 - - # test loading the index - new_document_store = FAISSDocumentStore.load( - index_path=tmp_path / "haystack_test_faiss", config_path=tmp_path / "custom_path.json" - ) - - # check faiss index is restored - assert new_document_store.faiss_indexes[document_store.index].ntotal == len(DOCUMENTS) - # check if documents are restored - assert len(new_document_store.get_all_documents()) == len(DOCUMENTS) - # Check if the init parameters are kept - assert not new_document_store.progress_bar - - # test saving and loading the loaded faiss index - new_document_store.save(tmp_path / "haystack_test_faiss", config_path=tmp_path / "custom_path.json") - reloaded_document_store = FAISSDocumentStore.load( - tmp_path / "haystack_test_faiss", config_path=tmp_path / "custom_path.json" - ) - - # check faiss index is restored - assert reloaded_document_store.faiss_indexes[document_store.index].ntotal == len(DOCUMENTS) - # check if documents are restored - assert len(reloaded_document_store.get_all_documents()) == len(DOCUMENTS) - # Check if the init parameters are kept - assert not reloaded_document_store.progress_bar - - # test loading the index via init - new_document_store = FAISSDocumentStore( - faiss_index_path=tmp_path / "haystack_test_faiss", faiss_config_path=tmp_path / "custom_path.json" - ) - - # check faiss index is restored - assert new_document_store.faiss_indexes[document_store.index].ntotal == len(DOCUMENTS) - # check if documents are restored - assert len(new_document_store.get_all_documents()) == len(DOCUMENTS) - # Check if the init parameters are kept - assert not new_document_store.progress_bar - - -@pytest.mark.skipif(sys.platform in ["win32", "cygwin"], reason="Test with tmp_path not working on windows runner") -def test_faiss_index_mutual_exclusive_args(tmp_path): - with pytest.raises(ValueError): - FAISSDocumentStore( - sql_url=f"sqlite:////{tmp_path/'haystack_test.db'}", - 
faiss_index_path=f"{tmp_path/'haystack_test'}", - isolation_level="AUTOCOMMIT", - ) - - with pytest.raises(ValueError): - FAISSDocumentStore( - f"sqlite:////{tmp_path/'haystack_test.db'}", - faiss_index_path=f"{tmp_path/'haystack_test'}", - isolation_level="AUTOCOMMIT", - ) - - -@pytest.mark.parametrize("document_store", ["faiss"], indirect=True) -@pytest.mark.parametrize("index_buffer_size", [10_000, 2]) -@pytest.mark.parametrize("batch_size", [2]) -def test_faiss_write_docs(document_store, index_buffer_size, batch_size): - document_store.index_buffer_size = index_buffer_size - - # Write in small batches - for i in range(0, len(DOCUMENTS), batch_size): - document_store.write_documents(DOCUMENTS[i : i + batch_size]) - - documents_indexed = document_store.get_all_documents() - assert len(documents_indexed) == len(DOCUMENTS) - - # test if correct vectors are associated with docs - for i, doc in enumerate(documents_indexed): - # we currently don't get the embeddings back when we call document_store.get_all_documents() - original_doc = [d for d in DOCUMENTS if d["content"] == doc.content][0] - stored_emb = document_store.faiss_indexes[document_store.index].reconstruct(int(doc.meta["vector_id"])) - # compare original input vec with stored one (ignore extra dim added by hnsw) - # original input vec is normalized as faiss only stores normalized vectors - assert np.allclose(original_doc["embedding"] / np.linalg.norm(original_doc["embedding"]), stored_emb, rtol=0.01) - - -@pytest.mark.integration -@pytest.mark.parametrize("retriever", ["dpr"], indirect=True) -@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True) -@pytest.mark.parametrize("batch_size", [4, 6]) -def test_update_docs(document_store, retriever, batch_size): - # initial write - document_store.write_documents(DOCUMENTS) - - document_store.update_embeddings(retriever=retriever, batch_size=batch_size) - documents_indexed = document_store.get_all_documents() - assert len(documents_indexed) == len(DOCUMENTS) - - # test if correct vectors are associated with docs - for doc in documents_indexed: - original_doc = [d for d in DOCUMENTS if d["content"] == doc.content][0] - updated_embedding = retriever.embed_documents([Document.from_dict(original_doc)]) - stored_doc = document_store.get_all_documents(filters={"name": [doc.meta["name"]]})[0] - # compare original input vec with stored one (ignore extra dim added by hnsw) - # original input vec is normalized as faiss only stores normalized vectors - a = updated_embedding / np.linalg.norm(updated_embedding) - assert np.allclose(a[0], stored_doc.embedding, rtol=0.2) # high tolerance necessary for Milvus 2 - - -@pytest.mark.integration -@pytest.mark.parametrize("retriever", ["dpr"], indirect=True) -@pytest.mark.parametrize("document_store", ["milvus1", "milvus", "faiss"], indirect=True) -def test_update_existing_docs(document_store, retriever): - document_store.duplicate_documents = "overwrite" - old_document = Document(content="text_1") - # initial write - document_store.write_documents([old_document]) - document_store.update_embeddings(retriever=retriever) - old_documents_indexed = document_store.get_all_documents(return_embedding=True) - assert len(old_documents_indexed) == 1 - - # Update document data - new_document = Document(content="text_2") - new_document.id = old_document.id - document_store.write_documents([new_document]) - document_store.update_embeddings(retriever=retriever) - new_documents_indexed = document_store.get_all_documents(return_embedding=True) 
- assert len(new_documents_indexed) == 1 - - assert old_documents_indexed[0].id == new_documents_indexed[0].id - assert old_documents_indexed[0].content == "text_1" - assert new_documents_indexed[0].content == "text_2" - print(type(old_documents_indexed[0].embedding)) - print(type(new_documents_indexed[0].embedding)) - assert not np.allclose(old_documents_indexed[0].embedding, new_documents_indexed[0].embedding, rtol=0.01) - - -@pytest.mark.parametrize("retriever", ["dpr"], indirect=True) -@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True) -def test_update_with_empty_store(document_store, retriever): - # Call update with empty doc store - document_store.update_embeddings(retriever=retriever) - - # initial write - document_store.write_documents(DOCUMENTS) - - documents_indexed = document_store.get_all_documents() - - assert len(documents_indexed) == len(DOCUMENTS) - - -@pytest.mark.skipif(sys.platform in ["win32", "cygwin"], reason="Test with tmp_path not working on windows runner") -@pytest.mark.parametrize("index_factory", ["Flat", "HNSW", "IVF1,Flat"]) -def test_faiss_retrieving(index_factory, tmp_path): - document_store = FAISSDocumentStore( - sql_url=f"sqlite:////{tmp_path/'test_faiss_retrieving.db'}", - faiss_index_factory_str=index_factory, - isolation_level="AUTOCOMMIT", - ) - - document_store.delete_all_documents(index="document") - if "ivf" in index_factory.lower(): - document_store.train_index(DOCUMENTS) - document_store.write_documents(DOCUMENTS) - - retriever = EmbeddingRetriever( - document_store=document_store, embedding_model="deepset/sentence_bert", use_gpu=False - ) - result = retriever.retrieve(query="How to test this?") - - assert len(result) == len(DOCUMENTS) - assert type(result[0]) == Document - - # Cleanup - document_store.faiss_indexes[document_store.index].reset() - - -@pytest.mark.parametrize("retriever", ["embedding"], indirect=True) -@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True) -def test_finding(document_store, retriever): - document_store.write_documents(DOCUMENTS) - pipe = DocumentSearchPipeline(retriever=retriever) - - prediction = pipe.run(query="How to test this?", params={"Retriever": {"top_k": 1}}) - - assert len(prediction.get("documents", [])) == 1 - - -@pytest.mark.integration -@pytest.mark.parametrize("retriever", ["dpr"], indirect=True) -@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True) -def test_delete_docs_with_filters(document_store, retriever): - document_store.write_documents(DOCUMENTS) - document_store.update_embeddings(retriever=retriever, batch_size=4) - assert document_store.get_embedding_count() == 6 - - document_store.delete_documents(filters={"name": ["name_1", "name_2", "name_3", "name_4"]}) - - documents = document_store.get_all_documents() - assert len(documents) == 2 - assert document_store.get_embedding_count() == 2 - assert {doc.meta["name"] for doc in documents} == {"name_5", "name_6"} - - -@pytest.mark.integration -@pytest.mark.parametrize("retriever", ["dpr"], indirect=True) -@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True) -def test_delete_docs_with_filters(document_store, retriever): - document_store.write_documents(DOCUMENTS) - document_store.update_embeddings(retriever=retriever, batch_size=4) - assert document_store.get_embedding_count() == 6 - - document_store.delete_documents(filters={"year": ["2020"]}) - - documents = document_store.get_all_documents() - 
-    assert len(documents) == 3
-    assert document_store.get_embedding_count() == 3
-    assert all("2021" == doc.meta["year"] for doc in documents)
-
-
-@pytest.mark.integration
-@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
-@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
-def test_delete_docs_with_many_filters(document_store, retriever):
-    document_store.write_documents(DOCUMENTS)
-    document_store.update_embeddings(retriever=retriever, batch_size=4)
-    assert document_store.get_embedding_count() == 6
-
-    document_store.delete_documents(filters={"month": ["01"], "year": ["2020"]})
-
-    documents = document_store.get_all_documents()
-    assert len(documents) == 5
-    assert document_store.get_embedding_count() == 5
-    assert "name_1" not in {doc.meta["name"] for doc in documents}
-
-
-@pytest.mark.integration
-@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
-@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
-def test_delete_docs_by_id(document_store, retriever):
-    document_store.write_documents(DOCUMENTS)
-    document_store.update_embeddings(retriever=retriever, batch_size=4)
-    assert document_store.get_embedding_count() == 6
-    doc_ids = [doc.id for doc in document_store.get_all_documents()]
-    ids_to_delete = doc_ids[0:3]
-
-    document_store.delete_documents(ids=ids_to_delete)
-
-    documents = document_store.get_all_documents()
-    assert len(documents) == len(doc_ids) - len(ids_to_delete)
-    assert document_store.get_embedding_count() == len(doc_ids) - len(ids_to_delete)
-
-    remaining_ids = [doc.id for doc in documents]
-    assert all(doc_id not in remaining_ids for doc_id in ids_to_delete)
-
-
-@pytest.mark.integration
-@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
-@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
-def test_delete_docs_by_id_with_filters(document_store, retriever):
-    document_store.write_documents(DOCUMENTS)
-    document_store.update_embeddings(retriever=retriever, batch_size=4)
-    assert document_store.get_embedding_count() == 6
-
-    ids_to_delete = [doc.id for doc in document_store.get_all_documents(filters={"name": ["name_1", "name_2"]})]
-    ids_not_to_delete = [
-        doc.id for doc in document_store.get_all_documents(filters={"name": ["name_3", "name_4", "name_5", "name_6"]})
-    ]
-
-    document_store.delete_documents(ids=ids_to_delete, filters={"name": ["name_1", "name_2", "name_3", "name_4"]})
-
-    documents = document_store.get_all_documents()
-    assert len(documents) == len(DOCUMENTS) - len(ids_to_delete)
-    assert document_store.get_embedding_count() == len(DOCUMENTS) - len(ids_to_delete)
-
-    assert all(doc.meta["name"] != "name_1" for doc in documents)
-    assert all(doc.meta["name"] != "name_2" for doc in documents)
-
-    all_ids_left = [doc.id for doc in documents]
-    assert all(doc_id in all_ids_left for doc_id in ids_not_to_delete)
-
-
-@pytest.mark.integration
-@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
-@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
-def test_get_docs_with_filters_one_value(document_store, retriever):
-    document_store.write_documents(DOCUMENTS)
-    document_store.update_embeddings(retriever=retriever, batch_size=4)
-    assert document_store.get_embedding_count() == 6
-
-    documents = document_store.get_all_documents(filters={"year": ["2020"]})
-
-    assert len(documents) == 3
-    assert all("2020" == doc.meta["year"] for doc in documents)
-
-
-@pytest.mark.integration
-@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
-@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
-def test_get_docs_with_filters_many_values(document_store, retriever):
-    document_store.write_documents(DOCUMENTS)
-    document_store.update_embeddings(retriever=retriever, batch_size=4)
-    assert document_store.get_embedding_count() == 6
-
-    documents = document_store.get_all_documents(filters={"name": ["name_5", "name_6"]})
-
-    assert len(documents) == 2
-    assert {doc.meta["name"] for doc in documents} == {"name_5", "name_6"}
-
-
-@pytest.mark.integration
-@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
-@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
-def test_get_docs_with_many_filters(document_store, retriever):
-    document_store.write_documents(DOCUMENTS)
-    document_store.update_embeddings(retriever=retriever, batch_size=4)
-    assert document_store.get_embedding_count() == 6
-
-    documents = document_store.get_all_documents(filters={"month": ["01"], "year": ["2020"]})
-
-    assert len(documents) == 1
-    assert "name_1" == documents[0].meta["name"]
-    assert "01" == documents[0].meta["month"]
-    assert "2020" == documents[0].meta["year"]
-
-
-@pytest.mark.parametrize("retriever", ["embedding"], indirect=True)
-@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
-def test_pipeline(document_store, retriever):
-    documents = [
-        {"name": "name_1", "content": "text_1", "embedding": np.random.rand(768).astype(np.float32)},
-        {"name": "name_2", "content": "text_2", "embedding": np.random.rand(768).astype(np.float32)},
-        {"name": "name_3", "content": "text_3", "embedding": np.random.rand(768).astype(np.float64)},
-        {"name": "name_4", "content": "text_4", "embedding": np.random.rand(768).astype(np.float32)},
-    ]
-    document_store.write_documents(documents)
-    pipeline = Pipeline()
-    pipeline.add_node(component=retriever, name="FAISS", inputs=["Query"])
-    output = pipeline.run(query="How to test this?", params={"FAISS": {"top_k": 3}})
-    assert len(output["documents"]) == 3
-
-
-@pytest.mark.skipif(sys.platform in ["win32", "cygwin"], reason="Test with tmp_path not working on windows runner")
-def test_faiss_passing_index_from_outside(tmp_path):
-    d = 768
-    nlist = 2
-    quantizer = faiss.IndexFlatIP(d)
-    index = "haystack_test_1"
-    faiss_index = faiss.IndexIVFFlat(quantizer, d, nlist, faiss.METRIC_INNER_PRODUCT)
-    faiss_index.set_direct_map_type(faiss.DirectMap.Hashtable)
-    faiss_index.nprobe = 2
-    document_store = FAISSDocumentStore(
-        sql_url=f"sqlite:////{tmp_path/'haystack_test_faiss.db'}",
-        faiss_index=faiss_index,
-        index=index,
-        isolation_level="AUTOCOMMIT",
-    )
-
-    document_store.delete_documents()
-    # as it is a IVF index we need to train it before adding docs
-    document_store.train_index(DOCUMENTS)
-
-    document_store.write_documents(documents=DOCUMENTS)
-    documents_indexed = document_store.get_all_documents()
-
-    # test if vectors ids are associated with docs
-    for doc in documents_indexed:
-        assert 0 <= int(doc.meta["vector_id"]) <= 7
-
-
-@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus", "weaviate"], indirect=True)
-def test_cosine_similarity(document_store):
-    # below we will write documents to the store and then query it to see if vectors were normalized
-    ensure_ids_are_correct_uuids(docs=DOCUMENTS, document_store=document_store)
-    document_store.write_documents(documents=DOCUMENTS)
-
-    # note that the same query will be used later when querying after updating the embeddings
-    query = np.random.rand(768).astype(np.float32)
-
-    query_results = document_store.query_by_embedding(query_emb=query, top_k=len(DOCUMENTS), return_embedding=True)
-
-    # check if search with cosine similarity returns the correct number of results
-    assert len(query_results) == len(DOCUMENTS)
-    indexed_docs = {}
-    for doc in DOCUMENTS:
-        indexed_docs[doc["content"]] = doc["embedding"]
-        indexed_docs[doc["content"]] /= np.linalg.norm(doc["embedding"])
-
-    for doc in query_results:
-        result_emb = doc.embedding
-        original_emb = indexed_docs[doc.content].astype("float32")
-
-        # check if the stored embedding was normalized
-        np.testing.assert_allclose(
-            original_emb, result_emb, rtol=0.2, atol=5e-07
-        )  # high tolerance necessary for Milvus 2
-
-        # check if the score is plausible for cosine similarity
-        assert 0 <= doc.score <= 1.0
-
-    # now check if vectors are normalized when updating embeddings
-    class MockRetriever:
-        def embed_documents(self, docs):
-            return np.random.rand(len(docs), 768).astype(np.float32)
-
-    retriever = MockRetriever()
-    document_store.update_embeddings(retriever=retriever)
-    query_results = document_store.query_by_embedding(query_emb=query, top_k=len(DOCUMENTS), return_embedding=True)
-
-    for doc in query_results:
-        original_emb = np.array([indexed_docs[doc.content]], dtype="float32")
-        document_store.normalize_embedding(original_emb[0])
-        # check if the original embedding has changed after updating the embeddings
-        assert not np.allclose(original_emb[0], doc.embedding, rtol=0.01)
-
-
-@pytest.mark.parametrize("document_store_dot_product_small", ["faiss", "milvus1", "milvus"], indirect=True)
-def test_normalize_embeddings_diff_shapes(document_store_dot_product_small):
-    VEC_1 = np.array([0.1, 0.2, 0.3], dtype="float32")
-    document_store_dot_product_small.normalize_embedding(VEC_1)
-    assert np.linalg.norm(VEC_1) - 1 < 0.01
-
-    VEC_1 = np.array([0.1, 0.2, 0.3], dtype="float32").reshape(1, -1)
-    document_store_dot_product_small.normalize_embedding(VEC_1)
-    assert np.linalg.norm(VEC_1) - 1 < 0.01
-
-
-@pytest.mark.parametrize("document_store_small", ["faiss", "milvus1", "milvus", "weaviate"], indirect=True)
-def test_cosine_sanity_check(document_store_small):
-    VEC_1 = np.array([0.1, 0.2, 0.3], dtype="float32")
-    VEC_2 = np.array([0.4, 0.5, 0.6], dtype="float32")
-
-    # This is the cosine similarity of VEC_1 and VEC_2 calculated using sklearn.metrics.pairwise.cosine_similarity
-    # The score is normalized to yield a value between 0 and 1.
-    KNOWN_COSINE = 0.9746317
-    KNOWN_SCALED_COSINE = (KNOWN_COSINE + 1) / 2
-
-    docs = [{"name": "vec_1", "text": "vec_1", "content": "vec_1", "embedding": VEC_1}]
-    ensure_ids_are_correct_uuids(docs=docs, document_store=document_store_small)
-    document_store_small.write_documents(documents=docs)
-
-    query_results = document_store_small.query_by_embedding(
-        query_emb=VEC_2, top_k=1, return_embedding=True, scale_score=True
-    )
-
-    # check if faiss returns the same cosine similarity. Manual testing with faiss yielded 0.9746318
-    assert math.isclose(query_results[0].score, KNOWN_SCALED_COSINE, abs_tol=0.00002)
-
-    query_results = document_store_small.query_by_embedding(
-        query_emb=VEC_2, top_k=1, return_embedding=True, scale_score=False
-    )
-
-    # check if faiss returns the same cosine similarity. Manual testing with faiss yielded 0.9746318
-    assert math.isclose(query_results[0].score, KNOWN_COSINE, abs_tol=0.00002)
-
-
-@pytest.mark.integration
-def test_pipeline_with_existing_faiss_docstore(tmp_path):
-
-    document_store: FAISSDocumentStore = FAISSDocumentStore(
-        sql_url=f'sqlite:///{(tmp_path / "faiss_document_store.db").absolute()}'
-    )
-    retriever = MockDenseRetriever(document_store=document_store)
-    document_store.write_documents(DOCUMENTS)
-    document_store.update_embeddings(retriever=retriever, update_existing_embeddings=True)
-
-    document_store.save(tmp_path / "existing_faiss_document_store")
-
-    query_config = f"""
-version: ignore
-components:
-  - name: DPRRetriever
-    type: MockDenseRetriever
-    params:
-      document_store: ExistingFAISSDocumentStore
-  - name: ExistingFAISSDocumentStore
-    type: FAISSDocumentStore
-    params:
-      faiss_index_path: '{tmp_path / "existing_faiss_document_store"}'
-pipelines:
-  - name: query_pipeline
-    nodes:
-      - name: DPRRetriever
-        inputs: [Query]
-    """
-    pipeline = Pipeline.load_from_config(yaml.safe_load(query_config))
-    existing_document_store = pipeline.get_document_store()
-    faiss_index = existing_document_store.faiss_indexes["document"]
-    assert faiss_index.ntotal == len(DOCUMENTS)
diff --git a/test/document_stores/test_sql_based.py b/test/document_stores/test_sql_based.py
new file mode 100644
index 000000000..6cf73f931
--- /dev/null
+++ b/test/document_stores/test_sql_based.py
@@ -0,0 +1,276 @@
+import math
+
+import pytest
+import numpy as np
+
+from haystack.schema import Document
+from haystack.pipelines import DocumentSearchPipeline, Pipeline
+
+
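+# Shared test fixture: six documents with year/month metadata and random
+# 768-dim embeddings. Note that name_3 and name_6 use float64 embeddings
+# while the others use float32, so both dtypes get exercised.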
+DOCUMENTS = [
+    {
+        "meta": {"name": "name_1", "year": "2020", "month": "01"},
+        "content": "text_1",
+        "embedding": np.random.rand(768).astype(np.float32),
+    },
+    {
+        "meta": {"name": "name_2", "year": "2020", "month": "02"},
+        "content": "text_2",
+        "embedding": np.random.rand(768).astype(np.float32),
+    },
+    {
+        "meta": {"name": "name_3", "year": "2020", "month": "03"},
+        "content": "text_3",
+        "embedding": np.random.rand(768).astype(np.float64),
+    },
+    {
+        "meta": {"name": "name_4", "year": "2021", "month": "01"},
+        "content": "text_4",
+        "embedding": np.random.rand(768).astype(np.float32),
+    },
+    {
+        "meta": {"name": "name_5", "year": "2021", "month": "02"},
+        "content": "text_5",
+        "embedding": np.random.rand(768).astype(np.float32),
+    },
+    {
+        "meta": {"name": "name_6", "year": "2021", "month": "03"},
+        "content": "text_6",
+        "embedding": np.random.rand(768).astype(np.float64),
+    },
+]
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
+@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
+@pytest.mark.parametrize("batch_size", [4, 6])
+def test_update_docs(document_store, retriever, batch_size):
+    # initial write
+    document_store.write_documents(DOCUMENTS)
+
+    document_store.update_embeddings(retriever=retriever, batch_size=batch_size)
+    documents_indexed = document_store.get_all_documents()
+    assert len(documents_indexed) == len(DOCUMENTS)
+
+    # test if correct vectors are associated with docs
+    for doc in documents_indexed:
+        original_doc = [d for d in DOCUMENTS if d["content"] == doc.content][0]
+        updated_embedding = retriever.embed_documents([Document.from_dict(original_doc)])
+        stored_doc = document_store.get_all_documents(filters={"name": [doc.meta["name"]]})[0]
+        # compare original input vec with stored one (ignore extra dim added by hnsw)
+        # original input vec is normalized as faiss only stores normalized vectors
+        a = updated_embedding / np.linalg.norm(updated_embedding)
+        assert np.allclose(a[0], stored_doc.embedding, rtol=0.2)  # high tolerance necessary for Milvus 2
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
+@pytest.mark.parametrize("document_store", ["milvus1", "milvus", "faiss"], indirect=True)
+def test_update_existing_docs(document_store, retriever):
+    document_store.duplicate_documents = "overwrite"
+    old_document = Document(content="text_1")
+    # initial write
+    document_store.write_documents([old_document])
+    document_store.update_embeddings(retriever=retriever)
+    old_documents_indexed = document_store.get_all_documents(return_embedding=True)
+    assert len(old_documents_indexed) == 1
+
+    # Update document data
+    new_document = Document(content="text_2")
+    new_document.id = old_document.id
+    document_store.write_documents([new_document])
+    document_store.update_embeddings(retriever=retriever)
+    new_documents_indexed = document_store.get_all_documents(return_embedding=True)
+    assert len(new_documents_indexed) == 1
+
+    assert old_documents_indexed[0].id == new_documents_indexed[0].id
+    assert old_documents_indexed[0].content == "text_1"
+    assert new_documents_indexed[0].content == "text_2"
+    print(type(old_documents_indexed[0].embedding))
+    print(type(new_documents_indexed[0].embedding))
+    assert not np.allclose(old_documents_indexed[0].embedding, new_documents_indexed[0].embedding, rtol=0.01)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
+@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
+def test_update_with_empty_store(document_store, retriever):
+    # Call update with empty doc store
+    document_store.update_embeddings(retriever=retriever)
+
+    # initial write
+    document_store.write_documents(DOCUMENTS)
+
+    documents_indexed = document_store.get_all_documents()
+
+    assert len(documents_indexed) == len(DOCUMENTS)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("retriever", ["embedding"], indirect=True)
+@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
+def test_finding(document_store, retriever):
+    document_store.write_documents(DOCUMENTS)
+    pipe = DocumentSearchPipeline(retriever=retriever)
+
+    prediction = pipe.run(query="How to test this?", params={"Retriever": {"top_k": 1}})
+
+    assert len(prediction.get("documents", [])) == 1
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
+@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
+def test_delete_docs_with_filters_multivalue(document_store, retriever):
+    document_store.write_documents(DOCUMENTS)
+    document_store.update_embeddings(retriever=retriever, batch_size=4)
+    assert document_store.get_embedding_count() == 6
+
+    document_store.delete_documents(filters={"name": ["name_1", "name_2", "name_3", "name_4"]})
+
+    documents = document_store.get_all_documents()
+    assert len(documents) == 2
+    assert document_store.get_embedding_count() == 2
+    assert {doc.meta["name"] for doc in documents} == {"name_5", "name_6"}
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
+@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
+def test_delete_docs_with_filters(document_store, retriever):
+    document_store.write_documents(DOCUMENTS)
+    document_store.update_embeddings(retriever=retriever, batch_size=4)
+    assert document_store.get_embedding_count() == 6
+
+    document_store.delete_documents(filters={"year": ["2020"]})
+
+    documents = document_store.get_all_documents()
+    assert len(documents) == 3
+    assert document_store.get_embedding_count() == 3
+    assert all("2021" == doc.meta["year"] for doc in documents)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
+@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
+def test_delete_docs_with_many_filters(document_store, retriever):
+    document_store.write_documents(DOCUMENTS)
+    document_store.update_embeddings(retriever=retriever, batch_size=4)
+    assert document_store.get_embedding_count() == 6
+
+    document_store.delete_documents(filters={"month": ["01"], "year": ["2020"]})
+
+    documents = document_store.get_all_documents()
+    assert len(documents) == 5
+    assert document_store.get_embedding_count() == 5
+    assert "name_1" not in {doc.meta["name"] for doc in documents}
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
+@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
+def test_delete_docs_by_id(document_store, retriever):
+    document_store.write_documents(DOCUMENTS)
+    document_store.update_embeddings(retriever=retriever, batch_size=4)
+    assert document_store.get_embedding_count() == 6
+    doc_ids = [doc.id for doc in document_store.get_all_documents()]
+    ids_to_delete = doc_ids[0:3]
+
+    document_store.delete_documents(ids=ids_to_delete)
+
+    documents = document_store.get_all_documents()
+    assert len(documents) == len(doc_ids) - len(ids_to_delete)
+    assert document_store.get_embedding_count() == len(doc_ids) - len(ids_to_delete)
+
+    remaining_ids = [doc.id for doc in documents]
+    assert all(doc_id not in remaining_ids for doc_id in ids_to_delete)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
+@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
+def test_delete_docs_by_id_with_filters(document_store, retriever):
+    document_store.write_documents(DOCUMENTS)
+    document_store.update_embeddings(retriever=retriever, batch_size=4)
+    assert document_store.get_embedding_count() == 6
+
+    ids_to_delete = [doc.id for doc in document_store.get_all_documents(filters={"name": ["name_1", "name_2"]})]
+    ids_not_to_delete = [
+        doc.id for doc in document_store.get_all_documents(filters={"name": ["name_3", "name_4", "name_5", "name_6"]})
+    ]
+
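+    # When both ids and filters are given, only documents matching both are
+    # deleted: here name_1 and name_2, whose ids are in ids_to_delete.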
+    document_store.delete_documents(ids=ids_to_delete, filters={"name": ["name_1", "name_2", "name_3", "name_4"]})
+
+    documents = document_store.get_all_documents()
+    assert len(documents) == len(DOCUMENTS) - len(ids_to_delete)
+    assert document_store.get_embedding_count() == len(DOCUMENTS) - len(ids_to_delete)
+
+    assert all(doc.meta["name"] != "name_1" for doc in documents)
+    assert all(doc.meta["name"] != "name_2" for doc in documents)
+
+    all_ids_left = [doc.id for doc in documents]
+    assert all(doc_id in all_ids_left for doc_id in ids_not_to_delete)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
+@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
+def test_get_docs_with_filters_one_value(document_store, retriever):
+    document_store.write_documents(DOCUMENTS)
+    document_store.update_embeddings(retriever=retriever, batch_size=4)
+    assert document_store.get_embedding_count() == 6
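+
+    # Three of the six fixture documents have year "2020".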
+    documents = document_store.get_all_documents(filters={"year": ["2020"]})
+
+    assert len(documents) == 3
+    assert all("2020" == doc.meta["year"] for doc in documents)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
+@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
+def test_get_docs_with_filters_many_values(document_store, retriever):
+    document_store.write_documents(DOCUMENTS)
+    document_store.update_embeddings(retriever=retriever, batch_size=4)
+    assert document_store.get_embedding_count() == 6
+
+    documents = document_store.get_all_documents(filters={"name": ["name_5", "name_6"]})
+
+    assert len(documents) == 2
+    assert {doc.meta["name"] for doc in documents} == {"name_5", "name_6"}
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
+@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
+def test_get_docs_with_many_filters(document_store, retriever):
+    document_store.write_documents(DOCUMENTS)
+    document_store.update_embeddings(retriever=retriever, batch_size=4)
+    assert document_store.get_embedding_count() == 6
+
+    documents = document_store.get_all_documents(filters={"month": ["01"], "year": ["2020"]})
+
+    assert len(documents) == 1
+    assert "name_1" == documents[0].meta["name"]
+    assert "01" == documents[0].meta["month"]
+    assert "2020" == documents[0].meta["year"]
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("retriever", ["embedding"], indirect=True)
+@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
+def test_pipeline(document_store, retriever):
+    documents = [
+        {"name": "name_1", "content": "text_1", "embedding": np.random.rand(768).astype(np.float32)},
+        {"name": "name_2", "content": "text_2", "embedding": np.random.rand(768).astype(np.float32)},
+        {"name": "name_3", "content": "text_3", "embedding": np.random.rand(768).astype(np.float64)},
+        {"name": "name_4", "content": "text_4", "embedding": np.random.rand(768).astype(np.float32)},
+    ]
+    document_store.write_documents(documents)
+    pipeline = Pipeline()
+    pipeline.add_node(component=retriever, name="FAISS", inputs=["Query"])
+    output = pipeline.run(query="How to test this?", params={"FAISS": {"top_k": 3}})
+    assert len(output["documents"]) == 3