mirror of
				https://github.com/deepset-ai/haystack.git
				synced 2025-10-30 17:29:29 +00:00 
			
		
		
		
	 f8e02310bf
			
		
	
	
		f8e02310bf
		
			
		
	
	
	
	
		
			
			* Remove BasePipeline and make a module for RayPipeline * Can load pipelines from yaml, plenty of issues left * Extract graph validation logic into _add_node_to_pipeline_graph & refactor load_from_config and add_node to use it * Fix pipeline tests * Move some tests out of test_pipeline.py and create MockDenseRetriever * myoy and pylint (silencing too-many-public-methods) * Fix issue found in some yaml files and in schema files * Fix paths to YAML and fix some typos in Ray * Fix eval tests * Simplify MockDenseRetriever * Fix Ray test * Accidentally pushed merge coinflict, fixed * Typo in schemas * Typo in _json_schema.py * Slightly reduce noisyness of version validation warnings * Fix version logs tests * Fix version logs tests again * remove seemingly unused file * Add check and test to avoid adding the same node to the pipeline twice * Update Documentation & Code Style * Revert config to pipeline_config * Remo0ve unused import * Complete reverting to pipeline_config * Some more stray config= * Update Documentation & Code Style * Feedback * Move back other_nodes tests into pipeline tests temporarily * Update Documentation & Code Style * Fixing tests * Update Documentation & Code Style * Fixing ray and standard pipeline tests * Rename colliding load() methods in dense retrievers and faiss * Update Documentation & Code Style * Fix mypy on ray.py as well * Add check for no root node * Fix tests to use load_from_directory and load_index * Try to workaround the disabled add_node of RayPipeline * Update Documentation & Code Style * Fix Ray test * Fix FAISS tests * Relax class check in _add_node_to_pipeline_graph * Update Documentation & Code Style * Try to fix mypy in ray.py * unused import * Try another fix for Ray * Fix connector tests * Update Documentation & Code Style * Fix ray * Update Documentation & Code Style * use BaseComponent.load() in pipelines/base.py * another round of feedback * stray BaseComponent.load() * Update Documentation & Code Style * Fix FAISS 
tests too Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: tstadel <60758086+tstadel@users.noreply.github.com>
		
			
				
	
	
		
			607 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			607 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os
 | |
| import sys
 | |
| import math
 | |
| from pathlib import Path
 | |
| 
 | |
| import yaml
 | |
| import faiss
 | |
| import pytest
 | |
| import numpy as np
 | |
| 
 | |
| from haystack.schema import Document
 | |
| from haystack.pipelines import DocumentSearchPipeline
 | |
| from haystack.document_stores.base import BaseDocumentStore
 | |
| from haystack.document_stores.faiss import FAISSDocumentStore
 | |
| 
 | |
| from haystack.pipelines import Pipeline
 | |
| from haystack.nodes.retriever.dense import EmbeddingRetriever
 | |
| 
 | |
| from .conftest import document_classifier, ensure_ids_are_correct_uuids, SAMPLES_PATH, MockDenseRetriever
 | |
| 
 | |
| 
 | |
# Shared input documents for the tests in this module: six entries named name_1..name_6 / text_1..text_6,
# three per year, each with a random 768-dimensional embedding.
# Every third embedding is float64 instead of float32, so both dtypes are present in the data.
DOCUMENTS = [
    {
        "meta": {"name": f"name_{number}", "year": year, "month": month},
        "content": f"text_{number}",
        "embedding": np.random.rand(768).astype(dtype),
    }
    for number, (year, month, dtype) in enumerate(
        [
            ("2020", "01", np.float32),
            ("2020", "02", np.float32),
            ("2020", "03", np.float64),
            ("2021", "01", np.float32),
            ("2021", "02", np.float32),
            ("2021", "03", np.float64),
        ],
        start=1,
    )
]
 | |
| 
 | |
| 
 | |
