David Potter bc791d53f4
feat: add opensearch source and destination connector (#2349)
Adds OpenSearch as a source and destination.

Since OpenSearch is a fork of Elasticsearch, these connectors rely
heavily on inheriting the Elasticsearch connectors whenever possible.

- Adds OpenSearch source connector to be able to ingest documents from
OpenSearch.
- Adds OpenSearch destination connector to be able to ingest documents
from any supported source, embed them and write the embeddings /
documents into OpenSearch.
- Defines an example unstructured elements schema so that users can
easily set up their unstructured OpenSearch indexes.

---------

Co-authored-by: potter-potter <david.potter@gmail.com>
2024-01-17 04:31:49 +00:00

63 lines
1.8 KiB
Python

import os
from unstructured.ingest.connector.elasticsearch import (
ElasticsearchWriteConfig,
)
from unstructured.ingest.connector.local import SimpleLocalConfig
from unstructured.ingest.connector.opensearch import (
OpenSearchAccessConfig,
SimpleOpenSearchConfig,
)
from unstructured.ingest.interfaces import (
ChunkingConfig,
EmbeddingConfig,
PartitionConfig,
ProcessorConfig,
ReadConfig,
)
from unstructured.ingest.runner import LocalRunner
from unstructured.ingest.runner.writers.base_writer import Writer
from unstructured.ingest.runner.writers.opensearch import (
OpenSearchWriter,
)
def get_writer() -> Writer:
    """Assemble the OpenSearch writer used as this example's destination.

    Connection settings are read from the environment at call time:
    ``OPENSEARCH_HOSTS``, ``OPENSEARCH_USERNAME``, ``OPENSEARCH_PASSWORD``
    and ``OPENSEARCH_INDEX_NAME``. ``os.getenv`` yields ``None`` for any
    variable that is not set, and no validation is performed here.

    Returns:
        An ``OpenSearchWriter`` built from those settings plus a fixed
        write configuration (``batch_size_bytes=15_000_000``,
        ``num_processes=2``).
    """
    # Credentials and host list come straight from the environment.
    credentials = OpenSearchAccessConfig(
        hosts=os.getenv("OPENSEARCH_HOSTS"),
        username=os.getenv("OPENSEARCH_USERNAME"),
        password=os.getenv("OPENSEARCH_PASSWORD"),
    )
    destination = SimpleOpenSearchConfig(
        access_config=credentials,
        index_name=os.getenv("OPENSEARCH_INDEX_NAME"),
    )
    # The write settings use the Elasticsearch write-config class that the
    # file imports from the elasticsearch connector module.
    write_settings = ElasticsearchWriteConfig(
        batch_size_bytes=15_000_000,
        num_processes=2,
    )
    return OpenSearchWriter(
        connector_config=destination,
        write_config=write_settings,
    )
if __name__ == "__main__":
    # Script entry point: read one local text file, partition, chunk and
    # embed it, then hand the results to the OpenSearch writer.

    # The destination writer is created first, before any pipeline config.
    writer = get_writer()

    # Runner-wide settings.
    processor_settings = ProcessorConfig(
        verbose=True,
        output_dir="local-output-to-opensearch",
        num_processes=2,
    )
    # Source: a single file addressed by a relative path.
    local_source = SimpleLocalConfig(
        input_path="example-docs/book-war-and-peace-1225p.txt",
    )
    # Read and partition stages are constructed with no arguments.
    read_settings = ReadConfig()
    partition_settings = PartitionConfig()
    chunking_settings = ChunkingConfig(chunk_elements=True)
    embedding_settings = EmbeddingConfig(
        provider="langchain-huggingface",
    )

    runner = LocalRunner(
        processor_config=processor_settings,
        connector_config=local_source,
        read_config=read_settings,
        partition_config=partition_settings,
        chunking_config=chunking_settings,
        embedding_config=embedding_settings,
        writer=writer,
        writer_kwargs={},
    )
    runner.run()