From 85c743fe2ac58108b9fdb7757ea4b4a975c8723c Mon Sep 17 00:00:00 2001 From: antoniolanza1996 <40452030+antoniolanza1996@users.noreply.github.com> Date: Fri, 21 Aug 2020 09:36:04 +0200 Subject: [PATCH] Add refresh_type arg to ElasticsearchDocumentStore (#326) * Added refresh type into ElasticsearchDocumentStore * Update docstring Co-authored-by: Malte Pietsch --- haystack/database/elasticsearch.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/haystack/database/elasticsearch.py b/haystack/database/elasticsearch.py index 35c2216d7..4e7993ac8 100644 --- a/haystack/database/elasticsearch.py +++ b/haystack/database/elasticsearch.py @@ -36,6 +36,7 @@ class ElasticsearchDocumentStore(BaseDocumentStore): verify_certs: bool = True, create_index: bool = True, update_existing_documents: bool = False, + refresh_type: str = "wait_for", ): """ A DocumentStore using Elasticsearch to store and query the documents for our search. @@ -66,6 +67,11 @@ class ElasticsearchDocumentStore(BaseDocumentStore): documents. When set as True, any document with an existing ID gets updated. If set to False, an error is raised if the document ID of the document being added already exists. + :param refresh_type: Type of ES refresh used to control when changes made by a request (e.g. bulk) are made visible to search. + Values: + - 'wait_for' => continue only after changes are visible (slow, but safe) + - 'false' => continue directly (fast, but sometimes unintuitive behaviour when docs are not immediately available after indexing) + More info at https://www.elastic.co/guide/en/elasticsearch/reference/6.8/docs-refresh.html """ self.client = Elasticsearch(hosts=[{"host": host, "port": port}], http_auth=(username, password), scheme=scheme, ca_certs=ca_certs, verify_certs=verify_certs) @@ -92,6 +98,7 @@ class ElasticsearchDocumentStore(BaseDocumentStore): self._create_label_index(label_index) self.label_index: str = label_index self.update_existing_documents = update_existing_documents + self.refresh_type = refresh_type def _create_document_index(self, index_name): if self.client.indices.exists(index=index_name): @@ -213,7 +220,7 @@ class ElasticsearchDocumentStore(BaseDocumentStore): _doc[k] = v _doc.pop("meta") documents_to_index.append(_doc) - bulk(self.client, documents_to_index, request_timeout=300, refresh="wait_for") + bulk(self.client, documents_to_index, request_timeout=300, refresh=self.refresh_type) def write_labels(self, labels: Union[List[Label], List[dict]], index: Optional[str] = None): index = index or self.label_index @@ -232,11 +239,11 @@ class ElasticsearchDocumentStore(BaseDocumentStore): } # type: Dict[str, Any] labels_to_index.append(_label) - bulk(self.client, labels_to_index, request_timeout=300, refresh="wait_for") + bulk(self.client, labels_to_index, request_timeout=300, refresh=self.refresh_type) def update_document_meta(self, id: str, meta: Dict[str, str]): body = {"doc": meta} - self.client.update(index=self.index, doc_type="_doc", id=id, body=body, refresh="wait_for") + self.client.update(index=self.index, doc_type="_doc", id=id, body=body, refresh=self.refresh_type) def get_document_count(self, index: Optional[str] = None) -> int: if index is None: