feat: exponential backoff with exp decreasing batch size for opensearch client (#3194)

* Validate custom_mapping properly as an object

* Remove related test

* black

* feat: exponential backoff with exp dec batch size

* added docstring and split doc list

* fix

* fix mypy

* fix

* catch generic exception

* added test

* mypy ignore

* fixed no attribute

* added test

* added tests

* revert strange merge conflicts

* revert merge conflict again

* Update haystack/document_stores/elasticsearch.py

Co-authored-by: Massimiliano Pippi <mpippi@gmail.com>

* done

* adjust test

* remove not required caplog

* fixed comments

Co-authored-by: ZanSara <sarazanzo94@gmail.com>
Co-authored-by: Massimiliano Pippi <mpippi@gmail.com>
Kristof Herrmann 2022-09-13 14:30:30 +01:00 committed by GitHub
parent b47c93989b
commit da1cc577ae
2 changed files with 112 additions and 6 deletions
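
In short, the change makes bulk indexing react to a '429 Too Many Requests' response by waiting with an exponentially growing delay while splitting the batch in half before retrying, so that both an overloaded request queue and an oversized single request eventually resolve. The snippet below is a minimal standalone sketch of that retry scheme with invented names (TooManyRequests, index_with_backoff, fake_send); it is not Haystack's code, the real implementation is the _bulk method in the diff below.

import time
from typing import Callable, Sequence


class TooManyRequests(Exception):
    """Stand-in for a 429 'Too Many Requests' response (illustration only)."""


def index_with_backoff(
    send_batch: Callable[[Sequence[dict]], None],
    docs: Sequence[dict],
    delay: float = 1.0,
    remaining_tries: int = 10,
) -> None:
    """On 429: wait `delay` seconds, halve the batch, double the delay, and recurse."""
    try:
        send_batch(docs)
    except TooManyRequests:
        remaining_tries -= 1
        if remaining_tries == 0:
            raise
        time.sleep(delay)
        half = max((len(docs) + 1) // 2, 1)
        for chunk in (docs[:half], docs[half:]):
            if chunk:
                index_with_backoff(send_batch, chunk, delay=delay * 2, remaining_tries=remaining_tries)


if __name__ == "__main__":
    def fake_send(batch: Sequence[dict]) -> None:
        # Toy cluster that rejects any batch larger than 250 documents.
        if len(batch) > 250:
            raise TooManyRequests()
        print(f"indexed {len(batch)} docs")

    index_with_backoff(fake_send, [{"content": f"doc {i}"} for i in range(1000)], delay=0.01)

Run as a script, the toy cluster rejects anything above 250 documents, so 1000 documents end up indexed as four chunks of 250, mirroring the 1000 => 500 => 250 split that the new unit tests exercise.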

Changed file: haystack/document_stores/elasticsearch.py

@@ -130,6 +130,69 @@ class BaseElasticsearchDocumentStore(KeywordDocumentStore):
         self.duplicate_documents = duplicate_documents
         self.refresh_type = refresh_type
 
+    def _split_document_list(
+        self, documents: Union[List[dict], List[Document]], number_of_lists: int
+    ) -> Generator[Union[List[dict], List[Document]], None, None]:
+        chunk_size = max((len(documents) + 1) // number_of_lists, 1)
+        for i in range(0, len(documents), chunk_size):
+            yield documents[i : i + chunk_size]
+
+    def _bulk(
+        self,
+        documents: Union[List[dict], List[Document]],
+        headers: Optional[Dict[str, str]] = None,
+        request_timeout: int = 300,
+        refresh: str = "wait_for",
+        _timeout: int = 1,
+        _remaining_tries: int = 10,
+    ) -> None:
+        """
+        Bulk index documents into Elasticsearch using a custom retry implementation that uses
+        exponential backoff and exponential batch size reduction to avoid overloading the cluster.
+
+        OpenSearch/Elasticsearch returns '429 Too Many Requests' when write requests cannot be
+        processed, either because too many requests are queued or because a single request is too
+        large and exceeds the memory of the nodes. Since the error code is the same in both cases,
+        we wait and reduce the batch size at the same time.
+
+        :param documents: List of documents to index
+        :param headers: Optional headers to pass to the bulk request
+        :param request_timeout: Timeout for the bulk request
+        :param refresh: Refresh policy for the bulk request
+        :param _timeout: Timeout for the exponential backoff
+        :param _remaining_tries: Number of remaining retries
+        """
+        try:
+            bulk(self.client, documents, request_timeout=request_timeout, refresh=refresh, headers=headers)
+        except Exception as e:
+            if hasattr(e, "status_code") and e.status_code == 429:  # type: ignore
+                logger.warning(
+                    f"Failed to insert a batch of '{len(documents)}' documents because of a 'Too Many Requests' "
+                    f"response. Splitting the documents into two equally sized chunks and retrying in {_timeout} seconds."
+                )
+                if len(documents) == 1:
+                    logger.warning(
+                        "Failed to index a single document. Your indexing queue on the cluster is probably full. "
+                        "Try resizing your cluster or reducing the number of parallel processes that are writing to the cluster."
+                    )
+                time.sleep(_timeout)
+
+                _remaining_tries -= 1
+                if _remaining_tries == 0:
+                    raise DocumentStoreError("Last try of bulk indexing documents failed.")
+
+                for split_docs in self._split_document_list(documents, 2):
+                    self._bulk(
+                        documents=split_docs,
+                        headers=headers,
+                        request_timeout=request_timeout,
+                        refresh=refresh,
+                        _timeout=_timeout * 2,
+                        _remaining_tries=_remaining_tries,
+                    )
+                return
+            raise e
+
     def _create_document_index(self, index_name: str, headers: Optional[Dict[str, str]] = None):
         """
         Create a new index for storing documents. If an index with that name already exists, it ensures that
@@ -442,11 +505,11 @@ class BaseElasticsearchDocumentStore(KeywordDocumentStore):
             # Pass batch_size number of documents to bulk
             if len(documents_to_index) % batch_size == 0:
-                bulk(self.client, documents_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
+                self._bulk(documents_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
                 documents_to_index = []
 
         if documents_to_index:
-            bulk(self.client, documents_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
+            self._bulk(documents_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
 
     def write_labels(
         self,
@@ -500,11 +563,11 @@ class BaseElasticsearchDocumentStore(KeywordDocumentStore):
             # Pass batch_size number of labels to bulk
             if len(labels_to_index) % batch_size == 0:
-                bulk(self.client, labels_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
+                self._bulk(labels_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
                 labels_to_index = []
 
         if labels_to_index:
-            bulk(self.client, labels_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
+            self._bulk(labels_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
 
     def update_document_meta(
         self, id: str, meta: Dict[str, str], index: str = None, headers: Optional[Dict[str, str]] = None
@@ -1434,7 +1497,7 @@ class BaseElasticsearchDocumentStore(KeywordDocumentStore):
                    }
                    doc_updates.append(update)
 
-                bulk(self.client, doc_updates, request_timeout=300, refresh=self.refresh_type, headers=headers)
+                self._bulk(documents=doc_updates, request_timeout=300, refresh=self.refresh_type, headers=headers)
                progress_bar.update(batch_size)
 
    def delete_all_documents(
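
To get a feel for the defaults introduced above (_timeout=1 doubling on every retry, _remaining_tries=10, and the batch roughly halving each time), here is a small illustrative sketch, not part of the PR, that prints the worst-case schedule along a single recursion path when every attempt keeps answering 429. The waits sum to 1 + 2 + ... + 512 = 1023 seconds, roughly 17 minutes, before the DocumentStoreError is raised; the 10_000 starting batch is only an example figure.

# Worst-case retry schedule along one recursion path, assuming every bulk call
# keeps failing with 429: the wait doubles while the batch roughly halves.
batch, wait = 10_000, 1
for attempt in range(1, 11):  # mirrors _remaining_tries=10
    print(f"attempt {attempt:2d}: batch={batch:6d} docs, wait={wait:4d}s")
    batch, wait = max((batch + 1) // 2, 1), wait * 2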

Changed file: OpenSearch document store unit tests

@@ -1,10 +1,12 @@
 import logging
-from unittest.mock import MagicMock
+from unittest.mock import MagicMock, patch
 
 import pytest
 import numpy as np
+import opensearchpy
 
 from haystack.document_stores.opensearch import (
     OpenSearch,
     OpenSearchDocumentStore,
@@ -807,6 +809,47 @@ class TestOpenSearchDocumentStore:
             },
         }
 
+    @pytest.mark.unit
+    def test_bulk_write_retries_for_always_failing_insert_is_canceled(self, mocked_document_store, monkeypatch, caplog):
+        docs_to_write = [
+            {"meta": {"name": f"name_{i}"}, "content": f"text_{i}", "embedding": np.random.rand(768).astype(np.float32)}
+            for i in range(1000)
+        ]
+
+        with patch("haystack.document_stores.elasticsearch.bulk") as mocked_bulk:
+            mocked_bulk.side_effect = opensearchpy.TransportError(429, "Too many requests")
+
+            with pytest.raises(DocumentStoreError, match="Last try of bulk indexing documents failed."):
+                mocked_document_store._bulk(documents=docs_to_write, _timeout=0, _remaining_tries=3)
+
+            assert mocked_bulk.call_count == 3  # depth-first search fails and cancels the whole bulk request
+            assert "Too Many Requests" in caplog.text
+            assert "Splitting the documents into two equally sized chunks" in caplog.text
+
+    @pytest.mark.unit
+    def test_bulk_write_retries_with_backoff_with_smaller_batch_size_on_too_many_requests(
+        self, mocked_document_store, monkeypatch
+    ):
+        docs_to_write = [
+            {"meta": {"name": f"name_{i}"}, "content": f"text_{i}", "embedding": np.random.rand(768).astype(np.float32)}
+            for i in range(1000)
+        ]
+
+        with patch("haystack.document_stores.elasticsearch.bulk") as mocked_bulk:
+            # make the bulk insert split the documents and retry, s.t.
+            # 1k => 500 (failed) + 500 (successful) => 250 (successful) + 250 (successful)
+            # resulting in 5 calls in total
+            mocked_bulk.side_effect = [
+                opensearchpy.TransportError(429, "Too many requests"),
+                opensearchpy.TransportError(429, "Too many requests"),
+                None,
+                None,
+                None,
+            ]
+            mocked_document_store._bulk(documents=docs_to_write, _timeout=0, _remaining_tries=3)
+
+            assert mocked_bulk.call_count == 5
+
 
 class TestOpenDistroElasticsearchDocumentStore:
     @pytest.mark.unit