fix: remove warnings from the more recent Elasticsearch client (#4602)

* clean up the ES instance in a more robust way

* do not sleep, refresh the index instead

* remove client warnings

* fix unit tests

* fix opensearch compatibility

* fix unit tests

* update ES version

* bump elasticsearch-py

* adjust docs

* use recreate_index param

* use same fixture strategy for Opensearch

* Update lg

---------

Co-authored-by: agnieszka-m <amarzec13@gmail.com>
This commit is contained in:
Massimiliano Pippi 2023-04-18 15:40:17 +02:00 committed by GitHub
parent 8c4176bdb2
commit 0c081f19e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 185 additions and 49 deletions

View File

@ -1231,7 +1231,7 @@ jobs:
- name: Run Elasticsearch
run: |
docker run -d -p 9200:9200 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms128m -Xmx256m" elasticsearch:7.9.2
docker run -d -p 9200:9200 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms128m -Xmx256m" elasticsearch:7.17.6
- name: Run Opensearch
run: |

View File

@ -373,7 +373,7 @@ class ElasticsearchDocumentStore(SearchEngineDocumentStore):
)
try:
result = self.client.search(index=index, body=body, request_timeout=300, headers=headers)["hits"]["hits"]
result = self.client.search(index=index, **body, request_timeout=300, headers=headers)["hits"]["hits"]
if len(result) == 0:
count_documents = self.get_document_count(index=index, headers=headers)
if count_documents == 0:
@ -454,7 +454,7 @@ class ElasticsearchDocumentStore(SearchEngineDocumentStore):
}
try:
self.client.indices.create(index=index_name, body=mapping, headers=headers)
self.client.indices.create(index=index_name, **mapping, headers=headers)
except self._RequestError as e:
# With multiple workers we need to avoid race conditions, where:
# - there's no index in the beginning
@ -483,7 +483,7 @@ class ElasticsearchDocumentStore(SearchEngineDocumentStore):
}
}
try:
self.client.indices.create(index=index_name, body=mapping, headers=headers)
self.client.indices.create(index=index_name, **mapping, headers=headers)
except self._RequestError as e:
# With multiple workers we need to avoid race conditions, where:
# - there's no index in the beginning
@ -496,7 +496,7 @@ class ElasticsearchDocumentStore(SearchEngineDocumentStore):
"""
Validates an existing document index. If there's no embedding field, we'll add it.
"""
indices = self.client.indices.get(index_name, headers=headers)
indices = self.client.indices.get(index=index_name, headers=headers)
if not any(indices):
logger.warning(

View File

@ -1339,3 +1339,123 @@ class OpenSearchDocumentStore(SearchEngineDocumentStore):
progress_bar.update(batch_size)
finally:
opensearch_logger.setLevel(original_log_level)
    def get_metadata_values_by_key(
        self,
        key: str,
        query: Optional[str] = None,
        filters: Optional[FilterType] = None,
        index: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
    ) -> List[dict]:
        """
        Get values associated with a metadata key. The output is in the format:
        [{"value": "my-value-1", "count": 23}, {"value": "my-value-2", "count": 12}, ... ]

        :param key: The meta key name to get the values for.
        :param query: Narrow down the scope to documents matching the query string.
        :param filters: Narrow down the scope to documents matching the given filters.
                        Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical
                        operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`,
                        `"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name.
                        Logical operator keys take a dictionary of metadata field names and/or logical operators as
                        value. Metadata field names take a dictionary of comparison operators as value. Comparison
                        operator keys take a single value or (in case of `"$in"`) a list of values as value.
                        If no logical operator is provided, `"$and"` is used as default operation. If no comparison
                        operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as default
                        operation.

                        __Example__:

                        ```python
                        filters = {
                            "$and": {
                                "type": {"$eq": "article"},
                                "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
                                "rating": {"$gte": 3},
                                "$or": {
                                    "genre": {"$in": ["economy", "politics"]},
                                    "publisher": {"$eq": "nytimes"}
                                }
                            }
                        }
                        ```
        :param index: The search index to search for the meta values. If not supplied,
                      self.index is used.
        :param headers: Custom HTTP headers to pass to the client (for example, {'Authorization': 'Basic YWRtaW46cm9vdA=='})
                Check out [Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html) for more information.
        """
        # Aggregation-only request: "size": 0 suppresses document hits, while the
        # composite aggregation buckets distinct values of `key` and supports
        # pagination via "after_key".
        body: dict = {
            "size": 0,
            "aggs": {"metadata_agg": {"composite": {"sources": [{key: {"terms": {"field": key}}}]}}},
        }
        if query:
            # Restrict the aggregation scope to documents matching the query string
            # across the configured search fields.
            body["query"] = {
                "bool": {
                    "should": [{"multi_match": {"query": query, "type": "most_fields", "fields": self.search_fields}}]
                }
            }
        if filters:
            # Merge the filter clause into an existing bool query (from `query`
            # above) or create a fresh one if no query string was given.
            if not body.get("query"):
                body["query"] = {"bool": {}}
            body["query"]["bool"].update({"filter": LogicalFilterClause.parse(filters).convert_to_elasticsearch()})
        result = self.client.search(body=body, index=index, headers=headers)
        values = []
        current_buckets = result["aggregations"]["metadata_agg"]["buckets"]
        # "after_key" is only present when more buckets remain; absence ends pagination.
        after_key = result["aggregations"]["metadata_agg"].get("after_key", False)
        for bucket in current_buckets:
            values.append({"value": bucket["key"][key], "count": bucket["doc_count"]})
        # Only 10 results get returned at a time, so apply pagination
        while after_key:
            # Resume the composite aggregation right after the last bucket returned.
            body["aggs"]["metadata_agg"]["composite"]["after"] = after_key
            result = self.client.search(body=body, index=index, headers=headers)
            current_buckets = result["aggregations"]["metadata_agg"]["buckets"]
            after_key = result["aggregations"]["metadata_agg"].get("after_key", False)
            for bucket in current_buckets:
                values.append({"value": bucket["key"][key], "count": bucket["doc_count"]})
        return values
def get_documents_by_id(
self,
ids: List[str],
index: Optional[str] = None,
batch_size: int = 10_000,
headers: Optional[Dict[str, str]] = None,
) -> List[Document]:
"""
Fetch documents by specifying a list of text ID strings.
:param ids: List of document IDs. Be aware that passing a large number of IDs might lead to performance issues.
:param index: The search index where the documents are stored. If not supplied,
self.index is used.
:param batch_size: Maximum number of results for each query.
Limited to 10,000 documents by default.
To reduce the pressure on the cluster, you can lower this limit at the expense
of longer retrieval times.
:param headers: Custom HTTP headers to pass to the client (for example, {'Authorization': 'Basic YWRtaW46cm9vdA=='})
Check out [Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html) for more information.
"""
index = index or self.index
documents = []
for i in range(0, len(ids), batch_size):
ids_for_batch = ids[i : i + batch_size]
query = {"size": len(ids_for_batch), "query": {"ids": {"values": ids_for_batch}}}
if not self.return_embedding and self.embedding_field:
query["_source"] = {"excludes": [self.embedding_field]}
result = self.client.search(index=index, body=query, headers=headers)["hits"]["hits"]
documents.extend([self._convert_es_hit_to_document(hit) for hit in result])
return documents
def update_document_meta(
self, id: str, meta: Dict[str, str], index: Optional[str] = None, headers: Optional[Dict[str, str]] = None
):
"""
Update the metadata dictionary of a document by specifying its ID string.
"""
if not index:
index = self.index
body = {"doc": meta}
self.client.update(index=index, id=id, body=body, refresh=self.refresh_type, headers=headers)

View File

@ -166,7 +166,7 @@ class SearchEngineDocumentStore(KeywordDocumentStore):
if self.client.indices.exists_alias(name=index_name):
logger.debug("Index name %s is an alias.", index_name)
return self.client.indices.exists(index_name, headers=headers)
return self.client.indices.exists(index=index_name, headers=headers)
@abstractmethod
def _validate_and_adjust_document_index(self, index_name: str, headers: Optional[Dict[str, str]] = None):
@ -281,7 +281,7 @@ class SearchEngineDocumentStore(KeywordDocumentStore):
query = {"size": len(ids_for_batch), "query": {"ids": {"values": ids_for_batch}}}
if not self.return_embedding and self.embedding_field:
query["_source"] = {"excludes": [self.embedding_field]}
result = self.client.search(index=index, body=query, headers=headers)["hits"]["hits"]
result = self.client.search(index=index, **query, headers=headers)["hits"]["hits"]
documents.extend([self._convert_es_hit_to_document(hit) for hit in result])
return documents
@ -344,7 +344,7 @@ class SearchEngineDocumentStore(KeywordDocumentStore):
if not body.get("query"):
body["query"] = {"bool": {}}
body["query"]["bool"].update({"filter": LogicalFilterClause.parse(filters).convert_to_elasticsearch()})
result = self.client.search(body=body, index=index, headers=headers)
result = self.client.search(**body, index=index, headers=headers)
values = []
current_buckets = result["aggregations"]["metadata_agg"]["buckets"]
@ -355,7 +355,7 @@ class SearchEngineDocumentStore(KeywordDocumentStore):
# Only 10 results get returned at a time, so apply pagination
while after_key:
body["aggs"]["metadata_agg"]["composite"]["after"] = after_key
result = self.client.search(body=body, index=index, headers=headers)
result = self.client.search(**body, index=index, headers=headers)
current_buckets = result["aggregations"]["metadata_agg"]["buckets"]
after_key = result["aggregations"]["metadata_agg"].get("after_key", False)
for bucket in current_buckets:
@ -521,7 +521,7 @@ class SearchEngineDocumentStore(KeywordDocumentStore):
if not index:
index = self.index
body = {"doc": meta}
self.client.update(index=index, id=id, body=body, refresh=self.refresh_type, headers=headers)
self.client.update(index=index, id=id, **body, refresh=self.refresh_type, headers=headers)
def get_document_count(
self,
@ -908,7 +908,7 @@ class SearchEngineDocumentStore(KeywordDocumentStore):
all_terms_must_match=all_terms_must_match,
)
result = self.client.search(index=index, body=body, headers=headers)["hits"]["hits"]
result = self.client.search(index=index, **body, headers=headers)["hits"]["hits"]
documents = [self._convert_es_hit_to_document(hit, scale_score=scale_score) for hit in result]
return documents
@ -1542,7 +1542,7 @@ class SearchEngineDocumentStore(KeywordDocumentStore):
self.client.delete_by_query(index=index, body=query, ignore=[404], headers=headers)
# We want to be sure that all docs are deleted before continuing (delete_by_query doesn't support wait_for)
if self.refresh_type == "wait_for":
time.sleep(2)
self.client.indices.refresh(index=index)
def delete_labels(
self,

View File

@ -75,7 +75,7 @@ dependencies = [
"sentence-transformers>=2.2.0",
# Elasticsearch
"elasticsearch>=7.7,<8",
"elasticsearch>=7.17,<8",
# OpenAI tokenizer
"tiktoken>=0.3.0; python_version >= '3.8' and (platform_machine == 'AMD64' or platform_machine == 'amd64' or platform_machine == 'x86_64' or (platform_machine == 'arm64' and platform_system == 'Darwin'))",

View File

@ -22,7 +22,8 @@ class TestElasticsearchDocumentStore(DocumentStoreBaseTestAbstract, SearchEngine
@pytest.fixture
def ds(self):
"""
This fixture provides a working document store and takes care of removing the indices when done
This fixture provides a working document store and takes care of keeping clean
the ES cluster used in the tests.
"""
labels_index_name = f"{self.index_name}_labels"
ds = ElasticsearchDocumentStore(
@ -30,10 +31,10 @@ class TestElasticsearchDocumentStore(DocumentStoreBaseTestAbstract, SearchEngine
label_index=labels_index_name,
host=os.environ.get("ELASTICSEARCH_HOST", "localhost"),
create_index=True,
recreate_index=True,
)
yield ds
ds.delete_index(self.index_name)
ds.delete_index(labels_index_name)
@pytest.fixture
def mocked_elastic_search_init(self, monkeypatch):
@ -213,8 +214,8 @@ class TestElasticsearchDocumentStore(DocumentStoreBaseTestAbstract, SearchEngine
settings = {"mappings": {"properties": {"content": {"type": "text"}}}}
client.indices.create(index="haystack_existing_alias_1", body=settings)
client.indices.create(index="haystack_existing_alias_2", body=settings)
client.indices.create(index="haystack_existing_alias_1", **settings)
client.indices.create(index="haystack_existing_alias_2", **settings)
client.indices.put_alias(
index="haystack_existing_alias_1,haystack_existing_alias_2", name="haystack_existing_alias"
@ -233,8 +234,8 @@ class TestElasticsearchDocumentStore(DocumentStoreBaseTestAbstract, SearchEngine
right_settings = {"mappings": {"properties": {"content": {"type": "text"}}}}
wrong_settings = {"mappings": {"properties": {"content": {"type": "histogram"}}}}
client.indices.create(index="haystack_existing_alias_1", body=right_settings)
client.indices.create(index="haystack_existing_alias_2", body=wrong_settings)
client.indices.create(index="haystack_existing_alias_1", **right_settings)
client.indices.create(index="haystack_existing_alias_2", **wrong_settings)
client.indices.put_alias(
index="haystack_existing_alias_1,haystack_existing_alias_2", name="haystack_existing_alias"
)
@ -326,3 +327,20 @@ class TestElasticsearchDocumentStore(DocumentStoreBaseTestAbstract, SearchEngine
username="", aws4auth="foo", **_init_client_remaining_kwargs
)
assert len(caplog.records) == 0
@pytest.mark.unit
def test_get_document_by_id_return_embedding_false(self, mocked_document_store):
mocked_document_store.return_embedding = False
mocked_document_store.get_document_by_id("123")
# assert the resulting body is consistent with the `excluded_meta_data` value
_, kwargs = mocked_document_store.client.search.call_args
assert kwargs["_source"] == {"excludes": ["embedding"]}
@pytest.mark.unit
def test_get_document_by_id_excluded_meta_data_has_no_influence(self, mocked_document_store):
mocked_document_store.excluded_meta_data = ["foo"]
mocked_document_store.return_embedding = False
mocked_document_store.get_document_by_id("123")
# assert the resulting body is not affected by the `excluded_meta_data` value
_, kwargs = mocked_document_store.client.search.call_args
assert kwargs["_source"] == {"excludes": ["embedding"]}

View File

@ -32,7 +32,8 @@ class TestOpenSearchDocumentStore(DocumentStoreBaseTestAbstract, SearchEngineDoc
@pytest.fixture
def ds(self):
"""
This fixture provides a working document store and takes care of removing the indices when done
This fixture provides a working document store and takes care of keeping clean the
OS cluster used in the tests.
"""
labels_index_name = f"{self.index_name}_labels"
ds = OpenSearchDocumentStore(
@ -40,10 +41,10 @@ class TestOpenSearchDocumentStore(DocumentStoreBaseTestAbstract, SearchEngineDoc
label_index=labels_index_name,
host=os.environ.get("OPENSEARCH_HOST", "localhost"),
create_index=True,
recreate_index=True,
)
yield ds
ds.delete_index(self.index_name)
ds.delete_index(labels_index_name)
@pytest.fixture
def mocked_document_store(self, existing_index):
@ -1239,3 +1240,20 @@ class TestOpenSearchDocumentStore(DocumentStoreBaseTestAbstract, SearchEngineDoc
]
mocked_document_store._bulk(documents=docs_to_write, _timeout=0, _remaining_tries=3)
assert mocked_bulk.call_count == 5
@pytest.mark.unit
def test_get_document_by_id_return_embedding_false(self, mocked_document_store):
mocked_document_store.return_embedding = False
mocked_document_store.get_document_by_id("123")
# assert the resulting body is consistent with the `excluded_meta_data` value
_, kwargs = mocked_document_store.client.search.call_args
assert kwargs["body"]["_source"] == {"excludes": ["embedding"]}
@pytest.mark.unit
def test_get_document_by_id_excluded_meta_data_has_no_influence(self, mocked_document_store):
mocked_document_store.excluded_meta_data = ["foo"]
mocked_document_store.return_embedding = False
mocked_document_store.get_document_by_id("123")
# assert the resulting body is not affected by the `excluded_meta_data` value
_, kwargs = mocked_document_store.client.search.call_args
assert kwargs["body"]["_source"] == {"excludes": ["embedding"]}

View File

@ -62,7 +62,7 @@ class SearchEngineDocumentStoreTestAbstract:
mocked_document_store.query(self.query)
# assert the resulting body is consistent with the `excluded_meta_data` value
_, kwargs = mocked_document_store.client.search.call_args
assert "_source" not in kwargs["body"]
assert "_source" not in kwargs
@pytest.mark.unit
def test_query_return_embedding_false(self, mocked_document_store):
@ -70,7 +70,7 @@ class SearchEngineDocumentStoreTestAbstract:
mocked_document_store.query(self.query)
# assert the resulting body is consistent with the `excluded_meta_data` value
_, kwargs = mocked_document_store.client.search.call_args
assert kwargs["body"]["_source"] == {"excludes": ["embedding"]}
assert kwargs["_source"] == {"excludes": ["embedding"]}
@pytest.mark.unit
def test_query_excluded_meta_data_return_embedding_true(self, mocked_document_store):
@ -79,7 +79,7 @@ class SearchEngineDocumentStoreTestAbstract:
mocked_document_store.query(self.query)
_, kwargs = mocked_document_store.client.search.call_args
# we expect "embedding" was removed from the final query
assert kwargs["body"]["_source"] == {"excludes": ["foo"]}
assert kwargs["_source"] == {"excludes": ["foo"]}
@pytest.mark.unit
def test_query_excluded_meta_data_return_embedding_false(self, mocked_document_store):
@ -88,7 +88,7 @@ class SearchEngineDocumentStoreTestAbstract:
mocked_document_store.query(self.query)
# assert the resulting body is consistent with the `excluded_meta_data` value
_, kwargs = mocked_document_store.client.search.call_args
assert kwargs["body"]["_source"] == {"excludes": ["foo", "embedding"]}
assert kwargs["_source"] == {"excludes": ["foo", "embedding"]}
@pytest.mark.unit
def test_get_all_documents_return_embedding_true(self, mocked_document_store):
@ -97,10 +97,7 @@ class SearchEngineDocumentStoreTestAbstract:
mocked_document_store.get_all_documents(return_embedding=True)
# assert the resulting body is consistent with the `excluded_meta_data` value
_, kwargs = mocked_document_store.client.search.call_args
# starting with elasticsearch client 7.16, scan() uses the query parameter instead of body,
# see https://github.com/elastic/elasticsearch-py/commit/889edc9ad6d728b79fadf790238b79f36449d2e2
body = kwargs.get("body", kwargs)
assert "_source" not in body
assert "_source" not in kwargs
@pytest.mark.unit
def test_get_all_documents_return_embedding_false(self, mocked_document_store):
@ -132,24 +129,7 @@ class SearchEngineDocumentStoreTestAbstract:
mocked_document_store.get_document_by_id("123")
# assert the resulting body is consistent with the `excluded_meta_data` value
_, kwargs = mocked_document_store.client.search.call_args
assert "_source" not in kwargs["body"]
@pytest.mark.unit
def test_get_document_by_id_return_embedding_false(self, mocked_document_store):
mocked_document_store.return_embedding = False
mocked_document_store.get_document_by_id("123")
# assert the resulting body is consistent with the `excluded_meta_data` value
_, kwargs = mocked_document_store.client.search.call_args
assert kwargs["body"]["_source"] == {"excludes": ["embedding"]}
@pytest.mark.unit
def test_get_document_by_id_excluded_meta_data_has_no_influence(self, mocked_document_store):
mocked_document_store.excluded_meta_data = ["foo"]
mocked_document_store.return_embedding = False
mocked_document_store.get_document_by_id("123")
# assert the resulting body is not affected by the `excluded_meta_data` value
_, kwargs = mocked_document_store.client.search.call_args
assert kwargs["body"]["_source"] == {"excludes": ["embedding"]}
assert "_source" not in kwargs
@pytest.mark.unit
def test_get_all_labels_legacy_document_id(self, mocked_document_store, mocked_get_all_documents_in_index):