@pytest.mark.skipif(sys.platform in ["win32", "cygwin"], reason="Test with tmp_path not working on windows runner")
def test_faiss_index_save_and_load(tmp_path, sql_url):
    """Save a FAISS index and restore it via ``load()``, a second save/load cycle, and the constructor."""
    index_path = tmp_path / "haystack_test_faiss"

    document_store = FAISSDocumentStore(
        sql_url=sql_url,
        index="haystack_test",
        progress_bar=False,  # Just to check if the init parameters are kept
        isolation_level="AUTOCOMMIT",
    )
    document_store.write_documents(DOCUMENTS)

    def assert_restored(store):
        # the faiss index holds all vectors again
        assert store.faiss_indexes[document_store.index].ntotal == len(DOCUMENTS)
        # the documents are available again
        assert len(store.get_all_documents()) == len(DOCUMENTS)
        # the init parameters were kept
        assert not store.progress_bar

    # save the index, then wipe the in-memory faiss index and make sure it is really empty
    document_store.save(index_path)
    document_store.faiss_indexes[document_store.index].reset()
    assert document_store.faiss_indexes[document_store.index].ntotal == 0

    # restore through load()
    new_document_store = FAISSDocumentStore.load(index_path)
    assert_restored(new_document_store)

    # save the loaded store and load it once more
    new_document_store.save(index_path)
    reloaded_document_store = FAISSDocumentStore.load(index_path)
    assert_restored(reloaded_document_store)

    # restore through the constructor
    new_document_store = FAISSDocumentStore(faiss_index_path=index_path)
    assert_restored(new_document_store)
 | |
| 
 | |
| 
 | |
@pytest.mark.skipif(sys.platform in ["win32", "cygwin"], reason="Test with tmp_path not working on windows runner")
def test_faiss_index_save_and_load_custom_path(tmp_path, sql_url):
    """Round-trip a FAISS index whose config file is written to an explicitly chosen path."""
    index_path = tmp_path / "haystack_test_faiss"
    config_path = tmp_path / "custom_path.json"

    document_store = FAISSDocumentStore(
        sql_url=sql_url,
        index="haystack_test",
        progress_bar=False,  # Just to check if the init parameters are kept
        isolation_level="AUTOCOMMIT",
    )
    document_store.write_documents(DOCUMENTS)

    def assert_restored(store):
        # the faiss index holds all vectors again
        assert store.faiss_indexes[document_store.index].ntotal == len(DOCUMENTS)
        # the documents are available again
        assert len(store.get_all_documents()) == len(DOCUMENTS)
        # the init parameters were kept
        assert not store.progress_bar

    # save index and config, then wipe the in-memory faiss index and make sure it is really empty
    document_store.save(index_path=index_path, config_path=config_path)
    document_store.faiss_indexes[document_store.index].reset()
    assert document_store.faiss_indexes[document_store.index].ntotal == 0

    # restore through load(), both paths passed as keywords
    new_document_store = FAISSDocumentStore.load(index_path=index_path, config_path=config_path)
    assert_restored(new_document_store)

    # save the loaded store and load it once more, index path passed positionally
    new_document_store.save(index_path, config_path=config_path)
    reloaded_document_store = FAISSDocumentStore.load(index_path, config_path=config_path)
    assert_restored(reloaded_document_store)

    # restore through the constructor
    new_document_store = FAISSDocumentStore(faiss_index_path=index_path, faiss_config_path=config_path)
    assert_restored(new_document_store)
 | |
| 
 | |
| 
 | |
@pytest.mark.skipif(sys.platform in ["win32", "cygwin"], reason="Test with tmp_path not working on windows runner")
def test_faiss_index_mutual_exclusive_args(tmp_path):
    """Combining a database URL with ``faiss_index_path`` must raise ``ValueError``."""
    db_url = f"sqlite:////{tmp_path/'haystack_test.db'}"
    index_path = f"{tmp_path/'haystack_test'}"

    # URL passed through the sql_url keyword
    with pytest.raises(ValueError):
        FAISSDocumentStore(sql_url=db_url, faiss_index_path=index_path, isolation_level="AUTOCOMMIT")

    # URL passed as the first positional argument
    with pytest.raises(ValueError):
        FAISSDocumentStore(db_url, faiss_index_path=index_path, isolation_level="AUTOCOMMIT")
 | |
| 
 | |
| 
 | |
