Add refresh_type arg to ElasticsearchDocumentStore (#326)

* Added refresh type into ElasticsearchDocumentStore

* Update docstring

Co-authored-by: Malte Pietsch <malte.pietsch@deepset.ai>
This commit is contained in:
antoniolanza1996 2020-08-21 09:36:04 +02:00 committed by GitHub
parent 7d2a8f19fc
commit 85c743fe2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -36,6 +36,7 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
verify_certs: bool = True,
create_index: bool = True,
update_existing_documents: bool = False,
refresh_type: str = "wait_for",
):
"""
A DocumentStore using Elasticsearch to store and query the documents for our search.
@ -66,6 +67,11 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
documents. When set as True, any document with an existing ID gets updated.
If set to False, an error is raised if the document ID of the document being
added already exists.
:param refresh_type: Type of ES refresh used to control when changes made by a request (e.g. bulk) are made visible to search.
Values:
- 'wait_for' => continue only after changes are visible (slow, but safe)
- 'false' => continue directly (fast, but can lead to unintuitive behaviour because docs may not be immediately visible to search after indexing)
More info at https://www.elastic.co/guide/en/elasticsearch/reference/6.8/docs-refresh.html
"""
self.client = Elasticsearch(hosts=[{"host": host, "port": port}], http_auth=(username, password),
scheme=scheme, ca_certs=ca_certs, verify_certs=verify_certs)
@ -92,6 +98,7 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
self._create_label_index(label_index)
self.label_index: str = label_index
self.update_existing_documents = update_existing_documents
self.refresh_type = refresh_type
def _create_document_index(self, index_name):
if self.client.indices.exists(index=index_name):
@ -213,7 +220,7 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
_doc[k] = v
_doc.pop("meta")
documents_to_index.append(_doc)
bulk(self.client, documents_to_index, request_timeout=300, refresh="wait_for")
bulk(self.client, documents_to_index, request_timeout=300, refresh=self.refresh_type)
def write_labels(self, labels: Union[List[Label], List[dict]], index: Optional[str] = None):
index = index or self.label_index
@ -232,11 +239,11 @@ class ElasticsearchDocumentStore(BaseDocumentStore):
} # type: Dict[str, Any]
labels_to_index.append(_label)
bulk(self.client, labels_to_index, request_timeout=300, refresh="wait_for")
bulk(self.client, labels_to_index, request_timeout=300, refresh=self.refresh_type)
def update_document_meta(self, id: str, meta: Dict[str, str]):
body = {"doc": meta}
self.client.update(index=self.index, doc_type="_doc", id=id, body=body, refresh="wait_for")
self.client.update(index=self.index, doc_type="_doc", id=id, body=body, refresh=self.refresh_type)
def get_document_count(self, index: Optional[str] = None) -> int:
if index is None: