diff --git a/haystack/document_stores/__init__.py b/haystack/document_stores/__init__.py
index 9a580261b..fcb27c930 100644
--- a/haystack/document_stores/__init__.py
+++ b/haystack/document_stores/__init__.py
@@ -24,4 +24,9 @@ GraphDBKnowledgeGraph = safe_import("haystack.document_stores.graphdb", "GraphDB
 from haystack.document_stores.memory import InMemoryDocumentStore
 from haystack.document_stores.deepsetcloud import DeepsetCloudDocumentStore
 
-from haystack.document_stores.utils import eval_data_from_json, eval_data_from_jsonl, squad_json_to_jsonl
+from haystack.document_stores.utils import (
+    eval_data_from_json,
+    eval_data_from_jsonl,
+    squad_json_to_jsonl,
+    es_index_to_document_store,
+)
diff --git a/haystack/document_stores/base.py b/haystack/document_stores/base.py
index b10620190..e8e7d2f97 100644
--- a/haystack/document_stores/base.py
+++ b/haystack/document_stores/base.py
@@ -18,6 +18,7 @@ from haystack.errors import DuplicateDocumentError
 from haystack.nodes.preprocessor import PreProcessor
 from haystack.document_stores.utils import eval_data_from_json, eval_data_from_jsonl, squad_json_to_jsonl
 
+
 logger = logging.getLogger(__name__)
 
 try:
diff --git a/haystack/document_stores/elasticsearch.py b/haystack/document_stores/elasticsearch.py
index d82e2a3e0..f33172141 100644
--- a/haystack/document_stores/elasticsearch.py
+++ b/haystack/document_stores/elasticsearch.py
@@ -240,8 +240,9 @@ class ElasticsearchDocumentStore(KeywordDocumentStore):
         self.duplicate_documents = duplicate_documents
         self.refresh_type = refresh_type
 
+    @classmethod
     def _init_elastic_client(
-        self,
+        cls,
         host: Union[str, List[str]],
         port: Union[int, List[int]],
         username: str,
@@ -256,7 +257,7 @@ class ElasticsearchDocumentStore(KeywordDocumentStore):
         use_system_proxy: bool,
     ) -> Elasticsearch:
 
-        hosts = self._prepare_hosts(host, port)
+        hosts = cls._prepare_hosts(host, port)
 
         if (api_key or api_key_id) and not (api_key and api_key_id):
             raise ValueError("You must provide either both or none of `api_key_id` and `api_key`")
@@ -326,7 +327,8 @@ class ElasticsearchDocumentStore(KeywordDocumentStore):
         )
         return client
 
-    def _prepare_hosts(self, host, port):
+    @staticmethod
+    def _prepare_hosts(host, port):
         # Create list of host(s) + port(s) to allow direct client connections to multiple elasticsearch nodes
         if isinstance(host, list):
             if isinstance(port, list):
diff --git a/haystack/document_stores/filter_utils.py b/haystack/document_stores/filter_utils.py
index 8be46eb2c..d1d69196a 100644
--- a/haystack/document_stores/filter_utils.py
+++ b/haystack/document_stores/filter_utils.py
@@ -6,7 +6,7 @@ from functools import reduce
 from sqlalchemy.sql import select
 from sqlalchemy import and_, or_
 
-from haystack.document_stores.utils import convert_date_to_rfc3339
+from haystack.document_stores import utils
 
 
 def nested_defaultdict() -> defaultdict:
@@ -261,7 +261,7 @@ class ComparisonOperation(ABC):
         if isinstance(value, str):
             # Check if comparison value is a date
             try:
-                value = convert_date_to_rfc3339(value)
+                value = utils.convert_date_to_rfc3339(value)
                 data_type = "valueDate"
             # Comparison value is a plain string
             except ValueError:
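Note: turning `_init_elastic_client` into a `@classmethod` (and `_prepare_hosts` into a `@staticmethod`) lets the new utility in `utils.py` below obtain an Elasticsearch client without instantiating the document store. A minimal sketch of that call pattern, assuming a default local Elasticsearch instance; all values here are illustrative, not part of this change:

    from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore

    # Build a low-level Elasticsearch client without constructing an
    # ElasticsearchDocumentStore instance, as enabled by the @classmethod change.
    es_client = ElasticsearchDocumentStore._init_elastic_client(
        host="localhost",
        port=9200,
        username="",
        password="",
        api_key_id=None,
        api_key=None,
        aws4auth=None,
        scheme="http",
        ca_certs=None,
        verify_certs=True,
        timeout=30,
        use_system_proxy=False,
    )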
diff --git a/haystack/document_stores/utils.py b/haystack/document_stores/utils.py
index 5743a1502..8e15593ad 100644
--- a/haystack/document_stores/utils.py
+++ b/haystack/document_stores/utils.py
@@ -1,12 +1,20 @@
+import typing
 from typing import Dict, List, Optional, Tuple, Union, Generator
 
 import json
 import logging
 from datetime import datetime
 
+from elasticsearch.helpers import scan
+from tqdm.auto import tqdm
+from haystack.document_stores.filter_utils import LogicalFilterClause
 from haystack.schema import Document, Label, Answer, Span
 from haystack.nodes.preprocessor import PreProcessor
 
+if typing.TYPE_CHECKING:
+    # Import only for type checking, as importing at runtime would result in a circular import
+    from haystack.document_stores.base import BaseDocumentStore
+
 logger = logging.getLogger(__name__)
 
 
@@ -271,3 +279,143 @@ def convert_date_to_rfc3339(date: str) -> str:
     converted_date = parsed_datetime.isoformat()
 
     return converted_date
+
+
+def es_index_to_document_store(
+    document_store: "BaseDocumentStore",
+    original_index_name: str,
+    original_content_field: str,
+    original_name_field: Optional[str] = None,
+    included_metadata_fields: Optional[List[str]] = None,
+    excluded_metadata_fields: Optional[List[str]] = None,
+    store_original_ids: bool = True,
+    index: Optional[str] = None,
+    preprocessor: Optional[PreProcessor] = None,
+    batch_size: int = 10_000,
+    host: Union[str, List[str]] = "localhost",
+    port: Union[int, List[int]] = 9200,
+    username: str = "",
+    password: str = "",
+    api_key_id: Optional[str] = None,
+    api_key: Optional[str] = None,
+    aws4auth=None,
+    scheme: str = "http",
+    ca_certs: Optional[str] = None,
+    verify_certs: bool = True,
+    timeout: int = 30,
+    use_system_proxy: bool = False,
+) -> "BaseDocumentStore":
+    """
+    This function provides brownfield support for existing Elasticsearch indexes by converting each of the records
+    in the provided index to haystack `Document` objects and writing them to the specified `DocumentStore`. It can
+    be run on a regular basis to add new records from the Elasticsearch index to the `DocumentStore`.
+
+    :param document_store: The haystack `DocumentStore` to write the converted `Document` objects to.
+    :param original_index_name: Elasticsearch index containing the records to be converted.
+    :param original_content_field: Elasticsearch field containing the text to be put in the `content` field of the
+        resulting haystack `Document` objects.
+    :param original_name_field: Optional Elasticsearch field containing the title of the Document.
+    :param included_metadata_fields: List of Elasticsearch fields that shall be stored in the `meta` field of the
+        resulting haystack `Document` objects. If `included_metadata_fields` and `excluded_metadata_fields` are
+        `None`, all the fields found in the Elasticsearch records will be kept as metadata. You can specify only one
+        of the `included_metadata_fields` and `excluded_metadata_fields` parameters.
+    :param excluded_metadata_fields: List of Elasticsearch fields that shall be excluded from the `meta` field of the
+        resulting haystack `Document` objects. If `included_metadata_fields` and `excluded_metadata_fields` are
+        `None`, all the fields found in the Elasticsearch records will be kept as metadata. You can specify only one
+        of the `included_metadata_fields` and `excluded_metadata_fields` parameters.
+    :param store_original_ids: Whether to store the ID a record had in the original Elasticsearch index in the
+        `"_original_es_id"` metadata field of the resulting haystack `Document` objects. This should be set to `True`
+        if you want to continuously update the `DocumentStore` with new records inside your Elasticsearch index. If
+        this parameter is set to `False` on the first call of `es_index_to_document_store`, all the indexed Documents
+        in the `DocumentStore` will be overwritten by the second call.
+    :param index: Name of the index in `document_store` in which to store the resulting haystack `Document` objects.
+    :param preprocessor: Optional PreProcessor that will be applied to the content field of the original
+        Elasticsearch record.
+    :param batch_size: Number of records to process at once.
+    :param host: URL(s) of Elasticsearch nodes.
+    :param port: Port(s) of Elasticsearch nodes.
+    :param username: Username (standard authentication via http_auth).
+    :param password: Password (standard authentication via http_auth).
+    :param api_key_id: ID of the API key (alternative authentication mode to the above http_auth).
+    :param api_key: Secret value of the API key (alternative authentication mode to the above http_auth).
+    :param aws4auth: Authentication for usage with AWS Elasticsearch
+        (can be generated with the requests-aws4auth package).
+    :param scheme: `"https"` or `"http"`, protocol used to connect to your Elasticsearch instance.
+    :param ca_certs: Root certificates for SSL: it is a path to certificate authority (CA) certs on disk.
+        You can use the certifi package with `certifi.where()` to find where the CA certs file is located on your
+        machine.
+    :param verify_certs: Whether to be strict about CA certificates.
+    :param timeout: Number of seconds after which an Elasticsearch request times out.
+    :param use_system_proxy: Whether to use the system proxy.
+    """
+    # This import cannot be at the beginning of the file, as it would result in a circular import
+    from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore
+
+    # Initialize Elasticsearch client
+    es_client = ElasticsearchDocumentStore._init_elastic_client(
+        host=host,
+        port=port,
+        username=username,
+        password=password,
+        api_key=api_key,
+        api_key_id=api_key_id,
+        aws4auth=aws4auth,
+        scheme=scheme,
+        ca_certs=ca_certs,
+        verify_certs=verify_certs,
+        timeout=timeout,
+        use_system_proxy=use_system_proxy,
+    )
+
+    # Get the original ES IDs already inside the DocumentStore so the corresponding records are not reindexed
+    existing_ids = [
+        doc.meta["_original_es_id"]
+        for doc in document_store.get_all_documents_generator(index=index)
+        if "_original_es_id" in doc.meta
+    ]
+
+    # Iterate over each individual record
+    query: Dict[str, Dict] = {"query": {"bool": {"must": [{"match_all": {}}]}}}
+    if existing_ids:
+        filters = LogicalFilterClause.parse({"_id": {"$nin": existing_ids}}).convert_to_elasticsearch()
+        query["query"]["bool"]["filter"] = filters
+    records = scan(client=es_client, query=query, index=original_index_name)
+    number_of_records = es_client.count(index=original_index_name, body=query)["count"]
+    haystack_documents: List[Dict] = []
+    for idx, record in enumerate(tqdm(records, total=number_of_records, desc="Converting ES Records")):
+        # Write batch_size number of documents to the haystack DocumentStore
+        if (idx + 1) % batch_size == 0:
+            document_store.write_documents(haystack_documents, index=index)
+            haystack_documents = []
+
+        # Get content and metadata of the current record
+        content = record["_source"].pop(original_content_field, "")
+        if content:
+            record_doc = {"content": content, "meta": {}}
+
+            if original_name_field is not None:
+                if original_name_field in record["_source"]:
+                    record_doc["meta"]["name"] = record["_source"].pop(original_name_field)
+            # Only add selected metadata fields
+            if included_metadata_fields is not None:
+                for metadata_field in included_metadata_fields:
+                    if metadata_field in record["_source"]:
+                        record_doc["meta"][metadata_field] = record["_source"][metadata_field]
+            # Add all metadata fields except for those in excluded_metadata_fields
+            else:
+                if excluded_metadata_fields is not None:
+                    for metadata_field in excluded_metadata_fields:
+                        record["_source"].pop(metadata_field, None)
+                record_doc["meta"].update(record["_source"])
+
+            if store_original_ids:
+                record_doc["meta"]["_original_es_id"] = record["_id"]
+
+            # Apply the preprocessor if provided
+            preprocessed_docs = preprocessor.process(record_doc) if preprocessor is not None else [record_doc]
+
+            haystack_documents.extend(preprocessed_docs)
+
+    # Write any remaining documents that did not fill a full batch
+    if haystack_documents:
+        document_store.write_documents(haystack_documents, index=index)
+
+    return document_store
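A minimal usage sketch of the new helper, assuming a local Elasticsearch instance; the index and field names (`existing_index`, `text`, `title`) are hypothetical placeholders, not values from this change:

    from haystack.document_stores import InMemoryDocumentStore
    from haystack.document_stores.utils import es_index_to_document_store

    # Convert all records of an existing ES index into haystack Documents
    # and write them to a (here: in-memory) DocumentStore.
    document_store = es_index_to_document_store(
        document_store=InMemoryDocumentStore(),
        original_index_name="existing_index",  # hypothetical pre-existing index
        original_content_field="text",         # hypothetical field holding the body text
        original_name_field="title",           # hypothetical field holding the title
        store_original_ids=True,               # remember ES IDs so re-runs only add new records
        host="localhost",
        port=9200,
    )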
diff --git a/test/test_document_store.py b/test/test_document_store.py
index 57dbfada9..04b51ba66 100644
--- a/test/test_document_store.py
+++ b/test/test_document_store.py
@@ -17,13 +17,14 @@ from conftest import (
     DC_TEST_INDEX,
     SAMPLES_PATH,
 )
-from haystack.document_stores import WeaviateDocumentStore, DeepsetCloudDocumentStore
+from haystack.document_stores import WeaviateDocumentStore, DeepsetCloudDocumentStore, InMemoryDocumentStore
 from haystack.document_stores.base import BaseDocumentStore
+from haystack.document_stores.utils import es_index_to_document_store
 from haystack.errors import DuplicateDocumentError
 from haystack.schema import Document, Label, Answer, Span
 from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore
 from haystack.document_stores.faiss import FAISSDocumentStore
-from haystack.nodes import EmbeddingRetriever
+from haystack.nodes import EmbeddingRetriever, PreProcessor
 from haystack.pipelines import DocumentSearchPipeline
 from haystack.utils import DeepsetCloudError
@@ -1713,3 +1714,45 @@ def test_elasticsearch_search_field_mapping():
 
     assert indexed_settings["haystack_search_field_mapping"]["mappings"]["properties"]["content"]["type"] == "text"
     assert indexed_settings["haystack_search_field_mapping"]["mappings"]["properties"]["sub_content"]["type"] == "text"
+
+
+@pytest.mark.parametrize("document_store_with_docs", ["elasticsearch"], indirect=True)
+def test_elasticsearch_brownfield_support(document_store_with_docs):
+    new_document_store = InMemoryDocumentStore()
+    new_document_store = es_index_to_document_store(
+        document_store=new_document_store,
+        original_index_name="haystack_test",
+        original_content_field="content",
+        original_name_field="name",
+        included_metadata_fields=["date_field"],
+        index="test_brownfield_support",
+    )
+
+    original_documents = document_store_with_docs.get_all_documents(index="haystack_test")
+    transferred_documents = new_document_store.get_all_documents(index="test_brownfield_support")
+    assert len(original_documents) == len(transferred_documents)
+    assert all("name" in doc.meta for doc in transferred_documents)
+    assert all("date_field" in doc.meta for doc in transferred_documents)
+    assert all("meta_field" not in doc.meta for doc in transferred_documents)
+    assert all("numeric_field" not in doc.meta for doc in transferred_documents)
+
+    original_content = set([doc.content for doc in original_documents])
+    transferred_content = set([doc.content for doc in transferred_documents])
+    assert original_content == transferred_content
+
+    # Test transferring docs with a PreProcessor
+    new_document_store = es_index_to_document_store(
+        document_store=new_document_store,
+        original_index_name="haystack_test",
+        original_content_field="content",
+        excluded_metadata_fields=["date_field"],
+        index="test_brownfield_support_2",
+        preprocessor=PreProcessor(split_length=1, split_respect_sentence_boundary=False),
+    )
+    transferred_documents = new_document_store.get_all_documents(index="test_brownfield_support_2")
+    assert all("date_field" not in doc.meta for doc in transferred_documents)
+    assert all("name" in doc.meta for doc in transferred_documents)
+    assert all("meta_field" in doc.meta for doc in transferred_documents)
+    assert all("numeric_field" in doc.meta for doc in transferred_documents)
+    # The number of transferred documents must equal the number of unique words: split_length=1 yields
+    # one-word documents, and documents with identical content get the same ID, so duplicates collapse.
+    assert len(transferred_documents) == len(set(" ".join(original_content).split()))
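Because `store_original_ids` defaults to `True`, re-running the same call later (same hypothetical names as in the sketch above) transfers only records added to the Elasticsearch index since the first run; previously transferred records are skipped via the `$nin` filter on `_original_es_id`:

    # Incremental re-run: records whose original ES IDs are already present
    # in the DocumentStore are excluded from the scan and not rewritten.
    document_store = es_index_to_document_store(
        document_store=document_store,
        original_index_name="existing_index",
        original_content_field="text",
        original_name_field="title",
        host="localhost",
        port=9200,
    )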