@pytest.mark.parametrize("document_store", ["faiss"], indirect=True)
@pytest.mark.parametrize("index_buffer_size", [10_000, 2])
@pytest.mark.parametrize("batch_size", [2])
def test_faiss_write_docs(document_store, index_buffer_size, batch_size):
    """Write DOCUMENTS in small batches and check each stored vector against its normalized input."""
    document_store.index_buffer_size = index_buffer_size

    # Write in small batches
    for start in range(0, len(DOCUMENTS), batch_size):
        batch = DOCUMENTS[start : start + batch_size]
        document_store.write_documents(batch)

    documents_indexed = document_store.get_all_documents()
    assert len(documents_indexed) == len(DOCUMENTS)

    # test if correct vectors are associated with docs
    faiss_index = document_store.faiss_indexes[document_store.index]
    for doc in documents_indexed:
        # we currently don't get the embeddings back when we call document_store.get_all_documents(),
        # so the vector is reconstructed from the faiss index through the document's vector_id
        matching_inputs = [d for d in DOCUMENTS if d["content"] == doc.content]
        original_doc = matching_inputs[0]
        stored_emb = faiss_index.reconstruct(int(doc.meta["vector_id"]))

        # compare original input vec with stored one (ignore extra dim added by hnsw)
        # original input vec is normalized as faiss only stores normalized vectors
        expected_emb = original_doc["embedding"] / np.linalg.norm(original_doc["embedding"])
        assert np.allclose(expected_emb, stored_emb, rtol=0.01)
 | |
| 
 | |
| 
 | |
@pytest.mark.slow
@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
@pytest.mark.parametrize("batch_size", [4, 6])
def test_update_docs(document_store, retriever, batch_size):
    """After ``update_embeddings``, each stored embedding is close to the normalized retriever output."""
    # initial write
    document_store.write_documents(DOCUMENTS)

    document_store.update_embeddings(retriever=retriever, batch_size=batch_size)
    documents_indexed = document_store.get_all_documents()
    assert len(documents_indexed) == len(DOCUMENTS)

    # test if correct vectors are associated with docs
    for doc in documents_indexed:
        matching_inputs = [d for d in DOCUMENTS if d["content"] == doc.content]
        original_doc = matching_inputs[0]
        updated_embedding = retriever.embed_documents([Document.from_dict(original_doc)])

        name_filter = {"name": [doc.meta["name"]]}
        stored_doc = document_store.get_all_documents(filters=name_filter)[0]

        # compare original input vec with stored one (ignore extra dim added by hnsw)
        # original input vec is normalized as faiss only stores normalized vectors
        normalized_embedding = updated_embedding / np.linalg.norm(updated_embedding)
        # high tolerance necessary for Milvus 2
        assert np.allclose(normalized_embedding[0], stored_doc.embedding, rtol=0.2)
 | |
| 
 | |
| 
 | |
@pytest.mark.slow
@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
@pytest.mark.parametrize("document_store", ["milvus1", "milvus", "faiss"], indirect=True)
def test_update_existing_docs(document_store, retriever):
    """Overwriting a document under the same id replaces both its content and its embedding."""
    document_store.duplicate_documents = "overwrite"
    old_document = Document(content="text_1")

    # initial write
    document_store.write_documents([old_document])
    document_store.update_embeddings(retriever=retriever)
    old_documents_indexed = document_store.get_all_documents(return_embedding=True)
    assert len(old_documents_indexed) == 1

    # Update document data: new content, same id as the document written above
    new_document = Document(content="text_2")
    new_document.id = old_document.id
    document_store.write_documents([new_document])
    document_store.update_embeddings(retriever=retriever)
    new_documents_indexed = document_store.get_all_documents(return_embedding=True)
    assert len(new_documents_indexed) == 1

    # still a single document with the same id, but with the new content ...
    assert old_documents_indexed[0].id == new_documents_indexed[0].id
    assert old_documents_indexed[0].content == "text_1"
    assert new_documents_indexed[0].content == "text_2"
    # ... and an embedding that differs from the one computed for the old content
    assert not np.allclose(old_documents_indexed[0].embedding, new_documents_indexed[0].embedding, rtol=0.01)
 | |
| 
 | |
| 
 | |
@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
def test_update_with_empty_store(document_store, retriever):
    """``update_embeddings`` on an empty store must not prevent a later write of DOCUMENTS."""
    # Call update with empty doc store
    document_store.update_embeddings(retriever=retriever)

    # initial write
    document_store.write_documents(DOCUMENTS)

    indexed_count = len(document_store.get_all_documents())
    assert indexed_count == len(DOCUMENTS)
 | |
| 
 | |
| 
 | |
