feat: add elasticsearch destination connector (#2152)

Closes https://github.com/Unstructured-IO/unstructured/issues/1842
Closes https://github.com/Unstructured-IO/unstructured/issues/2202
Closes https://github.com/Unstructured-IO/unstructured/issues/2203

This PR:
- Adds an Elasticsearch destination connector so documents ingested from any supported source can be embedded and written, together with their embeddings, into Elasticsearch.
- Defines an example unstructured elements schema so users can easily set up their Elasticsearch indexes (a simplified sketch of the uploaded document shape follows this list).
- Includes parallelized upload and lazy processing for the Elasticsearch destination connector.
- Rearranges the Elasticsearch test helpers into source, destination, and common folders.
- Adds utility functions to lazily batch iterables for uploads.
- Fixes a bug where removing the optional parameter `--fields` broke the connector due to an integer processing error.
- Fixes a bug where using an [elasticsearch
config](8fa5cbf036/unstructured/ingest/connector/elasticsearch.py (L26-L35))
for a destination connector caused a serialization issue when the
optional parameter `--fields` was not provided.
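For reference, here is a simplified, hedged sketch of the bulk-action shape each partitioned element is conformed to before upload (it mirrors `conform_dict` in the connector diff further below; all field values are illustrative):

import uuid

# Illustrative only: the bulk-action shape each element takes before bulk upload.
conformed_element = {
    "_index": "ingest-test-destination",  # index name taken from the connector config
    "_id": str(uuid.uuid4()),  # a fresh UUID is assigned to each element
    "_source": {
        "element_id": "2da9f6e1a8f2",  # hypothetical element id
        "text": "An example narrative text element.",
        "embeddings": [0.012, -0.034, 0.051],  # present when an embedding provider is configured
        "metadata": {  # element metadata, flattened with "-" as the separator
            "filename": "book-war-and-peace-1225p.txt",
            "languages": ["eng"],
        },
    },
}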
Ahmet Melek 2023-12-20 01:26:58 +00:00 committed by GitHub
parent 4e2ba2c9b2
commit fd293b3e78
30 changed files with 696 additions and 59 deletions

View File

@ -1,14 +1,15 @@
## 0.11.6-dev3
## 0.11.6-dev4
### Enhancements
* **Update the layout analysis script.** The previous script only supported annotating `final` elements. The updated script also supports annotating `inferred` and `extracted` elements.
### Features
* **Add Chroma destination connector** Chroma database connector added to ingest CLI. Users may now use `unstructured-ingest` to write partitioned/embedded data to a Chroma vector database.
* **Add Elasticsearch destination connector.** Problem: After ingesting data from a source, users might want to move their data into a destination. Elasticsearch is a popular storage solution for functionality such as search and for providing intermediary caches within data pipelines. Feature: Adds an Elasticsearch destination connector so documents ingested from any supported source can be embedded and the documents, along with their embeddings, written into Elasticsearch.
### Fixes
* **Enable --fields argument omission for elasticsearch connector** Solves two bugs: removing the optional parameter --fields broke the connector due to an integer processing error, and using an elasticsearch config for a destination connector caused a serialization issue when the optional parameter --fields was not provided.
## 0.11.5

View File

@ -0,0 +1,19 @@
#!/usr/bin/env bash
EMBEDDING_PROVIDER=${EMBEDDING_PROVIDER:-"langchain-huggingface"}
unstructured-ingest \
local \
--input-path example-docs/book-war-and-peace-1225p.txt \
--output-dir local-output-to-elasticsearch \
--strategy fast \
--chunk-elements \
--embedding-provider "$EMBEDDING_PROVIDER" \
--num-processes 4 \
--verbose \
elasticsearch \
--hosts "$ELASTICSEARCH_HOSTS" \
--username "$ELASTICSEARCH_USERNAME" \
--password "$ELASTICSEARCH_PASSWORD" \
--index-name "$ELASTICSEARCH_INDEX_NAME" \
--num-processes 2

View File

@ -0,0 +1,60 @@
import os
from unstructured.ingest.connector.elasticsearch import (
ElasticsearchAccessConfig,
ElasticsearchWriteConfig,
SimpleElasticsearchConfig,
)
from unstructured.ingest.connector.local import SimpleLocalConfig
from unstructured.ingest.interfaces import (
ChunkingConfig,
EmbeddingConfig,
PartitionConfig,
ProcessorConfig,
ReadConfig,
)
from unstructured.ingest.runner import LocalRunner
from unstructured.ingest.runner.writers.base_writer import Writer
from unstructured.ingest.runner.writers.elasticsearch import (
ElasticsearchWriter,
)
def get_writer() -> Writer:
return ElasticsearchWriter(
connector_config=SimpleElasticsearchConfig(
access_config=ElasticsearchAccessConfig(
hosts=os.getenv("ELASTICSEARCH_HOSTS"),
username=os.getenv("ELASTICSEARCH_USERNAME"),
password=os.getenv("ELASTICSEARCH_PASSWORD"),
),
index_name=os.getenv("ELASTICSEARCH_INDEX_NAME"),
),
write_config=ElasticsearchWriteConfig(
batch_size_bytes=15_000_000,
num_processes=2,
),
)
if __name__ == "__main__":
writer = get_writer()
runner = LocalRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="local-output-to-elasticsearch",
num_processes=2,
),
connector_config=SimpleLocalConfig(
input_path="example-docs/book-war-and-peace-1225p.txt",
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
chunking_config=ChunkingConfig(chunk_elements=True),
embedding_config=EmbeddingConfig(
provider="langchain-huggingface",
),
writer=writer,
writer_kwargs={},
)
runner.run()

View File

@ -0,0 +1,144 @@
{
"properties": {
"element_id": {
"type": "keyword"
},
"text": {
"type": "text",
"analyzer": "english"
},
"type": {
"type": "keyword"
},
"embeddings": {
"type": "float"
},
"metadata": {
"type": "object",
"properties": {
"category_depth": {
"type": "integer"
},
"parent_id": {
"type": "keyword"
},
"attached_to_filename": {
"type": "keyword"
},
"filetype": {
"type": "keyword"
},
"last_modified": {
"type": "date"
},
"file_directory": {
"type": "keyword"
},
"filename": {
"type": "keyword"
},
"data_source": {
"type": "object",
"properties": {
"url": {
"type": "text",
"analyzer": "standard"
},
"version": {
"type": "keyword"
},
"date_created": {
"type": "date"
},
"date_modified": {
"type": "date"
},
"date_processed": {
"type": "date"
},
"record_locator": {
"type": "keyword"
},
"permissions_data": {
"type": "object"
}
}
},
"coordinates": {
"type": "object",
"properties": {
"system": {
"type": "keyword"
},
"layout_width": {
"type": "float"
},
"layout_height": {
"type": "float"
},
"points": {
"type": "float"
}
}
},
"languages": {
"type": "keyword"
},
"page_number": {
"type": "integer"
},
"page_name": {
"type": "keyword"
},
"url": {
"type": "text",
"analyzer": "standard"
},
"links": {
"type": "object"
},
"link_urls": {
"type": "text"
},
"link_texts": {
"type": "text"
},
"sent_from": {
"type": "text",
"analyzer": "standard"
},
"sent_to": {
"type": "text",
"analyzer": "standard"
},
"subject": {
"type": "text",
"analyzer": "standard"
},
"section": {
"type": "text",
"analyzer": "standard"
},
"header_footer_type": {
"type": "keyword"
},
"emphasized_text_contents": {
"type": "text"
},
"emphasized_text_tags": {
"type": "keyword"
},
"text_as_html": {
"type": "text",
"analyzer": "standard"
},
"regex_metadata": {
"type": "object"
},
"detection_class_prob": {
"type": "float"
}
}
}
}
}
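As an editor's sketch (not part of the commit), the example mappings above can be applied when creating a destination index with the official Python client, mirroring the create_index.py test helper added later in this commit; the cluster URL, credentials, and index name below are assumptions:

import json

from elasticsearch import Elasticsearch  # installed via: pip install "unstructured[elasticsearch]"

# Assumed values for illustration; the mappings path matches the file added in this PR.
MAPPING_PATH = "docs/source/ingest/destination_connectors/data/elasticsearch_elements_mappings.json"
CLUSTER_URL = "http://localhost:9200"
INDEX_NAME = "ingest-test-destination"

with open(MAPPING_PATH) as f:
    mappings = json.load(f)

es = Elasticsearch(CLUSTER_URL, basic_auth=("elastic", "<password>"), request_timeout=30)
# Create the index using the unstructured elements mappings shown above.
es.indices.create(index=INDEX_NAME, mappings=mappings)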

View File

@ -0,0 +1,32 @@
Elasticsearch
======================
Batch process all your records using ``unstructured-ingest`` to store structured outputs locally on your filesystem and upload those local files to an Elasticsearch index.
First you'll need to install Elasticsearch dependencies as shown here.
.. code:: shell
pip install "unstructured[elasticsearch]"
Run Locally
-----------
The upstream connector can be any of the supported ones; for convenience, the sample command below uses the upstream local connector.
.. tabs::
.. tab:: Shell
.. literalinclude:: ./code/bash/elasticsearch.sh
:language: bash
.. tab:: Python
.. literalinclude:: ./code/python/elasticsearch.py
:language: python
For a full list of the options the CLI accepts, check ``unstructured-ingest <upstream connector> elasticsearch --help``.
NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide <https://unstructured-io.github.io/unstructured/installing.html>`_.
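As an optional sanity check (an editor's sketch, not part of the committed docs), you can verify how many elements were uploaded after the run, mirroring the destination test added in this commit; connection details are read from the same environment variables the examples above use:

import os

from elasticsearch import Elasticsearch

# Assumed connection details, matching the environment variables used in the examples above.
client = Elasticsearch(
    os.getenv("ELASTICSEARCH_HOSTS", "http://localhost:9200"),
    basic_auth=(os.getenv("ELASTICSEARCH_USERNAME"), os.getenv("ELASTICSEARCH_PASSWORD")),
    request_timeout=30,
)
# cat.count reports how many documents (i.e., uploaded elements) the index contains.
count_json = client.cat.count(index=os.getenv("ELASTICSEARCH_INDEX_NAME"), format="json")
print(f"{count_json[0]['count']} elements found in the index")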

View File

@ -0,0 +1,35 @@
#!/usr/bin/env bash
# Processes a file via the local connector, then chunks, embeds, and writes the results to an Elasticsearch index.
# Structured outputs are stored in local-to-elasticsearch/
SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
cd "$SCRIPT_DIR"/../../.. || exit 1
# As an example we're using the local connector,
# however ingesting from any supported source connector is possible.
# shellcheck disable=2094
PYTHONPATH=. ./unstructured/ingest/main.py \
local \
--input-path example-docs/book-war-and-peace-1225p.txt \
--output-dir local-to-elasticsearch \
--strategy fast \
--chunk-elements \
--embedding-provider "<an unstructured embedding provider, ie. langchain-huggingface>" \
--num-processes 2 \
--verbose \
--work-dir "<directory for intermediate outputs to be saved>" \
elasticsearch \
--hosts "<List of URLs where elasticsearch index is served>" \
--index-name "<Index name to upload data in>" \
--username "<Username to authenticate into the index>" \
--password "<Password to authenticate into the index>" \
--batch-size-bytes "<Size limit for any batch to be uploaded, in bytes, ie. 15000000>" \
--num-processes "<Number of processes to be used to upload, ie. 2>" \
--cloud-id "<Id used to connect to Elastic Cloud>" \
--es-api-key "<Api key used for authentication>" \
--api-key-id "<Id associated with api key used for authentication: https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html>" \
--bearer-auth "<Bearer token used for HTTP bearer authentication>" \
--ca-certs "<path/to/ca/certs>" \
--ssl-assert-fingerprint "<SHA256 fingerprint value>"

View File

@ -1,23 +0,0 @@
#!/usr/bin/env bash
# Runs a docker container to create an elasticsearch cluster,
# fills the ES cluster with data,
# processes all the files in the 'movies' index in the cluster using the `unstructured` library.
# Structured outputs are stored in elasticsearch-ingest-output
# shellcheck source=/dev/null
sh scripts/elasticsearch-test-helpers/create-and-check-es.sh
wait
# Kill the container so the script can be repeatedly run using the same ports
trap 'echo "Stopping Elasticsearch Docker container"; docker stop es-test' EXIT
PYTHONPATH=. ./unstructured/ingest/main.py \
elasticsearch \
--metadata-exclude filename,file_directory,metadata.data_source.date_processed \
--url http://localhost:9200 \
--index-name movies \
--jq-query '{ethnicity, director, plot}' \
--output-dir elasticsearch-ingest-output \
--num-processes 2

View File

@ -0,0 +1,30 @@
#!/usr/bin/env bash
# Runs a docker container to create an elasticsearch cluster,
# fills the ES cluster with data,
# processes all the files in the 'movies' index in the cluster using the `unstructured` library.
# Structured outputs are stored in elasticsearch-ingest-output
# shellcheck source=/dev/null
sh scripts/elasticsearch-test-helpers/source_connector/create-fill-and-check-es.sh
wait
# Kill the container so the script can be repeatedly run using the same ports
trap 'echo "Stopping Elasticsearch Docker container"; docker stop es-test' EXIT
PYTHONPATH=. ./unstructured/ingest/main.py \
elasticsearch \
--hosts "<List of URLs where elasticsearch index is served>" \
--index-name "<Index name to ingest data from>" \
--username "<Username to authenticate into the index>" \
--password "<Password to authenticate into the index>" \
--fields "<If provided, will limit the fields returned by Elasticsearch to this comma-delimited list" \
--batch-size "<How many records to read at a time per process>" \
--num-processes "<Number of processes to be used to upload, ie. 2>" \
--cloud-id "<Id used to connect to Elastic Cloud>" \
--es-api-key "<Api key used for authentication>" \
--api-key-id "<Id associated with api key used for authentication: https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html>" \
--bearer-auth "<Bearer token used for HTTP bearer authentication>" \
--ca-certs "<path/to/ca/certs>" \
--ssl-assert-fingerprint "<SHA256 fingerprint value>"

View File

@ -8,9 +8,10 @@ services:
environment:
- xpack.security.enabled=true
- discovery.type=single-node
- ELASTIC_PASSWORD=DkIedPPSCb
- ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
- ELASTIC_USER=${ELASTIC_USER}
healthcheck:
test: ["CMD-SHELL", "curl --silent --fail -u elastic:DkIedPPSCb localhost:9200/_cluster/health || exit 1"]
test: ["CMD-SHELL", "curl --silent --fail -u ${ELASTIC_USER}:${ELASTIC_PASSWORD} localhost:9200/_cluster/health || exit 1"]
interval: 10s
timeout: 30s
retries: 3

View File

@ -0,0 +1,6 @@
# These credentials are for the ES index within ingest test ES docker image,
# which is stopped immediately after ingest ES destination connector test completes.
# Do not use these credentials for any purpose other than local development or testing.
# Do not use these credentials for any permanent / long-lived ES cluster, whether in dev or prod.
export ELASTIC_USER=elastic
export ELASTIC_PASSWORD=Vth0Zd0wxme

View File

@ -1,13 +0,0 @@
#!/usr/bin/env bash
set -e
SCRIPT_DIR=$(dirname "$(realpath "$0")")
# Create the Elasticsearch cluster
docker compose version
docker compose -f "$SCRIPT_DIR"/docker-compose.yaml up --wait
docker compose -f "$SCRIPT_DIR"/docker-compose.yaml ps
echo "Cluster is live."
"$SCRIPT_DIR"/create_and_fill_es.py

View File

@ -0,0 +1,14 @@
#!/usr/bin/env bash
set -e
SCRIPT_DIR=$(dirname "$(dirname "$(realpath "$0")")")
ENV_FILE="$SCRIPT_DIR"/common/es-dest-ingest-test-creds.env
# Create the Elasticsearch cluster
docker compose version
docker compose --env-file "$ENV_FILE" -f "$SCRIPT_DIR"/common/docker-compose.yaml up --wait
docker compose --env-file "$ENV_FILE" -f "$SCRIPT_DIR"/common/docker-compose.yaml ps
echo "Cluster is live."
python "$SCRIPT_DIR"/destination_connector/create_index.py

View File

@ -0,0 +1,24 @@
#!/usr/bin/env python3
from elasticsearch import Elasticsearch
from es_cluster_config import (
CLUSTER_URL,
INDEX_NAME,
PASSWORD,
USER,
mappings,
)
print("Connecting to the Elasticsearch cluster.")
es = Elasticsearch(CLUSTER_URL, basic_auth=(USER, PASSWORD), request_timeout=30)
print(f"{es.info()}")
print("Creating an Elasticsearch index for testing ingest elasticsearch destination connector.")
response = es.options(max_retries=5).indices.create(index=INDEX_NAME, mappings=mappings)
if response.meta.status != 200:
raise RuntimeError("failed to create index")
es.indices.refresh(index=INDEX_NAME)
response = es.cat.count(index=INDEX_NAME, format="json")
print("Succesfully created an Elasticsearch index for testing elasticsearch ingest.")

View File

@ -0,0 +1,11 @@
import json
import os
CLUSTER_URL = "http://localhost:9200"
INDEX_NAME = "ingest-test-destination"
USER = os.environ["ELASTIC_USER"]
PASSWORD = os.environ["ELASTIC_PASSWORD"]
MAPPING_PATH = "docs/source/ingest/destination_connectors/data/elasticsearch_elements_mappings.json"
with open(MAPPING_PATH) as f:
mappings = json.load(f)

View File

@ -0,0 +1,30 @@
#!/usr/bin/env python3
import sys
from elasticsearch import Elasticsearch
from es_cluster_config import (
CLUSTER_URL,
INDEX_NAME,
PASSWORD,
USER,
)
N_ELEMENTS = 1404
if __name__ == "__main__":
print(f"Checking contents of index" f"{INDEX_NAME} at {CLUSTER_URL}")
print("Connecting to the Elasticsearch cluster.")
client = Elasticsearch(CLUSTER_URL, basic_auth=(USER, PASSWORD), request_timeout=30)
print(client.info())
count = int(client.cat.count(index=INDEX_NAME, format="json")[0]["count"])
try:
assert count == N_ELEMENTS
except AssertionError:
sys.exit(
"Elasticsearch dest check failed:"
f"got {count} items in index, expected {N_ELEMENTS} items in index."
)
print(f"Elasticsearch destination test was successful with {count} items being uploaded.")

View File

@ -0,0 +1,14 @@
#!/usr/bin/env bash
set -e
SCRIPT_DIR="$(dirname "$(dirname "$(realpath "$0")")")"
ENV_FILE="$SCRIPT_DIR"/common/es-dest-ingest-test-creds.env
# Create the Elasticsearch cluster
docker compose version
docker compose --env-file "$ENV_FILE" -f "$SCRIPT_DIR"/common/docker-compose.yaml up --wait
docker compose --env-file "$ENV_FILE" -f "$SCRIPT_DIR"/common/docker-compose.yaml ps
echo "Cluster is live."
"$SCRIPT_DIR"/source_connector/create_and_fill_es.py

View File

@ -8,12 +8,14 @@ from es_cluster_config import (
DATA_PATH,
INDEX_NAME,
MAPPINGS,
PASSWORD,
USER,
form_elasticsearch_doc_dict,
)
print("Connecting to the Elasticsearch cluster.")
es = Elasticsearch(CLUSTER_URL, basic_auth=("elastic", "DkIedPPSCb"), request_timeout=30)
print(es.info())
es = Elasticsearch(CLUSTER_URL, basic_auth=(USER, PASSWORD), request_timeout=30)
print(f"{es.info()}")
df = pd.read_csv(DATA_PATH).dropna().reset_index()
print("Creating an Elasticsearch index for testing elasticsearch ingest.")

View File

@ -1,6 +1,10 @@
DATA_PATH = "scripts/elasticsearch-test-helpers/wiki_movie_plots_small.csv"
import os
DATA_PATH = "scripts/elasticsearch-test-helpers/source_connector/wiki_movie_plots_small.csv"
CLUSTER_URL = "http://localhost:9200"
INDEX_NAME = "movies"
USER = os.environ["ELASTIC_USER"]
PASSWORD = os.environ["ELASTIC_PASSWORD"]
MAPPINGS = {
"properties": {

View File

@ -0,0 +1,62 @@
#!/usr/bin/env bash
set -e
DEST_PATH=$(dirname "$(realpath "$0")")
SCRIPT_DIR=$(dirname "$DEST_PATH")
cd "$SCRIPT_DIR"/.. || exit 1
OUTPUT_FOLDER_NAME=elasticsearch-dest
OUTPUT_ROOT=${OUTPUT_ROOT:-$SCRIPT_DIR}
OUTPUT_DIR=$OUTPUT_ROOT/structured-output/$OUTPUT_FOLDER_NAME
WORK_DIR=$OUTPUT_ROOT/workdir/$OUTPUT_FOLDER_NAME
CI=${CI:-"false"}
max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")}
# shellcheck disable=SC1091
source "$SCRIPT_DIR"/cleanup.sh
# shellcheck disable=SC1091
source scripts/elasticsearch-test-helpers/common/es-dest-ingest-test-creds.env
function cleanup {
# Index cleanup
echo "Stopping Elasticsearch Docker container"
docker-compose -f scripts/elasticsearch-test-helpers/common/docker-compose.yaml down --remove-orphans -v
# Local file cleanup
cleanup_dir "$WORK_DIR"
cleanup_dir "$OUTPUT_DIR"
if [ "$CI" == "true" ]; then
cleanup_dir "$DOWNLOAD_DIR"
fi
}
trap cleanup EXIT
echo "Creating elasticsearch instance"
# shellcheck source=/dev/null
scripts/elasticsearch-test-helpers/destination_connector/create-elasticsearch-instance.sh
wait
PYTHONPATH=. ./unstructured/ingest/main.py \
local \
--num-processes "$max_processes" \
--output-dir "$OUTPUT_DIR" \
--strategy fast \
--verbose \
--reprocess \
--input-path example-docs/book-war-and-peace-1225p.txt \
--work-dir "$WORK_DIR" \
--chunk-elements \
--chunk-combine-text-under-n-chars 200 \
--chunk-new-after-n-chars 2500 \
--chunk-max-characters 38000 \
--chunk-multipage-sections \
--embedding-provider "langchain-huggingface" \
elasticsearch \
--hosts http://localhost:9200 \
--index-name ingest-test-destination \
--username "$ELASTIC_USER" \
--password "$ELASTIC_PASSWORD" \
--batch-size-bytes 15000000 \
--num-processes "$max_processes"
scripts/elasticsearch-test-helpers/destination_connector/test-ingest-elasticsearch-output.py

View File

@ -16,11 +16,13 @@ CI=${CI:-"false"}
# shellcheck disable=SC1091
source "$SCRIPT_DIR"/cleanup.sh
# shellcheck disable=SC1091
source scripts/elasticsearch-test-helpers/common/es-dest-ingest-test-creds.env
function cleanup() {
# Kill the container so the script can be repeatedly run using the same ports
echo "Stopping Elasticsearch Docker container"
docker-compose -f scripts/elasticsearch-test-helpers/docker-compose.yaml down --remove-orphans -v
docker-compose -f scripts/elasticsearch-test-helpers/common/docker-compose.yaml down --remove-orphans -v
cleanup_dir "$OUTPUT_DIR"
cleanup_dir "$WORK_DIR"
@ -32,7 +34,7 @@ function cleanup() {
trap cleanup EXIT
# shellcheck source=/dev/null
scripts/elasticsearch-test-helpers/create-and-check-es.sh
scripts/elasticsearch-test-helpers/source_connector/create-fill-and-check-es.sh
wait
RUN_SCRIPT=${RUN_SCRIPT:-./unstructured/ingest/main.py}
@ -47,8 +49,8 @@ PYTHONPATH=${PYTHONPATH:-.} "$RUN_SCRIPT" \
--verbose \
--index-name movies \
--hosts http://localhost:9200 \
--username elastic \
--password DkIedPPSCb \
--username "$ELASTIC_USER" \
--password "$ELASTIC_PASSWORD" \
--fields 'ethnicity,director,plot' \
--work-dir "$WORK_DIR" \
--batch-size 2

View File

@ -21,6 +21,7 @@ all_tests=(
'chroma.sh'
'delta-table.sh'
'dropbox.sh'
'elasticsearch.sh'
'gcs.sh'
'mongodb.sh'
'pinecone.sh'

View File

@ -1 +1 @@
__version__ = "0.11.6-dev3" # pragma: no cover
__version__ = "0.11.6-dev4" # pragma: no cover

View File

@ -14,6 +14,7 @@ from .confluence import get_base_src_cmd as confluence_base_src_cmd
from .delta_table import get_base_dest_cmd as delta_table_dest_cmd
from .delta_table import get_base_src_cmd as delta_table_base_src_cmd
from .discord import get_base_src_cmd as discord_base_src_cmd
from .elasticsearch import get_base_dest_cmd as elasticsearch_base_dest_cmd
from .elasticsearch import get_base_src_cmd as elasticsearch_base_src_cmd
from .fsspec.azure import get_base_dest_cmd as azure_base_dest_cmd
from .fsspec.azure import get_base_src_cmd as azure_base_src_cmd
@ -93,6 +94,7 @@ base_dest_cmd_fns: t.List[t.Callable[[], "BaseDestCmd"]] = [
box_base_dest_cmd,
chroma_base_dest_cmd,
dropbox_base_dest_cmd,
elasticsearch_base_dest_cmd,
fsspec_base_dest_cmd,
gcs_base_dest_cmd,
s3_base_dest_cmd,

View File

@ -5,7 +5,12 @@ import click
from unstructured.ingest.cli.base.src import BaseSrcCmd
from unstructured.ingest.cli.interfaces import CliConfig, DelimitedString
from unstructured.ingest.connector.elasticsearch import SimpleElasticsearchConfig
from unstructured.ingest.connector.elasticsearch import (
ElasticsearchWriteConfig,
SimpleElasticsearchConfig,
)
CMD_NAME = "elasticsearch"
@dataclass
@ -17,7 +22,7 @@ class ElasticsearchCliConfig(SimpleElasticsearchConfig, CliConfig):
["--index-name"],
required=True,
type=str,
help="Name for the Elasticsearch index to pull data from",
help="Name of the Elasticsearch index to pull data from, or upload data to.",
),
click.Option(
["--hosts"],
@ -80,9 +85,49 @@ class ElasticsearchCliConfig(SimpleElasticsearchConfig, CliConfig):
return options
@dataclass
class ElasticsearchCliWriteConfig(ElasticsearchWriteConfig, CliConfig):
@staticmethod
def get_cli_options() -> t.List[click.Option]:
options = [
click.Option(
["--batch-size-bytes"],
required=True,
default=15_000_000,
type=int,
help="Size limit (in bytes) for each batch of items to be uploaded. Check"
" https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html"
"#_how_big_is_too_big for more information.",
),
click.Option(
["--num-processes"],
required=True,
default=2,
type=int,
help="Number of processes to be used while uploading content",
),
]
return options
def get_base_src_cmd() -> BaseSrcCmd:
cmd_cls = BaseSrcCmd(
cmd_name="elasticsearch",
cli_config=ElasticsearchCliConfig,
)
return cmd_cls
def get_base_dest_cmd():
from unstructured.ingest.cli.base.dest import BaseDestCmd
cmd_cls = BaseDestCmd(
cmd_name="elasticsearch",
cli_config=ElasticsearchCliConfig,
additional_cli_options=[ElasticsearchCliWriteConfig],
addition_configs={
"connector_config": SimpleElasticsearchConfig,
"write_config": ElasticsearchCliWriteConfig,
},
)
return cmd_cls

View File

@ -1,23 +1,29 @@
import hashlib
import json
import typing as t
import uuid
from dataclasses import dataclass, field
from itertools import chain
from pathlib import Path
from dataclasses_json.core import Json
from unstructured.ingest.enhanced_dataclass import enhanced_field
from unstructured.ingest.error import SourceConnectionError
from unstructured.ingest.error import DestinationConnectionError, SourceConnectionError
from unstructured.ingest.interfaces import (
AccessConfig,
BaseConnectorConfig,
BaseDestinationConnector,
BaseIngestDocBatch,
BaseSingleIngestDoc,
BaseSourceConnector,
IngestDocCleanupMixin,
SourceConnectorCleanupMixin,
SourceMetadata,
WriteConfig,
)
from unstructured.ingest.logger import logger
from unstructured.ingest.utils.data_prep import generator_batching_wbytes
from unstructured.staging.base import flatten_dict
from unstructured.utils import requires_dependencies
@ -73,7 +79,7 @@ class SimpleElasticsearchConfig(BaseConnectorConfig):
index_name: str
batch_size: int = 100
fields: t.List[str] = field(default=list)
fields: t.List[str] = field(default_factory=list)
access_config: ElasticsearchAccessConfig = None
@ -224,7 +230,8 @@ class ElasticsearchIngestDocBatch(BaseIngestDocBatch):
doc_body = doc["_source"]
filename = ingest_doc.filename
flattened_dict = flatten_dict(dictionary=doc_body)
concatenated_values = "\n".join(flattened_dict.values())
str_values = [str(value) for value in flattened_dict.values()]
concatenated_values = "\n".join(str_values)
filename.parent.mkdir(parents=True, exist_ok=True)
with open(filename, "w", encoding="utf8") as f:
@ -299,3 +306,90 @@ class ElasticsearchSourceConnector(SourceConnectorCleanupMixin, BaseSourceConnec
)
for batched_ids in id_batches
]
@dataclass
class ElasticsearchWriteConfig(WriteConfig):
batch_size_bytes: int
num_processes: int
@dataclass
class ElasticsearchDestinationConnector(BaseDestinationConnector):
write_config: ElasticsearchWriteConfig
connector_config: SimpleElasticsearchConfig
_client: t.Optional["Elasticsearch"] = field(init=False, default=None)
@DestinationConnectionError.wrap
@requires_dependencies(["elasticsearch"], extras="elasticsearch")
def generate_client(self) -> "Elasticsearch":
from elasticsearch import Elasticsearch
return Elasticsearch(
**self.connector_config.access_config.to_dict(apply_name_overload=False)
)
@property
def client(self):
if self._client is None:
self._client = self.generate_client()
return self._client
def initialize(self):
_ = self.client
@DestinationConnectionError.wrap
def check_connection(self):
try:
assert self.client.ping()
except Exception as e:
logger.error(f"failed to validate connection: {e}", exc_info=True)
raise DestinationConnectionError(f"failed to validate connection: {e}")
@requires_dependencies(["elasticsearch"], extras="elasticsearch")
def write_dict(self, element_dicts: t.List[t.Dict[str, t.Any]]) -> None:
logger.info(
f"writing document batches to destination"
f" index named {self.connector_config.index_name}"
f" at {self.connector_config.access_config.hosts}"
f" with batch size (in bytes) {self.write_config.batch_size_bytes}"
f" with {self.write_config.num_processes} (number of) processes"
)
from elasticsearch.helpers import parallel_bulk
for batch in generator_batching_wbytes(
element_dicts, batch_size_limit_bytes=self.write_config.batch_size_bytes
):
for success, info in parallel_bulk(
self.client, batch, thread_count=self.write_config.num_processes
):
if not success:
logger.error(
f"upload failed for a batch in elasticsearch destination connector: {info}"
)
def conform_dict(self, element_dict):
return {
"_index": self.connector_config.index_name,
"_id": str(uuid.uuid4()),
"_source": {
"element_id": element_dict.pop("element_id", None),
"embeddings": element_dict.pop("embeddings", None),
"text": element_dict.pop("text", None),
"metadata": flatten_dict(
element_dict.pop("metadata", None),
separator="-",
),
},
}
def write(self, docs: t.List[BaseSingleIngestDoc]) -> None:
def generate_element_dicts(doc):
with open(doc._output_filename) as json_file:
element_dicts_one_doc = (
self.conform_dict(element_dict) for element_dict in json.load(json_file)
)
yield from element_dicts_one_doc
# We chain to unite the generators into one generator
self.write_dict(chain(*(generate_element_dicts(doc) for doc in docs)))

View File

@ -4,6 +4,7 @@ from .azure_cognitive_search import AzureCognitiveSearchWriter
from .base_writer import Writer
from .chroma import ChromaWriter
from .delta_table import DeltaTableWriter
from .elasticsearch import ElasticsearchWriter
from .fsspec.azure import AzureWriter
from .fsspec.box import BoxWriter
from .fsspec.dropbox import DropboxWriter
@ -20,6 +21,7 @@ writer_map: t.Dict[str, t.Type[Writer]] = {
"chroma": ChromaWriter,
"delta_table": DeltaTableWriter,
"dropbox": DropboxWriter,
"elasticsearch": ElasticsearchWriter,
"gcs": GcsWriter,
"mongodb": MongodbWriter,
"pinecone": PineconeWriter,

View File

@ -0,0 +1,24 @@
import typing as t
from dataclasses import dataclass
from unstructured.ingest.interfaces import BaseDestinationConnector
from unstructured.ingest.runner.writers.base_writer import Writer
if t.TYPE_CHECKING:
from unstructured.ingest.connector.elasticsearch import (
ElasticsearchWriteConfig,
SimpleElasticsearchConfig,
)
@dataclass
class ElasticsearchWriter(Writer):
connector_config: "SimpleElasticsearchConfig"
write_config: "ElasticsearchWriteConfig"
def get_connector_cls(self) -> BaseDestinationConnector:
from unstructured.ingest.connector.elasticsearch import (
ElasticsearchDestinationConnector,
)
return ElasticsearchDestinationConnector

View File

@ -1,4 +1,5 @@
import itertools
import json
def chunk_generator(iterable, batch_size=100):
@ -8,3 +9,21 @@ def chunk_generator(iterable, batch_size=100):
while chunk:
yield chunk
chunk = tuple(itertools.islice(it, batch_size))
def generator_batching_wbytes(iterable, batch_size_limit_bytes=15_000_000):
"""A helper function to break an iterable into chunks of specified bytes."""
current_batch, current_batch_size = [], 0
for item in iterable:
item_size_bytes = len(json.dumps(item).encode("utf-8"))
if current_batch_size + item_size_bytes <= batch_size_limit_bytes:
current_batch.append(item)
current_batch_size += item_size_bytes
else:
yield current_batch
current_batch, current_batch_size = [item], item_size_bytes
if current_batch:
yield current_batch
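A quick usage sketch of the new helper (the import path matches the one used by the connector above; the sample items are illustrative):

from unstructured.ingest.utils.data_prep import generator_batching_wbytes

# Three illustrative element dicts of increasing serialized size.
items = [{"text": "a" * 100}, {"text": "b" * 200}, {"text": "c" * 300}]

# Batches are yielded lazily, each kept under the byte limit when serialized to JSON.
for batch in generator_batching_wbytes(items, batch_size_limit_bytes=350):
    print(f"{len(batch)} item(s) in this batch")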

View File

@ -480,12 +480,7 @@ def identify_overlapping_case(
type1, type2 = label_pair
text1, text2 = text_pair
ix_element1, ix_element2 = ix_pair
(
overlap_percentage,
max_area,
min_area,
total_area,
) = calculate_overlap_percentage(
(overlap_percentage, max_area, min_area, total_area) = calculate_overlap_percentage(
box1,
box2,
intersection_ratio_method="partial",