rfct [P6M]-392: OpenSearch V2 Destination Connector (#3293)

Migrates the OpenSearch destination connector to V2. It reuses the Elasticsearch connector code wherever possible, which is expected, since OpenSearch is a fork of Elasticsearch.
David Potter 2024-06-28 13:51:23 -07:00 committed by GitHub
parent 4a71bbb44c
commit 15f80c4ad6
9 changed files with 283 additions and 12 deletions
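
In outline, the migration is a thin subclass that swaps only the bulk helper, while the upload stager and configs are shared with Elasticsearch; a condensed sketch of what the new connector file (shown in full at the end of this diff) boils down to:

# Condensed sketch of the reuse pattern; see the full connector file below.
from dataclasses import dataclass

from unstructured.ingest.v2.processes.connectors.elasticsearch import ElasticsearchUploader


@dataclass
class OpenSearchUploader(ElasticsearchUploader):
    connector_type: str = "opensearch"

    def load_parallel_bulk(self):
        # Only the bulk-helper import differs; batching, logging, and error
        # handling are inherited from the Elasticsearch uploader.
        from opensearchpy.helpers import parallel_bulk

        return parallel_bulk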

View File

@@ -1,3 +1,11 @@
+## 0.14.10-dev0
+
+### Enhancements
+
+### Features
+
+### Fixes
+
 ## 0.14.9
 
 ### Enhancements

View File

@@ -54,7 +54,6 @@ if __name__ == "__main__":
     count = client.count(index="ingest-test-destination")["count"]
-    assert int(count) == N_ELEMENTS, "OpenSearch dest check failed:"
-    f"got {count} items in index, expected {N_ELEMENTS} items in index."
+    assert int(count) == N_ELEMENTS, f"OpenSearch dst check fail: expect {N_ELEMENTS} got {count}"
     print(f"OpenSearch destination test was successful with {count} items being uploaded.")

View File

@@ -51,6 +51,6 @@ PYTHONPATH=. ./unstructured/ingest/main.py \
   --password "admin" \
   --use-ssl \
   --batch-size-bytes 150 \
-  --num-processes "$max_processes"
+  --num-threads "$max_processes"
 
 scripts/opensearch-test-helpers/destination_connector/test-ingest-opensearch-output.py

View File

@@ -1 +1 @@
-__version__ = "0.14.9" # pragma: no cover
+__version__ = "0.14.10-dev0" # pragma: no cover

View File

@@ -14,6 +14,7 @@ from .fsspec.sftp import sftp_dest_cmd, sftp_src_cmd
 from .google_drive import google_drive_src_cmd
 from .local import local_dest_cmd, local_src_cmd
 from .onedrive import onedrive_drive_src_cmd
+from .opensearch import opensearch_dest_cmd
 from .weaviate import weaviate_dest_cmd
 
 src_cmds = [
@@ -47,6 +48,7 @@ dest_cmds = [
     elasticsearch_dest_cmd,
     gcs_dest_cmd,
     local_dest_cmd,
+    opensearch_dest_cmd,
     s3_dest_cmd,
     sftp_dest_cmd,
     weaviate_dest_cmd,

View File

@@ -0,0 +1,84 @@
from dataclasses import dataclass

import click

from unstructured.ingest.v2.cli.base import DestCmd
from unstructured.ingest.v2.cli.cmds.elasticsearch import (
    ElasticsearchCliUploadStagerConfig,
    ElasticsearchUploaderConfig,
)
from unstructured.ingest.v2.cli.interfaces import CliConfig
from unstructured.ingest.v2.cli.utils import DelimitedString
from unstructured.ingest.v2.processes.connectors.opensearch import CONNECTOR_TYPE


@dataclass
class OpenSearchCliConnectionConfig(CliConfig):
    @staticmethod
    def get_cli_options() -> list[click.Option]:
        options = [
            click.Option(
                ["--hosts"],
                type=DelimitedString(),
                help='List of the OpenSearch hosts to connect to, e.g. "http://localhost:9200"',
            ),
            click.Option(
                ["--username"], type=str, default=None, help="username when using basic auth"
            ),
            click.Option(
                ["--password"],
                type=str,
                default=None,
                help="password when using basic auth",
            ),
            click.Option(
                ["--use-ssl"],
                type=bool,
                default=False,
                is_flag=True,
                help="use ssl for the connection",
            ),
            click.Option(
                ["--verify-certs"],
                type=bool,
                default=False,
                is_flag=True,
                help="whether to verify SSL certificates",
            ),
            click.Option(
                ["--ssl-show-warn"],
                type=bool,
                default=False,
                is_flag=True,
                help="show warning when verify certs is disabled",
            ),
            click.Option(
                ["--ca-certs"],
                type=click.Path(),
                default=None,
                help="path to CA bundle",
            ),
            click.Option(
                ["--client-cert"],
                type=click.Path(),
                default=None,
                help="path to the file containing the private key and the certificate,"
                " or cert only if using client_key",
            ),
            click.Option(
                ["--client-key"],
                type=click.Path(),
                default=None,
                help="path to the file containing the private key"
                " if using separate cert and key files",
            ),
        ]
        return options


opensearch_dest_cmd = DestCmd(
    cmd_name=CONNECTOR_TYPE,
    connection_config=OpenSearchCliConnectionConfig,
    upload_stager_config=ElasticsearchCliUploadStagerConfig,
    uploader_config=ElasticsearchUploaderConfig,
)
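
The --hosts option uses the DelimitedString param type, so several hosts can be passed in one flag; a minimal sketch of the intended behavior (assuming the default delimiter is a comma):

# Hypothetical standalone usage of DelimitedString (comma delimiter assumed).
import click

from unstructured.ingest.v2.cli.utils import DelimitedString


@click.command()
@click.option("--hosts", type=DelimitedString())
def show_hosts(hosts: list[str]):
    # --hosts "http://localhost:9200,http://localhost:9201"
    # -> ['http://localhost:9200', 'http://localhost:9201']
    click.echo(hosts)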

View File

@@ -0,0 +1,51 @@
from pathlib import Path

from unstructured.ingest.v2.interfaces import ProcessorConfig
from unstructured.ingest.v2.logger import logger
from unstructured.ingest.v2.pipeline.pipeline import Pipeline
from unstructured.ingest.v2.processes.chunker import ChunkerConfig
from unstructured.ingest.v2.processes.connectors.local import (
    LocalConnectionConfig,
    LocalDownloaderConfig,
    LocalIndexerConfig,
)
from unstructured.ingest.v2.processes.connectors.opensearch import (
    OpenSearchAccessConfig,
    OpenSearchConnectionConfig,
    OpenSearchUploaderConfig,
    OpenSearchUploadStagerConfig,
)
from unstructured.ingest.v2.processes.embedder import EmbedderConfig
from unstructured.ingest.v2.processes.partitioner import PartitionerConfig

base_path = Path(__file__).parent.parent.parent.parent.parent
docs_path = base_path / "example-docs"
work_dir = base_path / "tmp_ingest"
output_path = work_dir / "output"
download_path = work_dir / "download"

if __name__ == "__main__":
    logger.info(f"Writing all content in: {work_dir.resolve()}")
    Pipeline.from_configs(
        context=ProcessorConfig(work_dir=str(work_dir.resolve())),
        indexer_config=LocalIndexerConfig(
            input_path=str(docs_path.resolve()) + "/book-war-and-peace-1p.txt"
        ),
        downloader_config=LocalDownloaderConfig(download_dir=download_path),
        source_connection_config=LocalConnectionConfig(),
        partitioner_config=PartitionerConfig(strategy="fast"),
        chunker_config=ChunkerConfig(chunking_strategy="by_title"),
        embedder_config=EmbedderConfig(embedding_provider="langchain-huggingface"),
        destination_connection_config=OpenSearchConnectionConfig(
            hosts="http://localhost:9247",
            username="admin",
            access_config=OpenSearchAccessConfig(
                password="admin",
                use_ssl=True,
            ),
        ),
        stager_config=OpenSearchUploadStagerConfig(index_name="ingest-test-destination"),
        uploader_config=OpenSearchUploaderConfig(
            index_name="ingest-test-destination", batch_size_bytes=150
        ),
    ).run()
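
Running this example assumes an OpenSearch instance is already listening on localhost:9247 and accepts the admin/admin basic-auth credentials used in the connection config; the pipeline partitions, chunks, and embeds the single input document, then stages and uploads the resulting elements into the ingest-test-destination index.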

View File

@@ -53,10 +53,10 @@ class ElasticsearchAccessConfig(AccessConfig):
 @dataclass
 class ElasticsearchClientInput(EnhancedDataClassJsonMixin):
-    hosts: Optional[str] = None
+    hosts: Optional[list[str]] = None
     cloud_id: Optional[str] = None
     ca_certs: Optional[str] = None
-    basic_auth: Optional[tuple[str, str]] = None
+    basic_auth: Optional[tuple[str, str]] = enhanced_field(sensitive=True, default=None)
     api_key: Optional[str] = enhanced_field(sensitive=True, default=None)
@@ -322,7 +322,7 @@ class ElasticsearchUploadStager(UploadStager):
 class ElasticsearchUploaderConfig(UploaderConfig):
     index_name: str
     batch_size_bytes: int = 15_000_000
-    thread_count: int = 4
+    num_threads: int = 4
 
 
 @dataclass
@@ -331,7 +331,14 @@ class ElasticsearchUploader(Uploader):
     upload_config: ElasticsearchUploaderConfig
     connection_config: ElasticsearchConnectionConfig
 
+    @requires_dependencies(["elasticsearch"], extras="elasticsearch")
+    def load_parallel_bulk(self):
+        from elasticsearch.helpers import parallel_bulk
+
+        return parallel_bulk
+
     def run(self, contents: list[UploadContent], **kwargs: Any) -> None:
+        parallel_bulk = self.load_parallel_bulk()
         elements_dict = []
         for content in contents:
             with open(content.path) as elements_file:
@@ -342,14 +349,13 @@
             f"writing {len(elements_dict)} elements via document batches to destination "
             f"index named {self.upload_config.index_name} at {upload_destination} with "
             f"batch size (in bytes) {self.upload_config.batch_size_bytes} with "
-            f"{self.upload_config.thread_count} (number of) threads"
+            f"{self.upload_config.num_threads} (number of) threads"
         )
-        from elasticsearch.helpers import parallel_bulk
 
         client = self.connection_config.get_client()
         if not client.indices.exists(index=self.upload_config.index_name):
             logger.warning(
-                f"Elasticsearch index does not exist: "
+                f"{(self.__class__.__name__).replace('Uploader', '')} index does not exist: "
                 f"{self.upload_config.index_name}. "
                 f"This may cause issues when uploading."
             )
@@ -359,11 +365,14 @@
             for success, info in parallel_bulk(
                 client=client,
                 actions=batch,
-                thread_count=self.upload_config.thread_count,
+                thread_count=self.upload_config.num_threads,
             ):
                 if not success:
                     logger.error(
-                        "upload failed for a batch in elasticsearch destination connector:", info
+                        "upload failed for a batch in "
+                        f"{(self.__class__.__name__).replace('Uploader', '')} "
+                        "destination connector:",
+                        info,
                     )
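
The new load_parallel_bulk hook isolates the only elasticsearch-specific import in the uploader, so a subclass can swap in a compatible helper (the OpenSearch override appears in the next file). The contract the run loop relies on is a generator of (success, info) tuples; a minimal standalone sketch, assuming a local cluster and a throwaway index:

# Sketch of the parallel_bulk contract (cluster URL and index are assumptions).
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

client = Elasticsearch("http://localhost:9200")
actions = (
    {"_index": "ingest-test-destination", "_source": {"text": f"element {i}"}}
    for i in range(100)
)
for success, info in parallel_bulk(client=client, actions=actions, thread_count=4):
    if not success:
        print("upload failed:", info)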

View File

@@ -0,0 +1,118 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional

from unstructured.ingest.enhanced_dataclass import EnhancedDataClassJsonMixin, enhanced_field
from unstructured.ingest.error import DestinationConnectionError
from unstructured.ingest.v2.interfaces import (
    AccessConfig,
    ConnectionConfig,
)
from unstructured.ingest.v2.logger import logger
from unstructured.ingest.v2.processes.connector_registry import (
    DestinationRegistryEntry,
    add_destination_entry,
)
from unstructured.ingest.v2.processes.connectors.elasticsearch import (
    ElasticsearchUploader,
    ElasticsearchUploaderConfig,
    ElasticsearchUploadStager,
    ElasticsearchUploadStagerConfig,
)
from unstructured.utils import requires_dependencies

if TYPE_CHECKING:
    from opensearchpy import OpenSearch

CONNECTOR_TYPE = "opensearch"

"""Since the actual OpenSearch project is a fork of Elasticsearch, we are relying
heavily on the Elasticsearch connector code, inheriting the functionality as much as possible."""


@dataclass
class OpenSearchAccessConfig(AccessConfig):
    password: Optional[str] = enhanced_field(default=None, sensitive=True)
    use_ssl: bool = False
    verify_certs: bool = False
    ssl_show_warn: bool = False
    ca_certs: Optional[str] = None
    client_cert: Optional[str] = None
    client_key: Optional[str] = None


@dataclass
class OpenSearchClientInput(EnhancedDataClassJsonMixin):
    http_auth: Optional[tuple[str, str]] = enhanced_field(sensitive=True, default=None)
    hosts: Optional[list[str]] = None
    use_ssl: bool = False
    verify_certs: bool = False
    ssl_show_warn: bool = False
    ca_certs: Optional[str] = None
    client_cert: Optional[str] = None
    client_key: Optional[str] = None


@dataclass
class OpenSearchConnectionConfig(ConnectionConfig):
    hosts: Optional[list[str]] = None
    username: Optional[str] = None
    access_config: OpenSearchAccessConfig = enhanced_field(sensitive=True)

    def get_client_kwargs(self) -> dict:
        # Update auth related fields to conform to what the SDK expects based on the
        # supported methods:
        # https://github.com/opensearch-project/opensearch-py/blob/main/opensearchpy/client/__init__.py
        client_input = OpenSearchClientInput()
        if self.hosts:
            client_input.hosts = self.hosts
        if self.access_config.use_ssl:
            client_input.use_ssl = self.access_config.use_ssl
        if self.access_config.verify_certs:
            client_input.verify_certs = self.access_config.verify_certs
        if self.access_config.ssl_show_warn:
            client_input.ssl_show_warn = self.access_config.ssl_show_warn
        if self.access_config.ca_certs:
            client_input.ca_certs = self.access_config.ca_certs
        if self.access_config.client_cert:
            client_input.client_cert = self.access_config.client_cert
        if self.access_config.client_key:
            client_input.client_key = self.access_config.client_key
        if self.username and self.access_config.password:
            client_input.http_auth = (self.username, self.access_config.password)
        logger.debug(
            f"OpenSearch client inputs mapped to: {client_input.to_dict(redact_sensitive=True)}"
        )
        client_kwargs = client_input.to_dict(redact_sensitive=False)
        client_kwargs = {k: v for k, v in client_kwargs.items() if v is not None}
        return client_kwargs

    @DestinationConnectionError.wrap
    @requires_dependencies(["opensearchpy"], extras="opensearch")
    def get_client(self) -> "OpenSearch":
        from opensearchpy import OpenSearch

        return OpenSearch(**self.get_client_kwargs())


@dataclass
class OpenSearchUploader(ElasticsearchUploader):
    connection_config: OpenSearchConnectionConfig
    connector_type: str = CONNECTOR_TYPE

    @requires_dependencies(["opensearchpy"], extras="opensearch")
    def load_parallel_bulk(self):
        from opensearchpy.helpers import parallel_bulk

        return parallel_bulk


add_destination_entry(
    destination_type=CONNECTOR_TYPE,
    entry=DestinationRegistryEntry(
        connection_config=OpenSearchConnectionConfig,
        upload_stager_config=ElasticsearchUploadStagerConfig,
        upload_stager=ElasticsearchUploadStager,
        uploader_config=ElasticsearchUploaderConfig,
        uploader=OpenSearchUploader,
    ),
)
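
As a sanity check on get_client_kwargs, this is what the mapping should come out to for the connection config used in the example pipeline above (an inference from the code, not captured output):

# Inferred kwarg mapping for the example pipeline's config; note that False
# values survive the None-filter, so use_ssl/verify_certs/ssl_show_warn are kept.
config = OpenSearchConnectionConfig(
    hosts=["http://localhost:9247"],
    username="admin",
    access_config=OpenSearchAccessConfig(password="admin", use_ssl=True),
)
kwargs = config.get_client_kwargs()
# Expected keys: hosts, use_ssl (True), verify_certs (False), ssl_show_warn
# (False), http_auth ("admin", "admin"); the three None-valued cert/key paths
# are filtered out before the client is constructed.
client = config.get_client()  # equivalent to OpenSearch(**kwargs)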