@pytest.mark.skipif(sys.platform in ["win32", "cygwin"], reason="Test with tmp_path not working on windows runner")
@pytest.mark.parametrize("index_factory", ["Flat", "HNSW", "IVF1,Flat"])
def test_faiss_retrieving(index_factory, tmp_path):
    """Retrieve from a FAISS store built with each index factory string and check the result list."""
    document_store = FAISSDocumentStore(
        sql_url=f"sqlite:////{tmp_path/'test_faiss_retrieving.db'}",
        faiss_index_factory_str=index_factory,
        isolation_level="AUTOCOMMIT",
    )

    try:
        document_store.delete_all_documents(index="document")
        # only the IVF factory string gets a training pass before documents are written
        if "ivf" in index_factory.lower():
            document_store.train_index(DOCUMENTS)
        document_store.write_documents(DOCUMENTS)

        retriever = EmbeddingRetriever(
            document_store=document_store, embedding_model="deepset/sentence_bert", use_gpu=False
        )
        result = retriever.retrieve(query="How to test this?")

        assert len(result) == len(DOCUMENTS)
        assert isinstance(result[0], Document)
    finally:
        # Cleanup: reset the faiss index even when one of the steps above fails
        document_store.faiss_indexes[document_store.index].reset()
 | |
| 
 | |
| 
 | |
@pytest.mark.parametrize("retriever", ["embedding"], indirect=True)
@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
def test_finding(document_store, retriever):
    """A DocumentSearchPipeline run with ``top_k=1`` for the retriever returns exactly one document."""
    document_store.write_documents(DOCUMENTS)

    pipe = DocumentSearchPipeline(retriever=retriever)
    run_params = {"Retriever": {"top_k": 1}}
    prediction = pipe.run(query="How to test this?", params=run_params)

    found_documents = prediction.get("documents", [])
    assert len(found_documents) == 1
 | |
| 
 | |
| 
 | |
@pytest.mark.slow
@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
def test_delete_docs_with_filters_multivalue(document_store, retriever):
    """Deleting with a filter that lists several values for one field removes every matching document.

    This test used to be named ``test_delete_docs_with_filters``. A second function with that exact
    name is defined later in the module and replaced this one in the module namespace, so pytest
    never collected it. The distinct name makes it run again.
    """
    document_store.write_documents(DOCUMENTS)
    document_store.update_embeddings(retriever=retriever, batch_size=4)
    assert document_store.get_embedding_count() == 6

    # four of the six names are listed, so only name_5 and name_6 should survive
    document_store.delete_documents(filters={"name": ["name_1", "name_2", "name_3", "name_4"]})

    documents = document_store.get_all_documents()
    assert len(documents) == 2
    assert document_store.get_embedding_count() == 2
    assert {doc.meta["name"] for doc in documents} == {"name_5", "name_6"}
 | |
| 
 | |
| 
 | |
@pytest.mark.slow
@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
def test_delete_docs_with_filters(document_store, retriever):
    """Deleting by a single-value ``year`` filter leaves only the documents of the other year."""
    document_store.write_documents(DOCUMENTS)
    document_store.update_embeddings(retriever=retriever, batch_size=4)
    assert document_store.get_embedding_count() == 6

    year_filter = {"year": ["2020"]}
    document_store.delete_documents(filters=year_filter)

    # three documents (and their embeddings) are left, all from 2021
    remaining = document_store.get_all_documents()
    assert len(remaining) == 3
    assert document_store.get_embedding_count() == 3
    assert all(doc.meta["year"] == "2021" for doc in remaining)
 | |
| 
 | |
| 
 | |
@pytest.mark.slow
@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
def test_delete_docs_with_many_filters(document_store, retriever):
    """Deleting with filters on two fields removes only the document matching both (name_1)."""
    document_store.write_documents(DOCUMENTS)
    document_store.update_embeddings(retriever=retriever, batch_size=4)
    assert document_store.get_embedding_count() == 6

    combined_filter = {"month": ["01"], "year": ["2020"]}
    document_store.delete_documents(filters=combined_filter)

    # exactly one document and one embedding are gone, and the missing one is name_1
    remaining = document_store.get_all_documents()
    assert len(remaining) == 5
    assert document_store.get_embedding_count() == 5
    assert all(doc.meta["name"] != "name_1" for doc in remaining)
 | |
| 
 | |
| 
 | |
