Add Brownfield Support of existing Elasticsearch indices (#2229)

* Add method to transform existing ES index

* Add possibility to regularly add new records

* Fix types

* Restructure import statement

* Add use_system_proxy param

* Update Documentation & Code Style

* Change location and name + add test

* Update Documentation & Code Style

* Add test cases for metadata fields

* Fix linter

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
bogdankostic 2022-02-22 20:58:57 +01:00 committed by GitHub
parent 965cc86b24
commit 4bad21e961
6 changed files with 207 additions and 8 deletions


@@ -24,4 +24,9 @@ GraphDBKnowledgeGraph = safe_import("haystack.document_stores.graphdb", "GraphDB
from haystack.document_stores.memory import InMemoryDocumentStore
from haystack.document_stores.deepsetcloud import DeepsetCloudDocumentStore
from haystack.document_stores.utils import eval_data_from_json, eval_data_from_jsonl, squad_json_to_jsonl
from haystack.document_stores.utils import (
eval_data_from_json,
eval_data_from_jsonl,
squad_json_to_jsonl,
es_index_to_document_store,
)


@@ -18,6 +18,7 @@ from haystack.errors import DuplicateDocumentError
from haystack.nodes.preprocessor import PreProcessor
from haystack.document_stores.utils import eval_data_from_json, eval_data_from_jsonl, squad_json_to_jsonl
logger = logging.getLogger(__name__)
try:


@@ -240,8 +240,9 @@ class ElasticsearchDocumentStore(KeywordDocumentStore):
self.duplicate_documents = duplicate_documents
self.refresh_type = refresh_type
@classmethod
def _init_elastic_client(
self,
cls,
host: Union[str, List[str]],
port: Union[int, List[int]],
username: str,
@@ -256,7 +257,7 @@ class ElasticsearchDocumentStore(KeywordDocumentStore):
use_system_proxy: bool,
) -> Elasticsearch:
hosts = self._prepare_hosts(host, port)
hosts = cls._prepare_hosts(host, port)
if (api_key or api_key_id) and not (api_key and api_key_id):
raise ValueError("You must provide either both or none of `api_key_id` and `api_key`")
@@ -326,7 +327,8 @@ class ElasticsearchDocumentStore(KeywordDocumentStore):
)
return client
def _prepare_hosts(self, host, port):
@staticmethod
def _prepare_hosts(host, port):
# Create list of host(s) + port(s) to allow direct client connections to multiple elasticsearch nodes
if isinstance(host, list):
if isinstance(port, list):

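The refactoring above turns `_init_elastic_client` into a `classmethod` and `_prepare_hosts` into a `staticmethod`, so a low-level Elasticsearch client can be obtained without instantiating `ElasticsearchDocumentStore`. A minimal sketch of such a call, using illustrative values that mirror the defaults of `es_index_to_document_store` below rather than anything taken from this diff:

from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore

# Build a raw Elasticsearch client straight from the class; no document store
# instance (and therefore no index creation) is involved. All values are illustrative.
es_client = ElasticsearchDocumentStore._init_elastic_client(
    host="localhost",
    port=9200,
    username="",
    password="",
    api_key_id=None,
    api_key=None,
    aws4auth=None,
    scheme="http",
    ca_certs=None,
    verify_certs=True,
    timeout=30,
    use_system_proxy=False,
)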

@@ -6,7 +6,7 @@ from functools import reduce
from sqlalchemy.sql import select
from sqlalchemy import and_, or_
from haystack.document_stores.utils import convert_date_to_rfc3339
from haystack.document_stores import utils
def nested_defaultdict() -> defaultdict:
@@ -261,7 +261,7 @@ class ComparisonOperation(ABC):
if isinstance(value, str):
# Check if comparison value is a date
try:
value = convert_date_to_rfc3339(value)
value = utils.convert_date_to_rfc3339(value)
data_type = "valueDate"
# Comparison value is a plain string
except ValueError:

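Switching from a direct `convert_date_to_rfc3339` import to importing the `utils` module as a whole sidesteps a circular import: `utils.py` now imports `LogicalFilterClause` from this module (see the next file), so `filter_utils.py` only binds the module object and defers the attribute lookup until call time. A generic sketch of that pattern, with made-up names rather than Haystack's:

# Binding the module object works even if it is only partially initialized at import
# time; the attribute is looked up when the function runs, after all imports finished.
from mypkg import utils_module

def to_value_date(value: str) -> str:
    return utils_module.convert(value)  # late lookup instead of `from ... import convert`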

@@ -1,12 +1,20 @@
import typing
from typing import Dict, List, Optional, Tuple, Union, Generator
import json
import logging
from datetime import datetime
from elasticsearch.helpers import scan
from tqdm.auto import tqdm
from haystack.document_stores.filter_utils import LogicalFilterClause
from haystack.schema import Document, Label, Answer, Span
from haystack.nodes.preprocessor import PreProcessor
if typing.TYPE_CHECKING:
# This results in a circular import if we don't use typing.TYPE_CHECKING
from haystack.document_stores.base import BaseDocumentStore
logger = logging.getLogger(__name__)
@@ -271,3 +279,143 @@ def convert_date_to_rfc3339(date: str) -> str:
converted_date = parsed_datetime.isoformat()
return converted_date
def es_index_to_document_store(
document_store: "BaseDocumentStore",
original_index_name: str,
original_content_field: str,
original_name_field: Optional[str] = None,
included_metadata_fields: Optional[List[str]] = None,
excluded_metadata_fields: Optional[List[str]] = None,
store_original_ids: bool = True,
index: Optional[str] = None,
preprocessor: Optional[PreProcessor] = None,
batch_size: int = 10_000,
host: Union[str, List[str]] = "localhost",
port: Union[int, List[int]] = 9200,
username: str = "",
password: str = "",
api_key_id: Optional[str] = None,
api_key: Optional[str] = None,
aws4auth=None,
scheme: str = "http",
ca_certs: Optional[str] = None,
verify_certs: bool = True,
timeout: int = 30,
use_system_proxy: bool = False,
) -> "BaseDocumentStore":
"""
This function provides brownfield support of existing Elasticsearch indexes by converting each of the records in
the provided index to haystack `Document` objects and writing them to the specified `DocumentStore`. It can be used
on a regular basis in order to add new records from the Elasticsearch index to the `DocumentStore`.
:param document_store: The haystack `DocumentStore` to write the converted `Document` objects to.
:param original_index_name: Elasticsearch index containing the records to be converted.
:param original_content_field: Elasticsearch field containing the text to be put in the `content` field of the
resulting haystack `Document` objects.
:param original_name_field: Optional Elasticsearch field containing the title of the Document.
:param included_metadata_fields: List of Elasticsearch fields that shall be stored in the `meta` field of the
resulting haystack `Document` objects. If `included_metadata_fields` and `excluded_metadata_fields` are `None`,
all the fields found in the Elasticsearch records will be kept as metadata. You can specify only one of the
`included_metadata_fields` and `excluded_metadata_fields` parameters.
:param excluded_metadata_fields: List of Elasticsearch fields that shall be excluded from the `meta` field of the
resulting haystack `Document` objects. If `included_metadata_fields` and `excluded_metadata_fields` are `None`,
all the fields found in the Elasticsearch records will be kept as metadata. You can specify only one of the
`included_metadata_fields` and `excluded_metadata_fields` parameters.
:param store_original_ids: Whether to store the ID a record had in the original Elasticsearch index in the
`"_original_es_id"` metadata field of the resulting haystack `Document` objects. This should be set to `True`
if you want to continuously update the `DocumentStore` with new records from your Elasticsearch index. If this
parameter was set to `False` on the first call of `es_index_to_document_store`,
all the indexed Documents in the `DocumentStore` will be overwritten in the second call.
:param index: Name of index in `document_store` to use to store the resulting haystack `Document` objects.
:param preprocessor: Optional PreProcessor that will be applied on the content field of the original Elasticsearch
record.
:param batch_size: Number of records to process at once.
:param host: URL(s) of Elasticsearch nodes.
:param port: Port(s) of Elasticsearch nodes.
:param username: Username (standard authentication via http_auth).
:param password: Password (standard authentication via http_auth).
:param api_key_id: ID of the API key (alternative authentication mode to the above http_auth).
:param api_key: Secret value of the API key (alternative authentication mode to the above http_auth).
:param aws4auth: Authentication for usage with AWS Elasticsearch
(can be generated with the requests-aws4auth package).
:param scheme: `"https"` or `"http"`, protocol used to connect to your Elasticsearch instance.
:param ca_certs: Root certificates for SSL: it is a path to certificate authority (CA) certs on disk.
You can use the certifi package with `certifi.where()` to find where the CA certs file is located on your machine.
:param verify_certs: Whether to be strict about ca certificates.
:param timeout: Number of seconds after which an Elasticsearch request times out.
:param use_system_proxy: Whether to use system proxy.
"""
# This import cannot be at the beginning of the file, as this would result in a circular import
from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore
# Initialize Elasticsearch client
es_client = ElasticsearchDocumentStore._init_elastic_client(
host=host,
port=port,
username=username,
password=password,
api_key=api_key,
api_key_id=api_key_id,
aws4auth=aws4auth,
scheme=scheme,
ca_certs=ca_certs,
verify_certs=verify_certs,
timeout=timeout,
use_system_proxy=use_system_proxy,
)
# Get existing original ES IDs inside DocumentStore in order to not reindex the corresponding records
existing_ids = [
doc.meta["_original_es_id"]
for doc in document_store.get_all_documents_generator(index=index)
if "_original_es_id" in doc.meta
]
# Iterate over each individual record
query: Dict[str, Dict] = {"query": {"bool": {"must": [{"match_all": {}}]}}}
if existing_ids:
filters = LogicalFilterClause.parse({"_id": {"$nin": existing_ids}}).convert_to_elasticsearch()
query["query"]["bool"]["filter"] = filters
records = scan(client=es_client, query=query, index=original_index_name)
number_of_records = es_client.count(index=original_index_name, body=query)["count"]
haystack_documents: List[Dict] = []
for idx, record in enumerate(tqdm(records, total=number_of_records, desc="Converting ES Records")):
# Write batch_size number of documents to haystack DocumentStore
if (idx + 1) % batch_size == 0:
document_store.write_documents(haystack_documents, index=index)
haystack_documents = []
# Get content and metadata of current record
content = record["_source"].pop(original_content_field, "")
if content:
record_doc = {"content": content, "meta": {}}
if original_name_field is not None:
if original_name_field in record["_source"]:
record_doc["meta"]["name"] = record["_source"].pop(original_name_field)
# Only add selected metadata fields
if included_metadata_fields is not None:
for metadata_field in included_metadata_fields:
if metadata_field in record["_source"]:
record_doc["meta"][metadata_field] = record["_source"][metadata_field]
# Add all metadata fields except for those in excluded_metadata_fields
else:
if excluded_metadata_fields is not None:
for metadata_field in excluded_metadata_fields:
record["_source"].pop(metadata_field, None)
record_doc["meta"].update(record["_source"])
if store_original_ids:
record_doc["meta"]["_original_es_id"] = record["_id"]
# Apply preprocessor if provided
preprocessed_docs = preprocessor.process(record_doc) if preprocessor is not None else [record_doc]
haystack_documents.extend(preprocessed_docs)
if haystack_documents:
document_store.write_documents(haystack_documents, index=index)
return document_store

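For reference, a minimal usage sketch of the new `es_index_to_document_store` helper; the index and field names (`my_existing_index`, `text`, `title`, `internal_id`) are hypothetical placeholders, not taken from this commit:

from haystack.document_stores import InMemoryDocumentStore
from haystack.document_stores.utils import es_index_to_document_store

# Convert the records of an existing Elasticsearch index into haystack Documents
# and write them into a (here: in-memory) DocumentStore.
document_store = es_index_to_document_store(
    document_store=InMemoryDocumentStore(),
    original_index_name="my_existing_index",   # existing ES index to convert
    original_content_field="text",             # field mapped to Document.content
    original_name_field="title",               # optional: stored as meta["name"]
    excluded_metadata_fields=["internal_id"],  # everything else in _source becomes metadata
    index="documents",                         # target index inside the DocumentStore
    host="localhost",
    port=9200,
)

# Because store_original_ids defaults to True, running the same call again later only
# converts records whose _original_es_id is not yet present in the DocumentStore.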

@@ -17,13 +17,14 @@ from conftest import (
DC_TEST_INDEX,
SAMPLES_PATH,
)
from haystack.document_stores import WeaviateDocumentStore, DeepsetCloudDocumentStore
from haystack.document_stores import WeaviateDocumentStore, DeepsetCloudDocumentStore, InMemoryDocumentStore
from haystack.document_stores.base import BaseDocumentStore
from haystack.document_stores.utils import es_index_to_document_store
from haystack.errors import DuplicateDocumentError
from haystack.schema import Document, Label, Answer, Span
from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore
from haystack.document_stores.faiss import FAISSDocumentStore
from haystack.nodes import EmbeddingRetriever
from haystack.nodes import EmbeddingRetriever, PreProcessor
from haystack.pipelines import DocumentSearchPipeline
from haystack.utils import DeepsetCloudError
@@ -1713,3 +1714,45 @@ def test_elasticsearch_search_field_mapping():
assert indexed_settings["haystack_search_field_mapping"]["mappings"]["properties"]["content"]["type"] == "text"
assert indexed_settings["haystack_search_field_mapping"]["mappings"]["properties"]["sub_content"]["type"] == "text"
@pytest.mark.parametrize("document_store_with_docs", ["elasticsearch"], indirect=True)
def test_elasticsearch_brownfield_support(document_store_with_docs):
new_document_store = InMemoryDocumentStore()
new_document_store = es_index_to_document_store(
document_store=new_document_store,
original_index_name="haystack_test",
original_content_field="content",
original_name_field="name",
included_metadata_fields=["date_field"],
index="test_brownfield_support",
)
original_documents = document_store_with_docs.get_all_documents(index="haystack_test")
transferred_documents = new_document_store.get_all_documents(index="test_brownfield_support")
assert len(original_documents) == len(transferred_documents)
assert all("name" in doc.meta for doc in transferred_documents)
assert all("date_field" in doc.meta for doc in transferred_documents)
assert all("meta_field" not in doc.meta for doc in transferred_documents)
assert all("numeric_field" not in doc.meta for doc in transferred_documents)
original_content = set([doc.content for doc in original_documents])
transferred_content = set([doc.content for doc in transferred_documents])
assert original_content == transferred_content
# Test transferring docs with PreProcessor
new_document_store = es_index_to_document_store(
document_store=new_document_store,
original_index_name="haystack_test",
original_content_field="content",
excluded_metadata_fields=["date_field"],
index="test_brownfield_support_2",
preprocessor=PreProcessor(split_length=1, split_respect_sentence_boundary=False),
)
transferred_documents = new_document_store.get_all_documents(index="test_brownfield_support_2")
assert all("date_field" not in doc.meta for doc in transferred_documents)
assert all("name" in doc.meta for doc in transferred_documents)
assert all("meta_field" in doc.meta for doc in transferred_documents)
assert all("numeric_field" in doc.meta for doc in transferred_documents)
# Check if number of transferred_documents is equal to number of unique words.
assert len(transferred_documents) == len(set(" ".join(original_content).split()))