diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ce0766ceb..63dcd523a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,7 +38,7 @@ jobs: pip install pytest pip install -r requirements.txt pip install -e . - + - name: Run Pytest without generator/pipeline marker run: cd test && pytest -m "not pipeline and not generator" diff --git a/haystack/document_store/elasticsearch.py b/haystack/document_store/elasticsearch.py index bbd4600f5..82049483f 100644 --- a/haystack/document_store/elasticsearch.py +++ b/haystack/document_store/elasticsearch.py @@ -111,21 +111,18 @@ class ElasticsearchDocumentStore(BaseDocumentStore): self.custom_mapping = custom_mapping self.index: str = index self.label_index: str = label_index + if similarity in ["cosine", "dot_product"]: + self.similarity = similarity + else: + raise Exception("Invalid value for similarity in ElasticSearchDocumentStore constructor. Choose between 'cosine' and 'dot_product'") if create_index: self._create_document_index(index) self._create_label_index(label_index) self.update_existing_documents = update_existing_documents self.refresh_type = refresh_type - self.similarity = similarity - if similarity == "cosine": - self.similarity_fn_name = "cosineSimilarity" - elif similarity == "dot_product": - self.similarity_fn_name = "dotProduct" - else: - raise Exception("Invalid value for similarity in ElasticSearchDocumentStore constructor. Choose between \'cosine\' and \'dot_product\'") - def _create_document_index(self, index_name): + def _create_document_index(self, index_name: str): """ Create a new index for storing documents. In case if an index with the name already exists, it ensures that the embedding_field is present. @@ -182,7 +179,7 @@ class ElasticsearchDocumentStore(BaseDocumentStore): if not self.client.indices.exists(index=index_name): raise e - def _create_label_index(self, index_name): + def _create_label_index(self, index_name: str): if self.client.indices.exists(index=index_name): return mapping = { @@ -531,22 +528,10 @@ class ElasticsearchDocumentStore(BaseDocumentStore): raise RuntimeError("Please specify arg `embedding_field` in ElasticsearchDocumentStore()") else: # +1 in similarity to avoid negative numbers (for cosine sim) - body= { + body = { "size": top_k, - "query": { - "script_score": { - "query": {"match_all": {}}, - "script": { - # offset score to ensure a positive range as required by Elasticsearch - "source": f"{self.similarity_fn_name}(params.query_vector,'{self.embedding_field}') + 1000", - "params": { - "query_vector": query_emb.tolist() - } - } - } - } - } # type: Dict[str,Any] - + "query": self._get_vector_similarity_query(query_emb, top_k) + } if filters: for key, values in filters.items(): if type(values) != list: @@ -580,6 +565,29 @@ class ElasticsearchDocumentStore(BaseDocumentStore): ] return documents + def _get_vector_similarity_query(self, query_emb: np.array, top_k: int): + """ + Generate Elasticsearch query for vector similarity. + """ + if self.similarity == "cosine": + similarity_fn_name = "cosineSimilarity" + elif self.similarity == "dot_product": + similarity_fn_name = "dotProduct" + else: + raise Exception("Invalid value for similarity in ElasticSearchDocumentStore constructor. Choose between \'cosine\' and \'dot_product\'") + + query = { + "script_score": { + "query": {"match_all": {}}, + "script": { + # offset score to ensure a positive range as required by Elasticsearch + "source": f"{similarity_fn_name}(params.query_vector,'{self.embedding_field}') + 1000", + "params": {"query_vector": query_emb.tolist()}, + }, + } + } + return query + def _convert_es_hit_to_document( self, hit: dict, @@ -596,7 +604,7 @@ class ElasticsearchDocumentStore(BaseDocumentStore): score = hit["_score"] if hit["_score"] else None if score: if adapt_score_for_embedding: - score -= 1000 + score = self._scale_embedding_score(score) if self.similarity == "cosine": probability = (score + 1) / 2 # scaling probability from cosine similarity elif self.similarity == "dot_product": @@ -623,6 +631,9 @@ class ElasticsearchDocumentStore(BaseDocumentStore): ) return document + def _scale_embedding_score(self, score): + return score - 1000 + def describe_documents(self, index=None): """ Return a summary of the documents in the document store @@ -717,7 +728,78 @@ class ElasticsearchDocumentStore(BaseDocumentStore): time.sleep(1) +class OpenDistroElasticsearchDocumentStore(ElasticsearchDocumentStore): + """ + Document Store using the Open Distro for Elasticsearch. It is compatible with the AWS Elasticsearch Service. + In addition to native Elasticsearch query & filtering, it provides efficient vector similarity search using + the KNN plugin that can scale to a large number of documents. + """ + def _create_document_index(self, index_name: str): + """ + Create a new index for storing documents. + """ + if self.custom_mapping: + mapping = self.custom_mapping + else: + mapping = { + "mappings": { + "properties": { + self.name_field: {"type": "keyword"}, + self.text_field: {"type": "text"}, + }, + "dynamic_templates": [ + { + "strings": { + "path_match": "*", + "match_mapping_type": "string", + "mapping": {"type": "keyword"}}} + ], + }, + "settings": { + "analysis": { + "analyzer": { + "default": { + "type": self.analyzer, + } + } + } + } + } + if self.embedding_field: + if self.similarity == "cosine": + similarity_space_type = "cosinesimil" + elif self.similarity == "dot_product": + similarity_space_type = "l2" + else: + raise Exception( + f"Similarity function {self.similarity} is not supported by OpenDistroElasticsearchDocumentStore." + ) + mapping["settings"]["knn"] = True + mapping["settings"]["knn.space_type"] = similarity_space_type + mapping["mappings"]["properties"][self.embedding_field] = { + "type": "knn_vector", + "dimension": self.embedding_dim, + } + try: + self.client.indices.create(index=index_name, body=mapping) + except RequestError as e: + # With multiple workers we need to avoid race conditions, where: + # - there's no index in the beginning + # - both want to create one + # - one fails as the other one already created it + if not self.client.indices.exists(index=index_name): + raise e + + def _get_vector_similarity_query(self, query_emb: np.array, top_k: int): + """ + Generate Elasticsearch query for vector similarity. + """ + query = {"knn": {self.embedding_field: {"vector": query_emb.tolist(), "k": top_k}}} + return query + + def _scale_embedding_score(self, score): + return score \ No newline at end of file diff --git a/haystack/retriever/base.py b/haystack/retriever/base.py index bfcd47748..457ae017b 100644 --- a/haystack/retriever/base.py +++ b/haystack/retriever/base.py @@ -179,7 +179,8 @@ class BaseRetriever(ABC): documents = self.retrieve(query=query, filters=filters, top_k=top_k_retriever) else: documents = self.retrieve(query=query, filters=filters) - + document_ids = [doc.id for doc in documents] + logger.debug(f"Retrieved documents with IDs: {document_ids}") output = { "query": query, "documents": documents,