@pytest.mark.slow
@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
def test_delete_docs_by_id(document_store, retriever):
    """Deleting by id removes exactly the listed documents together with their embeddings."""
    document_store.write_documents(DOCUMENTS)
    document_store.update_embeddings(retriever=retriever, batch_size=4)
    assert document_store.get_embedding_count() == 6

    doc_ids = [doc.id for doc in document_store.get_all_documents()]
    ids_to_delete = doc_ids[:3]
    expected_remaining = len(doc_ids) - len(ids_to_delete)

    document_store.delete_documents(ids=ids_to_delete)

    documents = document_store.get_all_documents()
    assert len(documents) == expected_remaining
    assert document_store.get_embedding_count() == expected_remaining

    # none of the deleted ids may show up among the remaining documents
    remaining_ids = [doc.id for doc in documents]
    for deleted_id in ids_to_delete:
        assert deleted_id not in remaining_ids
 | |
| 
 | |
| 
 | |
@pytest.mark.slow
@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
def test_delete_docs_by_id_with_filters(document_store, retriever):
    """With both ``ids`` and ``filters`` given, only the two documents named by the ids are deleted."""
    document_store.write_documents(DOCUMENTS)
    document_store.update_embeddings(retriever=retriever, batch_size=4)
    assert document_store.get_embedding_count() == 6

    docs_to_delete = document_store.get_all_documents(filters={"name": ["name_1", "name_2"]})
    ids_to_delete = [doc.id for doc in docs_to_delete]
    docs_to_keep = document_store.get_all_documents(filters={"name": ["name_3", "name_4", "name_5", "name_6"]})
    ids_not_to_delete = [doc.id for doc in docs_to_keep]

    # the filter also matches name_3 and name_4, the ids do not
    document_store.delete_documents(ids=ids_to_delete, filters={"name": ["name_1", "name_2", "name_3", "name_4"]})

    documents = document_store.get_all_documents()
    expected_remaining = len(DOCUMENTS) - len(ids_to_delete)
    assert len(documents) == expected_remaining
    assert document_store.get_embedding_count() == expected_remaining

    # name_1 and name_2 are gone ...
    for deleted_name in ("name_1", "name_2"):
        assert all(doc.meta["name"] != deleted_name for doc in documents)

    # ... and every document that was meant to be kept is still there
    all_ids_left = [doc.id for doc in documents]
    for kept_id in ids_not_to_delete:
        assert kept_id in all_ids_left
 | |
| 
 | |
| 
 | |
@pytest.mark.slow
@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
def test_get_docs_with_filters_one_value(document_store, retriever):
    """Filtering ``get_all_documents`` on one ``year`` value returns the three documents of that year."""
    document_store.write_documents(DOCUMENTS)
    document_store.update_embeddings(retriever=retriever, batch_size=4)
    assert document_store.get_embedding_count() == 6

    year_filter = {"year": ["2020"]}
    matches = document_store.get_all_documents(filters=year_filter)

    assert len(matches) == 3
    assert all(doc.meta["year"] == "2020" for doc in matches)
 | |
| 
 | |
| 
 | |
@pytest.mark.slow
@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
def test_get_docs_with_filters_many_values(document_store, retriever):
    """Filtering ``get_all_documents`` on two ``name`` values returns exactly those two documents."""
    document_store.write_documents(DOCUMENTS)
    document_store.update_embeddings(retriever=retriever, batch_size=4)
    assert document_store.get_embedding_count() == 6

    wanted_names = ["name_5", "name_6"]
    matches = document_store.get_all_documents(filters={"name": wanted_names})

    assert len(matches) == 2
    found_names = {doc.meta["name"] for doc in matches}
    assert found_names == {"name_5", "name_6"}
 | |
| 
 | |
| 
 | |
@pytest.mark.slow
@pytest.mark.parametrize("retriever", ["dpr"], indirect=True)
@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
def test_get_docs_with_many_filters(document_store, retriever):
    """Filtering on ``month`` and ``year`` together returns only the one document matching both."""
    document_store.write_documents(DOCUMENTS)
    document_store.update_embeddings(retriever=retriever, batch_size=4)
    assert document_store.get_embedding_count() == 6

    combined_filter = {"month": ["01"], "year": ["2020"]}
    matches = document_store.get_all_documents(filters=combined_filter)

    assert len(matches) == 1
    meta = matches[0].meta
    assert meta["name"] == "name_1"
    assert meta["month"] == "01"
    assert meta["year"] == "2020"
 | |
| 
 | |
| 
 | |
