Facilitate concurrent query / indexing in Elasticsearch with dense retrievers (new skip_missing_embeddings param) (#1762)

* Filtering records not having embeddings

* Added support for the skip_missing_embeddings flag. The default behavior is to throw an error when embeddings are missing. If skip_missing_embeddings=True, documents without embeddings are ignored for vector similarity.

* Fix for below error:
haystack/document_stores/elasticsearch.py:852: error: Need type annotation for "script_score_query"

* docstring for skip_missing_embeddings parameter

* Raise an exception when no documents with embeddings are found for the Embedding retriever.

* Default skip_missing_embeddings to True

* Explicitly check if embeddings are present if no results are returned by EmbeddingRetriever for Elasticsearch

* Added test case based on Julian's input

* Added test case based on Julian's input. Fix pytest error in the test case

* Added test case based on Julian's input. Fix pytest error in the test case

* Added test case based on Julian's input. Fix pytest error in the test case

* Simplify code by using get_embed_count

* Adjust docstring & error msg slightly

* Revert error msg

Co-authored-by: Malte Pietsch <malte.pietsch@deepset.ai>
This commit is contained in:
C V Goudar 2021-11-19 19:20:23 +05:30 committed by GitHub
parent d81897535e
commit a9a379784a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 71 additions and 3 deletions

View File

@ -51,7 +51,8 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
return_embedding: bool = False,
duplicate_documents: str = 'overwrite',
index_type: str = "flat",
scroll: str = "1d"
scroll: str = "1d",
skip_missing_embeddings: bool = True
):
"""
A DocumentStore using Elasticsearch to store and query the documents for our search.
@ -104,6 +105,10 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
:param scroll: Determines how long the current index is fixed, e.g. during updating all documents with embeddings.
Defaults to "1d" and should not be larger than this. Can also be in minutes "5m" or hours "15h"
For details, see https://www.elastic.co/guide/en/elasticsearch/reference/current/scroll-api.html
:param skip_missing_embeddings: Parameter to control queries based on vector similarity when indexed documents miss embeddings.
Parameter options: (True, False)
False: Raises exception if one or more documents do not have embeddings at query time
True: Query will ignore all documents without embeddings (recommended if you concurrently index and query)
"""
# save init parameters to enable export of component config as YAML
@ -114,7 +119,8 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
custom_mapping=custom_mapping, excluded_meta_data=excluded_meta_data, analyzer=analyzer, scheme=scheme,
ca_certs=ca_certs, verify_certs=verify_certs, create_index=create_index,
duplicate_documents=duplicate_documents, refresh_type=refresh_type, similarity=similarity,
timeout=timeout, return_embedding=return_embedding, index_type=index_type, scroll=scroll
timeout=timeout, return_embedding=return_embedding, index_type=index_type, scroll=scroll,
skip_missing_embeddings=skip_missing_embeddings
)
self.client = self._init_elastic_client(host=host, port=port, username=username, password=password,
@ -140,6 +146,7 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
self.index: str = index
self.label_index: str = label_index
self.scroll = scroll
self.skip_missing_embeddings: bool = skip_missing_embeddings
if similarity in ["cosine", "dot_product", "l2"]:
self.similarity = similarity
else:
@ -817,6 +824,11 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
logger.debug(f"Retriever query: {body}")
try:
result = self.client.search(index=index, body=body, request_timeout=300)["hits"]["hits"]
if len(result) == 0:
count_embeddings = self.get_embedding_count(index=index)
if count_embeddings == 0:
raise RequestError(400, "search_phase_execution_exception",
{"error": "No documents with embeddings."})
except RequestError as e:
if e.error == "search_phase_execution_exception":
error_message: str = "search_phase_execution_exception: Likely some of your stored documents don't have embeddings." \
@ -845,9 +857,28 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
else:
raise Exception("Invalid value for similarity in ElasticSearchDocumentStore constructor. Choose between \'cosine\' and \'dot_product\'")
# To handle scenarios where embeddings may be missing
script_score_query: dict = {"match_all": {}}
if self.skip_missing_embeddings:
script_score_query = {
"bool": {
"filter": {
"bool": {
"must": [
{
"exists": {
"field": self.embedding_field
}
}
]
}
}
}
}
query = {
"script_score": {
"query": {"match_all": {}},
"query": script_score_query,
"script": {
# offset score to ensure a positive range as required by Elasticsearch
"source": f"{similarity_fn_name}(params.query_vector,'{self.embedding_field}') + 1000",

View File

@ -2,6 +2,8 @@ import numpy as np
import pandas as pd
import pytest
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import RequestError
from conftest import get_document_store
from haystack.document_stores import WeaviateDocumentStore
@ -883,3 +885,38 @@ def test_get_document_count_only_documents_without_embedding_arg():
filters={"meta_field_for_count": ["c"]}) == 1
assert document_store.get_document_count(only_documents_without_embedding=True,
filters={"meta_field_for_count": ["b"]}) == 2
@pytest.mark.elasticsearch
def test_skip_missing_embeddings():
    """Exercise ``skip_missing_embeddings`` on an Elasticsearch document store.

    Three scenarios are checked against the same index:

    1. One of four documents has no embedding and the flag is ``True``:
       the query returns the three documents that carry an embedding.
    2. Same documents, flag set to ``False``: the query is expected to
       raise ``RequestError``.
    3. None of the documents has an embedding and the flag is ``True``:
       the query is expected to raise ``RequestError``.
    """
    dim = 768

    def new_query_vector():
        # A fresh random float32 query vector for every call.
        return np.random.rand(dim).astype(np.float32)

    # Mixed index content: the first document deliberately carries no
    # embedding, the others provide one as float64 array, plain list and
    # float32 array respectively.
    partially_embedded = [
        {"content": "text1", "id": "1"},
        {"content": "text2", "id": "2", "embedding": np.random.rand(dim).astype(np.float64)},
        {"content": "text3", "id": "3", "embedding": np.random.rand(dim).astype(np.float32).tolist()},
        {"content": "text4", "id": "4", "embedding": np.random.rand(dim).astype(np.float32)},
    ]

    store = ElasticsearchDocumentStore(index="skip_missing_embedding_index")
    store.write_documents(partially_embedded)

    # Scenario 1: documents without embeddings are filtered out of the query.
    store.skip_missing_embeddings = True
    hits = store.query_by_embedding(new_query_vector())
    assert len(hits) == 3

    # Scenario 2: without the filter the query must fail.
    store.skip_missing_embeddings = False
    with pytest.raises(RequestError):
        store.query_by_embedding(new_query_vector())

    # Scenario 3: no document in the whole index has an embedding.
    not_embedded = [
        {"content": "text1", "id": "1"},
        {"content": "text2", "id": "2"},
        {"content": "text3", "id": "3"},
        {"content": "text4", "id": "4"},
    ]
    store.delete_documents()
    store.write_documents(not_embedded)

    store.skip_missing_embeddings = True
    with pytest.raises(RequestError):
        store.query_by_embedding(new_query_vector())