From da1cc577ae0dcf63f3b4f4bd71bb972f5f1f825c Mon Sep 17 00:00:00 2001
From: Kristof Herrmann <37148029+ArzelaAscoIi@users.noreply.github.com>
Date: Tue, 13 Sep 2022 14:30:30 +0100
Subject: [PATCH] feat: exponential backoff with exp decreasing batch size for opensearch client (#3194)

* Validate custom_mapping properly as an object

* Remove related test

* black

* feat: exponential backoff with exp dec batch size

* added docstring and split doc list

* fix

* fix mypy

* fix

* catch generic exception

* added test

* mypy ignore

* fixed no attribute

* added test

* added tests

* revert strange merge conflicts

* revert merge conflict again

* Update haystack/document_stores/elasticsearch.py

Co-authored-by: Massimiliano Pippi

* done

* adjust test

* remove not required caplog

* fixed comments

Co-authored-by: ZanSara
Co-authored-by: Massimiliano Pippi
---
 haystack/document_stores/elasticsearch.py | 73 +++++++++++++++++++++--
 test/document_stores/test_opensearch.py   | 45 +++++++++++++-
 2 files changed, 112 insertions(+), 6 deletions(-)

diff --git a/haystack/document_stores/elasticsearch.py b/haystack/document_stores/elasticsearch.py
index 6ddfec29f..bc5963562 100644
--- a/haystack/document_stores/elasticsearch.py
+++ b/haystack/document_stores/elasticsearch.py
@@ -130,6 +130,69 @@ class BaseElasticsearchDocumentStore(KeywordDocumentStore):
         self.duplicate_documents = duplicate_documents
         self.refresh_type = refresh_type
 
+    def _split_document_list(
+        self, documents: Union[List[dict], List[Document]], number_of_lists: int
+    ) -> Generator[Union[List[dict], List[Document]], None, None]:
+        chunk_size = max((len(documents) + 1) // number_of_lists, 1)
+        for i in range(0, len(documents), chunk_size):
+            yield documents[i : i + chunk_size]
+
+    def _bulk(
+        self,
+        documents: Union[List[dict], List[Document]],
+        headers: Optional[Dict[str, str]] = None,
+        request_timeout: int = 300,
+        refresh: str = "wait_for",
+        _timeout: int = 1,
+        _remaining_tries: int = 10,
+    ) -> None:
+        """
+        Bulk index documents into Elasticsearch using a custom retry implementation that uses
+        exponential backoff and exponential batch size reduction to avoid overloading the cluster.
+
+        OpenSearch/Elasticsearch returns '429 Too Many Requests' when write requests can't be
+        processed because there are too many requests in the queue or a single request is too large and
+        exceeds the memory of the nodes. Since the error code is the same for both cases, we wait
+        and reduce the batch size simultaneously.
+
+        :param documents: List of documents to index
+        :param headers: Optional headers to pass to the bulk request
+        :param request_timeout: Timeout for the bulk request
+        :param refresh: Refresh policy for the bulk request
+        :param _timeout: Seconds to wait before retrying; doubled on every retry (exponential backoff)
+        :param _remaining_tries: Number of remaining retries
+        """
+
+        try:
+            bulk(self.client, documents, request_timeout=request_timeout, refresh=refresh, headers=headers)
+        except Exception as e:
+            if hasattr(e, "status_code") and e.status_code == 429:  # type: ignore
+                logger.warning(
+                    f"Failed to insert a batch of '{len(documents)}' documents because of a 'Too Many Requests' response. Splitting the number of documents into two chunks with the same size and retrying in {_timeout} seconds."
+                )
+                if len(documents) == 1:
+                    logger.warning(
+                        "Failed to index a single document. Your indexing queue on the cluster is probably full. Try resizing your cluster or reducing the number of parallel processes that are writing to the cluster."
+                    )
+
+                time.sleep(_timeout)
+
+                _remaining_tries -= 1
+                if _remaining_tries == 0:
+                    raise DocumentStoreError("Last try of bulk indexing documents failed.")
+
+                for split_docs in self._split_document_list(documents, 2):
+                    self._bulk(
+                        documents=split_docs,
+                        headers=headers,
+                        request_timeout=request_timeout,
+                        refresh=refresh,
+                        _timeout=_timeout * 2,
+                        _remaining_tries=_remaining_tries,
+                    )
+                return
+            raise e
+
     def _create_document_index(self, index_name: str, headers: Optional[Dict[str, str]] = None):
         """
         Create a new index for storing documents. In case if an index with the name already exists, it ensures that
@@ -442,11 +505,11 @@ class BaseElasticsearchDocumentStore(KeywordDocumentStore):
 
             # Pass batch_size number of documents to bulk
             if len(documents_to_index) % batch_size == 0:
-                bulk(self.client, documents_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
+                self._bulk(documents_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
                 documents_to_index = []
 
         if documents_to_index:
-            bulk(self.client, documents_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
+            self._bulk(documents_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
 
     def write_labels(
         self,
@@ -500,11 +563,11 @@ class BaseElasticsearchDocumentStore(KeywordDocumentStore):
 
             # Pass batch_size number of labels to bulk
            if len(labels_to_index) % batch_size == 0:
-                bulk(self.client, labels_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
+                self._bulk(labels_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
                 labels_to_index = []
 
         if labels_to_index:
-            bulk(self.client, labels_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
+            self._bulk(labels_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
 
     def update_document_meta(
         self, id: str, meta: Dict[str, str], index: str = None, headers: Optional[Dict[str, str]] = None
@@ -1434,7 +1497,7 @@ class BaseElasticsearchDocumentStore(KeywordDocumentStore):
                     }
                     doc_updates.append(update)
 
-                bulk(self.client, doc_updates, request_timeout=300, refresh=self.refresh_type, headers=headers)
+                self._bulk(documents=doc_updates, request_timeout=300, refresh=self.refresh_type, headers=headers)
                 progress_bar.update(batch_size)
 
     def delete_all_documents(
diff --git a/test/document_stores/test_opensearch.py b/test/document_stores/test_opensearch.py
index 83a245b7d..ade035730 100644
--- a/test/document_stores/test_opensearch.py
+++ b/test/document_stores/test_opensearch.py
@@ -1,10 +1,12 @@
 import logging
 
-from unittest.mock import MagicMock
+from unittest.mock import MagicMock, patch
 
 import pytest
 import numpy as np
 
+import opensearchpy
+
 from haystack.document_stores.opensearch import (
     OpenSearch,
     OpenSearchDocumentStore,
@@ -807,6 +809,47 @@ class TestOpenSearchDocumentStore:
                 },
             }
 
+    @pytest.mark.unit
+    def test_bulk_write_retries_for_always_failing_insert_is_canceled(self, mocked_document_store, monkeypatch, caplog):
+        docs_to_write = [
+            {"meta": {"name": f"name_{i}"}, "content": f"text_{i}", "embedding": np.random.rand(768).astype(np.float32)}
+            for i in range(1000)
+        ]
+
+        with patch("haystack.document_stores.elasticsearch.bulk") as mocked_bulk:
+            mocked_bulk.side_effect = opensearchpy.TransportError(429, "Too many requests")
+
+            with pytest.raises(DocumentStoreError, match="Last try of bulk indexing documents failed."):
+                mocked_document_store._bulk(documents=docs_to_write, _timeout=0, _remaining_tries=3)
+
+            assert mocked_bulk.call_count == 3  # depth first search fails and cancels the whole bulk request
+
+            assert "Too Many Requests" in caplog.text
+            assert " Splitting the number of documents into two chunks with the same size" in caplog.text
+
+    @pytest.mark.unit
+    def test_bulk_write_retries_with_backoff_with_smaller_batch_size_on_too_many_requests(
+        self, mocked_document_store, monkeypatch
+    ):
+        docs_to_write = [
+            {"meta": {"name": f"name_{i}"}, "content": f"text_{i}", "embedding": np.random.rand(768).astype(np.float32)}
+            for i in range(1000)
+        ]
+
+        with patch("haystack.document_stores.elasticsearch.bulk") as mocked_bulk:
+            # make bulk insert split documents and request retries s.t.
+            # 1k => 500 (failed) + 500 (successful) => 250 (successful) + 250 (successful)
+            # resulting in 5 calls in total
+            mocked_bulk.side_effect = [
+                opensearchpy.TransportError(429, "Too many requests"),
+                opensearchpy.TransportError(429, "Too many requests"),
+                None,
+                None,
+                None,
+            ]
+            mocked_document_store._bulk(documents=docs_to_write, _timeout=0, _remaining_tries=3)
+            assert mocked_bulk.call_count == 5
+
 
 class TestOpenDistroElasticsearchDocumentStore:
     @pytest.mark.unit
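
For reference, a minimal standalone sketch of the retry strategy this patch introduces: exponential backoff combined with halving the batch on an HTTP 429. This is not part of the patch; `send_batch` and `TooManyRequests` are hypothetical stand-ins for the elasticsearch `bulk` helper and its 429 transport error.

import time
from typing import Callable, List


class TooManyRequests(Exception):
    """Stand-in for a transport error carrying HTTP status 429 (hypothetical, for illustration)."""


def bulk_with_backoff(
    send_batch: Callable[[List[dict]], None],
    documents: List[dict],
    timeout: float = 1.0,
    remaining_tries: int = 10,
) -> None:
    """Index `documents`; on a 429, wait, halve the batch, and recurse with a doubled wait."""
    try:
        send_batch(documents)
    except TooManyRequests:
        # Back off before retrying, and give up once the retry budget is exhausted.
        time.sleep(timeout)
        remaining_tries -= 1
        if remaining_tries == 0:
            raise
        # Split into two roughly equal halves and retry each with exponential backoff.
        mid = max((len(documents) + 1) // 2, 1)
        for chunk in (documents[:mid], documents[mid:]):
            if chunk:
                bulk_with_backoff(send_batch, chunk, timeout * 2, remaining_tries)

Calling bulk_with_backoff(send_batch, docs, timeout=0, remaining_tries=3) against a stub that always raises mirrors the first unit test above: three attempts depth-first, then the error propagates.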