@pytest.mark.parametrize("retriever", ["embedding"], indirect=True)
@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus"], indirect=True)
def test_pipeline(document_store, retriever):
    """A Pipeline with the retriever as its only node returns ``top_k`` documents."""
    # four documents with random 768-dimensional embeddings; the third one is float64, the rest float32
    embedding_dtypes = [np.float32, np.float32, np.float64, np.float32]
    documents = [
        {"name": f"name_{number}", "content": f"text_{number}", "embedding": np.random.rand(768).astype(dtype)}
        for number, dtype in enumerate(embedding_dtypes, start=1)
    ]
    document_store.write_documents(documents)

    pipeline = Pipeline()
    pipeline.add_node(component=retriever, name="FAISS", inputs=["Query"])

    # params are addressed through the node name given to add_node above
    node_params = {"FAISS": {"top_k": 3}}
    output = pipeline.run(query="How to test this?", params=node_params)
    assert len(output["documents"]) == 3
 | |
| 
 | |
| 
 | |
@pytest.mark.skipif(sys.platform in ["win32", "cygwin"], reason="Test with tmp_path not working on windows runner")
def test_faiss_passing_index_from_outside(tmp_path):
    """A FAISSDocumentStore built around an externally created IVF index assigns vector ids to docs."""
    embedding_dim = 768
    cluster_count = 2
    index_name = "haystack_test_1"

    # build the faiss index by hand instead of letting the document store create it
    quantizer = faiss.IndexFlatIP(embedding_dim)
    faiss_index = faiss.IndexIVFFlat(quantizer, embedding_dim, cluster_count, faiss.METRIC_INNER_PRODUCT)
    faiss_index.set_direct_map_type(faiss.DirectMap.Hashtable)
    faiss_index.nprobe = 2

    document_store = FAISSDocumentStore(
        sql_url=f"sqlite:////{tmp_path/'haystack_test_faiss.db'}",
        faiss_index=faiss_index,
        index=index_name,
        isolation_level="AUTOCOMMIT",
    )
    document_store.delete_documents()

    # as it is a IVF index we need to train it before adding docs
    document_store.train_index(DOCUMENTS)
    document_store.write_documents(documents=DOCUMENTS)

    # test if vectors ids are associated with docs
    for doc in document_store.get_all_documents():
        vector_id = int(doc.meta["vector_id"])
        assert 0 <= vector_id <= 7
 | |
| 
 | |
| 
 | |
@pytest.mark.parametrize("document_store", ["faiss", "milvus1", "milvus", "weaviate"], indirect=True)
def test_cosine_similarity(document_store):
    """Embeddings come back normalized after a write and change after ``update_embeddings``."""
    # below we will write documents to the store and then query it to see if vectors were normalized
    ensure_ids_are_correct_uuids(docs=DOCUMENTS, document_store=document_store)
    document_store.write_documents(documents=DOCUMENTS)

    # note that the same query will be used later when querying after updating the embeddings
    query = np.random.rand(768).astype(np.float32)

    query_results = document_store.query_by_embedding(query_emb=query, top_k=len(DOCUMENTS), return_embedding=True)

    # check if search with cosine similarity returns the correct number of results
    assert len(query_results) == len(DOCUMENTS)

    # Expected (normalized) embedding per document content.
    # The division creates a new array on purpose: an in-place `/=` would operate on the very ndarray
    # stored in the module-level DOCUMENTS and silently change the input of every test that runs later.
    indexed_docs = {doc["content"]: doc["embedding"] / np.linalg.norm(doc["embedding"]) for doc in DOCUMENTS}

    for doc in query_results:
        result_emb = doc.embedding
        original_emb = indexed_docs[doc.content].astype("float32")

        # check if the stored embedding was normalized
        np.testing.assert_allclose(
            original_emb, result_emb, rtol=0.2, atol=5e-07
        )  # high tolerance necessary for Milvus 2

        # check if the score is plausible for cosine similarity
        assert 0 <= doc.score <= 1.0

    # now check if vectors are normalized when updating embeddings
    class MockRetriever:
        # Minimal stand-in exposing only embed_documents: one fresh random vector per document.
        def embed_documents(self, docs):
            return [np.random.rand(768).astype(np.float32) for _ in docs]

    retriever = MockRetriever()
    document_store.update_embeddings(retriever=retriever)
    query_results = document_store.query_by_embedding(query_emb=query, top_k=len(DOCUMENTS), return_embedding=True)

    for doc in query_results:
        # np.array(...) copies, so normalizing original_emb[0] leaves indexed_docs untouched
        original_emb = np.array([indexed_docs[doc.content]], dtype="float32")
        document_store.normalize_embedding(original_emb[0])
        # check if the original embedding has changed after updating the embeddings
        assert not np.allclose(original_emb[0], doc.embedding, rtol=0.01)
 | |
