diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index d87fb7a22..a0ecf8d95 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -1231,7 +1231,7 @@ jobs: - name: Run Elasticsearch run: | - docker run -d -p 9200:9200 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms128m -Xmx256m" elasticsearch:7.9.2 + docker run -d -p 9200:9200 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms128m -Xmx256m" elasticsearch:7.17.6 - name: Run Opensearch run: | diff --git a/haystack/document_stores/elasticsearch.py b/haystack/document_stores/elasticsearch.py index c28303f43..3d453d41a 100644 --- a/haystack/document_stores/elasticsearch.py +++ b/haystack/document_stores/elasticsearch.py @@ -373,7 +373,7 @@ class ElasticsearchDocumentStore(SearchEngineDocumentStore): ) try: - result = self.client.search(index=index, body=body, request_timeout=300, headers=headers)["hits"]["hits"] + result = self.client.search(index=index, **body, request_timeout=300, headers=headers)["hits"]["hits"] if len(result) == 0: count_documents = self.get_document_count(index=index, headers=headers) if count_documents == 0: @@ -454,7 +454,7 @@ class ElasticsearchDocumentStore(SearchEngineDocumentStore): } try: - self.client.indices.create(index=index_name, body=mapping, headers=headers) + self.client.indices.create(index=index_name, **mapping, headers=headers) except self._RequestError as e: # With multiple workers we need to avoid race conditions, where: # - there's no index in the beginning @@ -483,7 +483,7 @@ class ElasticsearchDocumentStore(SearchEngineDocumentStore): } } try: - self.client.indices.create(index=index_name, body=mapping, headers=headers) + self.client.indices.create(index=index_name, **mapping, headers=headers) except self._RequestError as e: # With multiple workers we need to avoid race conditions, where: # - there's no index in the beginning @@ -496,7 +496,7 @@ class ElasticsearchDocumentStore(SearchEngineDocumentStore): """ Validates an
existing document index. If there's no embedding field, we'll add it. """ - indices = self.client.indices.get(index_name, headers=headers) + indices = self.client.indices.get(index=index_name, headers=headers) if not any(indices): logger.warning( diff --git a/haystack/document_stores/opensearch.py b/haystack/document_stores/opensearch.py index 6aba309ad..38e2fb9de 100644 --- a/haystack/document_stores/opensearch.py +++ b/haystack/document_stores/opensearch.py @@ -1339,3 +1339,125 @@ class OpenSearchDocumentStore(SearchEngineDocumentStore): progress_bar.update(batch_size) finally: opensearch_logger.setLevel(original_log_level) + + def get_metadata_values_by_key( + self, + key: str, + query: Optional[str] = None, + filters: Optional[FilterType] = None, + index: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + ) -> List[dict]: + """ + Get values associated with a metadata key. The output is in the format: + [{"value": "my-value-1", "count": 23}, {"value": "my-value-2", "count": 12}, ... ] + + :param key: The meta key name to get the values for. + :param query: Narrow down the scope to documents matching the query string. + :param filters: Narrow down the scope to documents matching the given filters. + Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical + operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`, + `"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name. + Logical operator keys take a dictionary of metadata field names and/or logical operators as + value. Metadata field names take a dictionary of comparison operators as value. Comparison + operator keys take a single value or (in case of `"$in"`) a list of values as value. + If no logical operator is provided, `"$and"` is used as default operation. If no comparison + operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as default + operation.
+ + __Example__: + + ```python + filters = { + "$and": { + "type": {"$eq": "article"}, + "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, + "rating": {"$gte": 3}, + "$or": { + "genre": {"$in": ["economy", "politics"]}, + "publisher": {"$eq": "nytimes"} + } + } + } + ``` + :param index: The search index to search for the meta values. If not supplied, + self.index is used. + :param headers: Custom HTTP headers to pass to the client (for example, {'Authorization': 'Basic YWRtaW46cm9vdA=='}) + Check out [Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html) for more information. + """ + # Fall back to self.index as documented above, so the aggregation is scoped to the store's index. + index = index or self.index + body: dict = { + "size": 0, + "aggs": {"metadata_agg": {"composite": {"sources": [{key: {"terms": {"field": key}}}]}}}, + } + if query: + body["query"] = { + "bool": { + "should": [{"multi_match": {"query": query, "type": "most_fields", "fields": self.search_fields}}] + } + } + if filters: + if not body.get("query"): + body["query"] = {"bool": {}} + body["query"]["bool"].update({"filter": LogicalFilterClause.parse(filters).convert_to_elasticsearch()}) + result = self.client.search(body=body, index=index, headers=headers) + + values = [] + current_buckets = result["aggregations"]["metadata_agg"]["buckets"] + after_key = result["aggregations"]["metadata_agg"].get("after_key", False) + for bucket in current_buckets: + values.append({"value": bucket["key"][key], "count": bucket["doc_count"]}) + + # Only 10 results get returned at a time, so apply pagination + while after_key: + body["aggs"]["metadata_agg"]["composite"]["after"] = after_key + result = self.client.search(body=body, index=index, headers=headers) + current_buckets = result["aggregations"]["metadata_agg"]["buckets"] + after_key = result["aggregations"]["metadata_agg"].get("after_key", False) + for bucket in current_buckets: + values.append({"value": bucket["key"][key], "count": bucket["doc_count"]}) + + return values + + def get_documents_by_id( + self, + ids:
List[str], + index: Optional[str] = None, + batch_size: int = 10_000, + headers: Optional[Dict[str, str]] = None, + ) -> List[Document]: + """ + Fetch documents by specifying a list of text ID strings. + + :param ids: List of document IDs. Be aware that passing a large number of IDs might lead to performance issues. + :param index: The search index where the documents are stored. If not supplied, + self.index is used. + :param batch_size: Maximum number of results for each query. + Limited to 10,000 documents by default. + To reduce the pressure on the cluster, you can lower this limit at the expense + of longer retrieval times. + :param headers: Custom HTTP headers to pass to the client (for example, {'Authorization': 'Basic YWRtaW46cm9vdA=='}) + Check out [Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html) for more information. + """ + index = index or self.index + documents = [] + for i in range(0, len(ids), batch_size): + ids_for_batch = ids[i : i + batch_size] + query = {"size": len(ids_for_batch), "query": {"ids": {"values": ids_for_batch}}} + if not self.return_embedding and self.embedding_field: + query["_source"] = {"excludes": [self.embedding_field]} + result = self.client.search(index=index, body=query, headers=headers)["hits"]["hits"] + documents.extend([self._convert_es_hit_to_document(hit) for hit in result]) + return documents + + def update_document_meta( + self, id: str, meta: Dict[str, str], index: Optional[str] = None, headers: Optional[Dict[str, str]] = None + ): + """ + Update the metadata dictionary of a document by specifying its ID string.
+ """ + if not index: + index = self.index + body = {"doc": meta} + self.client.update(index=index, id=id, body=body, refresh=self.refresh_type, headers=headers) diff --git a/haystack/document_stores/search_engine.py b/haystack/document_stores/search_engine.py index 206d87c13..d4acf0507 100644 --- a/haystack/document_stores/search_engine.py +++ b/haystack/document_stores/search_engine.py @@ -166,7 +166,7 @@ class SearchEngineDocumentStore(KeywordDocumentStore): if self.client.indices.exists_alias(name=index_name): logger.debug("Index name %s is an alias.", index_name) - return self.client.indices.exists(index_name, headers=headers) + return self.client.indices.exists(index=index_name, headers=headers) @abstractmethod def _validate_and_adjust_document_index(self, index_name: str, headers: Optional[Dict[str, str]] = None): @@ -281,7 +281,7 @@ class SearchEngineDocumentStore(KeywordDocumentStore): query = {"size": len(ids_for_batch), "query": {"ids": {"values": ids_for_batch}}} if not self.return_embedding and self.embedding_field: query["_source"] = {"excludes": [self.embedding_field]} - result = self.client.search(index=index, body=query, headers=headers)["hits"]["hits"] + result = self.client.search(index=index, **query, headers=headers)["hits"]["hits"] documents.extend([self._convert_es_hit_to_document(hit) for hit in result]) return documents @@ -344,7 +344,7 @@ class SearchEngineDocumentStore(KeywordDocumentStore): if not body.get("query"): body["query"] = {"bool": {}} body["query"]["bool"].update({"filter": LogicalFilterClause.parse(filters).convert_to_elasticsearch()}) - result = self.client.search(body=body, index=index, headers=headers) + result = self.client.search(**body, index=index, headers=headers) values = [] current_buckets = result["aggregations"]["metadata_agg"]["buckets"] @@ -355,7 +355,7 @@ class SearchEngineDocumentStore(KeywordDocumentStore): # Only 10 results get returned at a time, so apply pagination while after_key:
body["aggs"]["metadata_agg"]["composite"]["after"] = after_key - result = self.client.search(body=body, index=index, headers=headers) + result = self.client.search(**body, index=index, headers=headers) current_buckets = result["aggregations"]["metadata_agg"]["buckets"] after_key = result["aggregations"]["metadata_agg"].get("after_key", False) for bucket in current_buckets: @@ -521,7 +521,7 @@ class SearchEngineDocumentStore(KeywordDocumentStore): if not index: index = self.index body = {"doc": meta} - self.client.update(index=index, id=id, body=body, refresh=self.refresh_type, headers=headers) + self.client.update(index=index, id=id, **body, refresh=self.refresh_type, headers=headers) def get_document_count( self, @@ -908,7 +908,7 @@ class SearchEngineDocumentStore(KeywordDocumentStore): all_terms_must_match=all_terms_must_match, ) - result = self.client.search(index=index, body=body, headers=headers)["hits"]["hits"] + result = self.client.search(index=index, **body, headers=headers)["hits"]["hits"] documents = [self._convert_es_hit_to_document(hit, scale_score=scale_score) for hit in result] return documents @@ -1542,7 +1542,8 @@ class SearchEngineDocumentStore(KeywordDocumentStore): self.client.delete_by_query(index=index, body=query, ignore=[404], headers=headers) # We want to be sure that all docs are deleted before continuing (delete_by_query doesn't support wait_for) if self.refresh_type == "wait_for": - time.sleep(2) + # Forward the caller's headers and tolerate a missing index, like the delete_by_query call above. + self.client.indices.refresh(index=index, ignore=[404], headers=headers) def delete_labels( self, diff --git a/pyproject.toml b/pyproject.toml index c38a17471..a7fef6b38 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -75,7 +75,7 @@ dependencies = [ "sentence-transformers>=2.2.0", # Elasticsearch - "elasticsearch>=7.7,<8", + "elasticsearch>=7.17,<8", # OpenAI tokenizer "tiktoken>=0.3.0; python_version >= '3.8' and (platform_machine == 'AMD64' or platform_machine == 'amd64' or platform_machine == 'x86_64' or (platform_machine == 'arm64' and platform_system == 'Darwin'))",
diff --git a/test/document_stores/test_elasticsearch.py b/test/document_stores/test_elasticsearch.py index 09cc89c29..2398d4758 100644 --- a/test/document_stores/test_elasticsearch.py +++ b/test/document_stores/test_elasticsearch.py @@ -22,7 +22,8 @@ class TestElasticsearchDocumentStore(DocumentStoreBaseTestAbstract, SearchEngine @pytest.fixture def ds(self): """ - This fixture provides a working document store and takes care of removing the indices when done + This fixture provides a working document store and keeps the ES cluster used in the tests + clean (the indices are created with `recreate_index=True`). """ labels_index_name = f"{self.index_name}_labels" ds = ElasticsearchDocumentStore( @@ -30,10 +31,10 @@ class TestElasticsearchDocumentStore(DocumentStoreBaseTestAbstract, SearchEngine label_index=labels_index_name, host=os.environ.get("ELASTICSEARCH_HOST", "localhost"), create_index=True, + recreate_index=True, ) + yield ds - ds.delete_index(self.index_name) - ds.delete_index(labels_index_name) @pytest.fixture def mocked_elastic_search_init(self, monkeypatch): @@ -213,8 +214,8 @@ class TestElasticsearchDocumentStore(DocumentStoreBaseTestAbstract, SearchEngine settings = {"mappings": {"properties": {"content": {"type": "text"}}}} - client.indices.create(index="haystack_existing_alias_1", body=settings) - client.indices.create(index="haystack_existing_alias_2", body=settings) + client.indices.create(index="haystack_existing_alias_1", **settings) + client.indices.create(index="haystack_existing_alias_2", **settings) client.indices.put_alias( index="haystack_existing_alias_1,haystack_existing_alias_2", name="haystack_existing_alias" @@ -233,8 +234,8 @@ class TestElasticsearchDocumentStore(DocumentStoreBaseTestAbstract, SearchEngine right_settings = {"mappings": {"properties": {"content": {"type": "text"}}}} wrong_settings = {"mappings": {"properties": {"content": {"type": "histogram"}}}} - client.indices.create(index="haystack_existing_alias_1", body=right_settings) -
client.indices.create(index="haystack_existing_alias_2", body=wrong_settings) + client.indices.create(index="haystack_existing_alias_1", **right_settings) + client.indices.create(index="haystack_existing_alias_2", **wrong_settings) client.indices.put_alias( index="haystack_existing_alias_1,haystack_existing_alias_2", name="haystack_existing_alias" ) @@ -326,3 +327,20 @@ class TestElasticsearchDocumentStore(DocumentStoreBaseTestAbstract, SearchEngine username="", aws4auth="foo", **_init_client_remaining_kwargs ) assert len(caplog.records) == 0 + + @pytest.mark.unit + def test_get_document_by_id_return_embedding_false(self, mocked_document_store): + mocked_document_store.return_embedding = False + mocked_document_store.get_document_by_id("123") + # assert the embedding field is excluded via the search kwargs when `return_embedding` is False + _, kwargs = mocked_document_store.client.search.call_args + assert kwargs["_source"] == {"excludes": ["embedding"]} + + @pytest.mark.unit + def test_get_document_by_id_excluded_meta_data_has_no_influence(self, mocked_document_store): + mocked_document_store.excluded_meta_data = ["foo"] + mocked_document_store.return_embedding = False + mocked_document_store.get_document_by_id("123") + # assert the search kwargs are not affected by the `excluded_meta_data` value + _, kwargs = mocked_document_store.client.search.call_args + assert kwargs["_source"] == {"excludes": ["embedding"]} diff --git a/test/document_stores/test_opensearch.py b/test/document_stores/test_opensearch.py index a06332134..df070bc84 100644 --- a/test/document_stores/test_opensearch.py +++ b/test/document_stores/test_opensearch.py @@ -32,7 +32,8 @@ class TestOpenSearchDocumentStore(DocumentStoreBaseTestAbstract, SearchEngineDoc @pytest.fixture def ds(self): """ - This fixture provides a working document store and takes care of removing the indices when done + This fixture provides a working document store and keeps the OS cluster used in the tests + clean (the indices are created with `recreate_index=True`).
""" labels_index_name = f"{self.index_name}_labels" ds = OpenSearchDocumentStore( @@ -40,10 +41,10 @@ class TestOpenSearchDocumentStore(DocumentStoreBaseTestAbstract, SearchEngineDoc label_index=labels_index_name, host=os.environ.get("OPENSEARCH_HOST", "localhost"), create_index=True, + recreate_index=True, ) + yield ds - ds.delete_index(self.index_name) - ds.delete_index(labels_index_name) @pytest.fixture def mocked_document_store(self, existing_index): @@ -1239,3 +1240,20 @@ class TestOpenSearchDocumentStore(DocumentStoreBaseTestAbstract, SearchEngineDoc ] mocked_document_store._bulk(documents=docs_to_write, _timeout=0, _remaining_tries=3) assert mocked_bulk.call_count == 5 + + @pytest.mark.unit + def test_get_document_by_id_return_embedding_false(self, mocked_document_store): + mocked_document_store.return_embedding = False + mocked_document_store.get_document_by_id("123") + # assert the embedding field is excluded in the request body when `return_embedding` is False + _, kwargs = mocked_document_store.client.search.call_args + assert kwargs["body"]["_source"] == {"excludes": ["embedding"]} + + @pytest.mark.unit + def test_get_document_by_id_excluded_meta_data_has_no_influence(self, mocked_document_store): + mocked_document_store.excluded_meta_data = ["foo"] + mocked_document_store.return_embedding = False + mocked_document_store.get_document_by_id("123") + # assert the resulting body is not affected by the `excluded_meta_data` value + _, kwargs = mocked_document_store.client.search.call_args + assert kwargs["body"]["_source"] == {"excludes": ["embedding"]} diff --git a/test/document_stores/test_search_engine.py b/test/document_stores/test_search_engine.py index db8d9b7c9..e83adad24 100644 --- a/test/document_stores/test_search_engine.py +++ b/test/document_stores/test_search_engine.py @@ -62,7 +62,7 @@ class SearchEngineDocumentStoreTestAbstract: mocked_document_store.query(self.query) # assert the resulting body is consistent with the `excluded_meta_data` value _, kwargs =
mocked_document_store.client.search.call_args - assert "_source" not in kwargs["body"] + assert "_source" not in kwargs @pytest.mark.unit def test_query_return_embedding_false(self, mocked_document_store): @@ -70,7 +70,7 @@ class SearchEngineDocumentStoreTestAbstract: mocked_document_store.query(self.query) # assert the resulting body is consistent with the `excluded_meta_data` value _, kwargs = mocked_document_store.client.search.call_args - assert kwargs["body"]["_source"] == {"excludes": ["embedding"]} + assert kwargs["_source"] == {"excludes": ["embedding"]} @pytest.mark.unit def test_query_excluded_meta_data_return_embedding_true(self, mocked_document_store): @@ -79,7 +79,7 @@ class SearchEngineDocumentStoreTestAbstract: mocked_document_store.query(self.query) _, kwargs = mocked_document_store.client.search.call_args # we expect "embedding" was removed from the final query - assert kwargs["body"]["_source"] == {"excludes": ["foo"]} + assert kwargs["_source"] == {"excludes": ["foo"]} @pytest.mark.unit def test_query_excluded_meta_data_return_embedding_false(self, mocked_document_store): @@ -88,7 +88,7 @@ class SearchEngineDocumentStoreTestAbstract: mocked_document_store.query(self.query) # assert the resulting body is consistent with the `excluded_meta_data` value _, kwargs = mocked_document_store.client.search.call_args - assert kwargs["body"]["_source"] == {"excludes": ["foo", "embedding"]} + assert kwargs["_source"] == {"excludes": ["foo", "embedding"]} @pytest.mark.unit def test_get_all_documents_return_embedding_true(self, mocked_document_store): @@ -97,10 +97,7 @@ class SearchEngineDocumentStoreTestAbstract: mocked_document_store.get_all_documents(return_embedding=True) # assert the resulting body is consistent with the `excluded_meta_data` value _, kwargs = mocked_document_store.client.search.call_args - # starting with elasticsearch client 7.16, scan() uses the query parameter instead of body, - # see
https://github.com/elastic/elasticsearch-py/commit/889edc9ad6d728b79fadf790238b79f36449d2e2 - body = kwargs.get("body", kwargs) - assert "_source" not in body + assert "_source" not in kwargs @pytest.mark.unit def test_get_all_documents_return_embedding_false(self, mocked_document_store): @@ -132,24 +129,7 @@ class SearchEngineDocumentStoreTestAbstract: mocked_document_store.get_document_by_id("123") # assert the resulting body is consistent with the `excluded_meta_data` value _, kwargs = mocked_document_store.client.search.call_args - assert "_source" not in kwargs["body"] - - @pytest.mark.unit - def test_get_document_by_id_return_embedding_false(self, mocked_document_store): - mocked_document_store.return_embedding = False - mocked_document_store.get_document_by_id("123") - # assert the resulting body is consistent with the `excluded_meta_data` value - _, kwargs = mocked_document_store.client.search.call_args - assert kwargs["body"]["_source"] == {"excludes": ["embedding"]} - - @pytest.mark.unit - def test_get_document_by_id_excluded_meta_data_has_no_influence(self, mocked_document_store): - mocked_document_store.excluded_meta_data = ["foo"] - mocked_document_store.return_embedding = False - mocked_document_store.get_document_by_id("123") - # assert the resulting body is not affected by the `excluded_meta_data` value - _, kwargs = mocked_document_store.client.search.call_args - assert kwargs["body"]["_source"] == {"excludes": ["embedding"]} + assert "_source" not in kwargs @pytest.mark.unit def test_get_all_labels_legacy_document_id(self, mocked_document_store, mocked_get_all_documents_in_index):