Add DocumentStore for Open Distro Elasticsearch (#676)

This commit is contained in:
Tanay Soni 2020-12-15 09:28:40 +01:00 committed by GitHub
parent 33fe597949
commit 369e237fd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 110 additions and 27 deletions

View File

@ -38,7 +38,7 @@ jobs:
pip install pytest
pip install -r requirements.txt
pip install -e .
- name: Run Pytest without generator/pipeline marker
run: cd test && pytest -m "not pipeline and not generator"

View File

@ -111,21 +111,18 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
self.custom_mapping = custom_mapping
self.index: str = index
self.label_index: str = label_index
if similarity in ["cosine", "dot_product"]:
self.similarity = similarity
else:
raise Exception("Invalid value for similarity in ElasticSearchDocumentStore constructor. Choose between 'cosine' and 'dot_product'")
if create_index:
self._create_document_index(index)
self._create_label_index(label_index)
self.update_existing_documents = update_existing_documents
self.refresh_type = refresh_type
self.similarity = similarity
if similarity == "cosine":
self.similarity_fn_name = "cosineSimilarity"
elif similarity == "dot_product":
self.similarity_fn_name = "dotProduct"
else:
raise Exception("Invalid value for similarity in ElasticSearchDocumentStore constructor. Choose between \'cosine\' and \'dot_product\'")
def _create_document_index(self, index_name):
def _create_document_index(self, index_name: str):
"""
Create a new index for storing documents. In case if an index with the name already exists, it ensures that
the embedding_field is present.
@ -182,7 +179,7 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
if not self.client.indices.exists(index=index_name):
raise e
def _create_label_index(self, index_name):
def _create_label_index(self, index_name: str):
if self.client.indices.exists(index=index_name):
return
mapping = {
@ -531,22 +528,10 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
raise RuntimeError("Please specify arg `embedding_field` in ElasticsearchDocumentStore()")
else:
# +1 in similarity to avoid negative numbers (for cosine sim)
body= {
body = {
"size": top_k,
"query": {
"script_score": {
"query": {"match_all": {}},
"script": {
# offset score to ensure a positive range as required by Elasticsearch
"source": f"{self.similarity_fn_name}(params.query_vector,'{self.embedding_field}') + 1000",
"params": {
"query_vector": query_emb.tolist()
}
}
}
}
} # type: Dict[str,Any]
"query": self._get_vector_similarity_query(query_emb, top_k)
}
if filters:
for key, values in filters.items():
if type(values) != list:
@ -580,6 +565,29 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
]
return documents
def _get_vector_similarity_query(self, query_emb: np.array, top_k: int):
"""
Generate Elasticsearch query for vector similarity.
"""
if self.similarity == "cosine":
similarity_fn_name = "cosineSimilarity"
elif self.similarity == "dot_product":
similarity_fn_name = "dotProduct"
else:
raise Exception("Invalid value for similarity in ElasticSearchDocumentStore constructor. Choose between \'cosine\' and \'dot_product\'")
query = {
"script_score": {
"query": {"match_all": {}},
"script": {
# offset score to ensure a positive range as required by Elasticsearch
"source": f"{similarity_fn_name}(params.query_vector,'{self.embedding_field}') + 1000",
"params": {"query_vector": query_emb.tolist()},
},
}
}
return query
def _convert_es_hit_to_document(
self,
hit: dict,
@ -596,7 +604,7 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
score = hit["_score"] if hit["_score"] else None
if score:
if adapt_score_for_embedding:
score -= 1000
score = self._scale_embedding_score(score)
if self.similarity == "cosine":
probability = (score + 1) / 2 # scaling probability from cosine similarity
elif self.similarity == "dot_product":
@ -623,6 +631,9 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
)
return document
def _scale_embedding_score(self, score):
return score - 1000
def describe_documents(self, index=None):
"""
Return a summary of the documents in the document store
@ -717,7 +728,78 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
time.sleep(1)
class OpenDistroElasticsearchDocumentStore(ElasticsearchDocumentStore):
"""
Document Store using the Open Distro for Elasticsearch. It is compatible with the AWS Elasticsearch Service.
In addition to native Elasticsearch query & filtering, it provides efficient vector similarity search using
the KNN plugin that can scale to a large number of documents.
"""
def _create_document_index(self, index_name: str):
"""
Create a new index for storing documents.
"""
if self.custom_mapping:
mapping = self.custom_mapping
else:
mapping = {
"mappings": {
"properties": {
self.name_field: {"type": "keyword"},
self.text_field: {"type": "text"},
},
"dynamic_templates": [
{
"strings": {
"path_match": "*",
"match_mapping_type": "string",
"mapping": {"type": "keyword"}}}
],
},
"settings": {
"analysis": {
"analyzer": {
"default": {
"type": self.analyzer,
}
}
}
}
}
if self.embedding_field:
if self.similarity == "cosine":
similarity_space_type = "cosinesimil"
elif self.similarity == "dot_product":
similarity_space_type = "l2"
else:
raise Exception(
f"Similarity function {self.similarity} is not supported by OpenDistroElasticsearchDocumentStore."
)
mapping["settings"]["knn"] = True
mapping["settings"]["knn.space_type"] = similarity_space_type
mapping["mappings"]["properties"][self.embedding_field] = {
"type": "knn_vector",
"dimension": self.embedding_dim,
}
try:
self.client.indices.create(index=index_name, body=mapping)
except RequestError as e:
# With multiple workers we need to avoid race conditions, where:
# - there's no index in the beginning
# - both want to create one
# - one fails as the other one already created it
if not self.client.indices.exists(index=index_name):
raise e
def _get_vector_similarity_query(self, query_emb: np.array, top_k: int):
"""
Generate Elasticsearch query for vector similarity.
"""
query = {"knn": {self.embedding_field: {"vector": query_emb.tolist(), "k": top_k}}}
return query
def _scale_embedding_score(self, score):
return score

View File

@ -179,7 +179,8 @@ class BaseRetriever(ABC):
documents = self.retrieve(query=query, filters=filters, top_k=top_k_retriever)
else:
documents = self.retrieve(query=query, filters=filters)
document_ids = [doc.id for doc in documents]
logger.debug(f"Retrieved documents with IDs: {document_ids}")
output = {
"query": query,
"documents": documents,