| 
 | |
| 
 | |
@pytest.mark.parametrize("document_store_dot_product_small", ["faiss", "milvus1", "milvus"], indirect=True)
def test_normalize_embeddings_diff_shapes(document_store_dot_product_small):
    """``normalize_embedding`` must leave a unit-length vector for both 1-D and 2-D (1, n) input.

    The norm is compared with ``abs(norm - 1)``. Without ``abs`` any norm below 1 passes, including
    the roughly 0.37 norm of the untouched input vector, so the check could never fail.
    """
    # 1-D vector; the call's return value is not used, the array itself is checked afterwards
    VEC_1 = np.array([0.1, 0.2, 0.3], dtype="float32")
    document_store_dot_product_small.normalize_embedding(VEC_1)
    assert abs(np.linalg.norm(VEC_1) - 1) < 0.01

    # same values as a single-row 2-D matrix
    VEC_1 = np.array([0.1, 0.2, 0.3], dtype="float32").reshape(1, -1)
    document_store_dot_product_small.normalize_embedding(VEC_1)
    assert abs(np.linalg.norm(VEC_1) - 1) < 0.01
 | |
| 
 | |
| 
 | |
@pytest.mark.parametrize("document_store_small", ["faiss", "milvus1", "milvus", "weaviate"], indirect=True)
def test_cosine_sanity_check(document_store_small):
    """The score returned for a known vector pair matches the known cosine, scaled and unscaled."""
    stored_vec = np.array([0.1, 0.2, 0.3], dtype="float32")
    query_vec = np.array([0.4, 0.5, 0.6], dtype="float32")

    # This is the cosine similarity of the two vectors calculated using sklearn.metrics.pairwise.cosine_similarity
    known_cosine = 0.9746317
    # The scaled score is normalized to yield a value between 0 and 1.
    known_scaled_cosine = (known_cosine + 1) / 2

    docs = [{"name": "vec_1", "text": "vec_1", "content": "vec_1", "embedding": stored_vec}]
    ensure_ids_are_correct_uuids(docs=docs, document_store=document_store_small)
    document_store_small.write_documents(documents=docs)

    # check if faiss returns the same cosine similarity. Manual testing with faiss yielded 0.9746318
    # First with score scaling switched on, then with the raw score.
    for scale_score, expected_score in ((True, known_scaled_cosine), (False, known_cosine)):
        query_results = document_store_small.query_by_embedding(
            query_emb=query_vec, top_k=1, return_embedding=True, scale_score=scale_score
        )
        assert math.isclose(query_results[0].score, expected_score, abs_tol=0.00002)
 | |
| 
 | |
| 
 | |
@pytest.mark.integration
def test_pipeline_with_existing_faiss_docstore(tmp_path):
    """A pipeline loaded from a YAML config picks up a FAISS index that was saved to disk beforehand."""
    # create and populate a store, then save it where the YAML config below points to
    document_store: FAISSDocumentStore = FAISSDocumentStore(
        sql_url=f'sqlite:///{(tmp_path / "faiss_document_store.db").absolute()}'
    )
    retriever = MockDenseRetriever(document_store=document_store)
    document_store.write_documents(DOCUMENTS)
    document_store.update_embeddings(retriever=retriever, update_existing_embeddings=True)
    document_store.save(tmp_path / "existing_faiss_document_store")

    # the document store component is configured with faiss_index_path only
    query_config = f"""
version: ignore
components:
  - name: DPRRetriever
    type: MockDenseRetriever
    params:
      document_store: ExistingFAISSDocumentStore
  - name: ExistingFAISSDocumentStore
    type: FAISSDocumentStore
    params:
      faiss_index_path: '{tmp_path / "existing_faiss_document_store"}'
pipelines:
  - name: query_pipeline
    nodes:
      - name: DPRRetriever
        inputs: [Query]
    """
    parsed_config = yaml.safe_load(query_config)
    pipeline = Pipeline.load_from_config(parsed_config)

    # the store behind the loaded pipeline must contain one vector per written document
    existing_document_store = pipeline.get_document_store()
    faiss_index = existing_document_store.faiss_indexes["document"]
    assert faiss_index.ntotal == len(DOCUMENTS)
 |