diff --git a/CHANGELOG.md b/CHANGELOG.md index ee26af6f2..6ea3bf18d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +## 0.14.10-dev0 + +### Enhancements + +### Features + +### Fixes + ## 0.14.9 ### Enhancements diff --git a/scripts/opensearch-test-helpers/destination_connector/test-ingest-opensearch-output.py b/scripts/opensearch-test-helpers/destination_connector/test-ingest-opensearch-output.py index 98c5bf4d6..a03706a80 100755 --- a/scripts/opensearch-test-helpers/destination_connector/test-ingest-opensearch-output.py +++ b/scripts/opensearch-test-helpers/destination_connector/test-ingest-opensearch-output.py @@ -54,7 +54,6 @@ if __name__ == "__main__": count = client.count(index="ingest-test-destination")["count"] - assert int(count) == N_ELEMENTS, "OpenSearch dest check failed:" - f"got {count} items in index, expected {N_ELEMENTS} items in index." + assert int(count) == N_ELEMENTS, f"OpenSearch dst check fail: expect {N_ELEMENTS} got {count}" print(f"OpenSearch destination test was successful with {count} items being uploaded.") diff --git a/test_unstructured_ingest/dest/opensearch.sh b/test_unstructured_ingest/dest/opensearch.sh index 677cadd72..db64f3ff3 100755 --- a/test_unstructured_ingest/dest/opensearch.sh +++ b/test_unstructured_ingest/dest/opensearch.sh @@ -51,6 +51,6 @@ PYTHONPATH=. ./unstructured/ingest/main.py \ --password "admin" \ --use-ssl \ --batch-size-bytes 150 \ - --num-processes "$max_processes" + --num-threads "$max_processes" scripts/opensearch-test-helpers/destination_connector/test-ingest-opensearch-output.py diff --git a/unstructured/__version__.py b/unstructured/__version__.py index 9a5de316e..abe782aca 100644 --- a/unstructured/__version__.py +++ b/unstructured/__version__.py @@ -1 +1 @@ -__version__ = "0.14.9" # pragma: no cover +__version__ = "0.14.10-dev0" # pragma: no cover diff --git a/unstructured/ingest/v2/cli/cmds/__init__.py b/unstructured/ingest/v2/cli/cmds/__init__.py index 66f96b67b..12d18aa47 100644 --- a/unstructured/ingest/v2/cli/cmds/__init__.py +++ b/unstructured/ingest/v2/cli/cmds/__init__.py @@ -14,6 +14,7 @@ from .fsspec.sftp import sftp_dest_cmd, sftp_src_cmd from .google_drive import google_drive_src_cmd from .local import local_dest_cmd, local_src_cmd from .onedrive import onedrive_drive_src_cmd +from .opensearch import opensearch_dest_cmd from .weaviate import weaviate_dest_cmd src_cmds = [ @@ -47,6 +48,7 @@ dest_cmds = [ elasticsearch_dest_cmd, gcs_dest_cmd, local_dest_cmd, + opensearch_dest_cmd, s3_dest_cmd, sftp_dest_cmd, weaviate_dest_cmd, diff --git a/unstructured/ingest/v2/cli/cmds/opensearch.py b/unstructured/ingest/v2/cli/cmds/opensearch.py new file mode 100644 index 000000000..06e82b348 --- /dev/null +++ b/unstructured/ingest/v2/cli/cmds/opensearch.py @@ -0,0 +1,84 @@ +from dataclasses import dataclass + +import click + +from unstructured.ingest.v2.cli.base import DestCmd +from unstructured.ingest.v2.cli.cmds.elasticsearch import ( + ElasticsearchCliUploadStagerConfig, + ElasticsearchUploaderConfig, +) +from unstructured.ingest.v2.cli.interfaces import CliConfig +from unstructured.ingest.v2.cli.utils import DelimitedString +from unstructured.ingest.v2.processes.connectors.opensearch import CONNECTOR_TYPE + + +@dataclass +class OpenSearchCliConnectionConfig(CliConfig): + @staticmethod + def get_cli_options() -> list[click.Option]: + options = [ + click.Option( + ["--hosts"], + type=DelimitedString(), + help='List of the OpenSearch hosts to connect to, e.g. "http://localhost:9200"', + ), + click.Option( + ["--username"], type=str, default=None, help="username when using basic auth" + ), + click.Option( + ["--password"], + type=str, + default=None, + help="password when using basic auth", + ), + click.Option( + ["--use-ssl"], + type=bool, + default=False, + is_flag=True, + help="use ssl for the connection", + ), + click.Option( + ["--verify-certs"], + type=bool, + default=False, + is_flag=True, + help="whether to verify SSL certificates", + ), + click.Option( + ["--ssl-show-warn"], + type=bool, + default=False, + is_flag=True, + help="show warning when verify certs is disabled", + ), + click.Option( + ["--ca-certs"], + type=click.Path(), + default=None, + help="path to CA bundle", + ), + click.Option( + ["--client-cert"], + type=click.Path(), + default=None, + help="path to the file containing the private key and the certificate," + " or cert only if using client_key", + ), + click.Option( + ["--client-key"], + type=click.Path(), + default=None, + help="path to the file containing the private key" + " if using separate cert and key files", + ), + ] + return options + + +opensearch_dest_cmd = DestCmd( + cmd_name=CONNECTOR_TYPE, + connection_config=OpenSearchCliConnectionConfig, + upload_stager_config=ElasticsearchCliUploadStagerConfig, + uploader_config=ElasticsearchUploaderConfig, +) diff --git a/unstructured/ingest/v2/examples/example_opensearch.py b/unstructured/ingest/v2/examples/example_opensearch.py new file mode 100644 index 000000000..a5f654cfe --- /dev/null +++ b/unstructured/ingest/v2/examples/example_opensearch.py @@ -0,0 +1,51 @@ +from pathlib import Path + +from unstructured.ingest.v2.interfaces import ProcessorConfig +from unstructured.ingest.v2.logger import logger +from unstructured.ingest.v2.pipeline.pipeline import Pipeline +from unstructured.ingest.v2.processes.chunker import ChunkerConfig +from unstructured.ingest.v2.processes.connectors.local import ( + LocalConnectionConfig, + LocalDownloaderConfig, + LocalIndexerConfig, +) +from unstructured.ingest.v2.processes.connectors.opensearch import ( + OpenSearchAccessConfig, + OpenSearchConnectionConfig, + OpenSearchUploaderConfig, + OpenSearchUploadStagerConfig, +) +from unstructured.ingest.v2.processes.embedder import EmbedderConfig +from unstructured.ingest.v2.processes.partitioner import PartitionerConfig + +base_path = Path(__file__).parent.parent.parent.parent.parent +docs_path = base_path / "example-docs" +work_dir = base_path / "tmp_ingest" +output_path = work_dir / "output" +download_path = work_dir / "download" + +if __name__ == "__main__": + logger.info(f"Writing all content in: {work_dir.resolve()}") + Pipeline.from_configs( + context=ProcessorConfig(work_dir=str(work_dir.resolve())), + indexer_config=LocalIndexerConfig( + input_path=str(docs_path.resolve()) + "/book-war-and-peace-1p.txt" + ), + downloader_config=LocalDownloaderConfig(download_dir=download_path), + source_connection_config=LocalConnectionConfig(), + partitioner_config=PartitionerConfig(strategy="fast"), + chunker_config=ChunkerConfig(chunking_strategy="by_title"), + embedder_config=EmbedderConfig(embedding_provider="langchain-huggingface"), + destination_connection_config=OpenSearchConnectionConfig( + hosts="http://localhost:9247", + username="admin", + access_config=OpenSearchAccessConfig( + password="admin", + use_ssl=True, + ), + ), + stager_config=OpenSearchUploadStagerConfig(index_name="ingest-test-destination"), + uploader_config=OpenSearchUploaderConfig( + index_name="ingest-test-destination", batch_size_bytes=150 + ), + ).run() diff --git a/unstructured/ingest/v2/processes/connectors/elasticsearch.py b/unstructured/ingest/v2/processes/connectors/elasticsearch.py index 50b74677c..24ca1236b 100644 --- a/unstructured/ingest/v2/processes/connectors/elasticsearch.py +++ b/unstructured/ingest/v2/processes/connectors/elasticsearch.py @@ -53,10 +53,10 @@ class ElasticsearchAccessConfig(AccessConfig): @dataclass class ElasticsearchClientInput(EnhancedDataClassJsonMixin): - hosts: Optional[str] = None + hosts: Optional[list[str]] = None cloud_id: Optional[str] = None ca_certs: Optional[str] = None - basic_auth: Optional[tuple[str, str]] = None + basic_auth: Optional[tuple[str, str]] = enhanced_field(sensitive=True, default=None) api_key: Optional[str] = enhanced_field(sensitive=True, default=None) @@ -322,7 +322,7 @@ class ElasticsearchUploadStager(UploadStager): class ElasticsearchUploaderConfig(UploaderConfig): index_name: str batch_size_bytes: int = 15_000_000 - thread_count: int = 4 + num_threads: int = 4 @dataclass @@ -331,7 +331,14 @@ class ElasticsearchUploader(Uploader): upload_config: ElasticsearchUploaderConfig connection_config: ElasticsearchConnectionConfig + @requires_dependencies(["elasticsearch"], extras="elasticsearch") + def load_parallel_bulk(self): + from elasticsearch.helpers import parallel_bulk + + return parallel_bulk + def run(self, contents: list[UploadContent], **kwargs: Any) -> None: + parallel_bulk = self.load_parallel_bulk() elements_dict = [] for content in contents: with open(content.path) as elements_file: @@ -342,14 +349,13 @@ class ElasticsearchUploader(Uploader): f"writing {len(elements_dict)} elements via document batches to destination " f"index named {self.upload_config.index_name} at {upload_destination} with " f"batch size (in bytes) {self.upload_config.batch_size_bytes} with " - f"{self.upload_config.thread_count} (number of) threads" + f"{self.upload_config.num_threads} (number of) threads" ) - from elasticsearch.helpers import parallel_bulk client = self.connection_config.get_client() if not client.indices.exists(index=self.upload_config.index_name): logger.warning( - f"Elasticsearch index does not exist: " + f"{(self.__class__.__name__).replace('Uploader', '')} index does not exist: " f"{self.upload_config.index_name}. " f"This may cause issues when uploading." ) @@ -359,11 +365,14 @@ class ElasticsearchUploader(Uploader): for success, info in parallel_bulk( client=client, actions=batch, - thread_count=self.upload_config.thread_count, + thread_count=self.upload_config.num_threads, ): if not success: logger.error( - "upload failed for a batch in elasticsearch destination connector:", info + "upload failed for a batch in " + f"{(self.__class__.__name__).replace('Uploader', '')} " + "destination connector:", + info, ) diff --git a/unstructured/ingest/v2/processes/connectors/opensearch.py b/unstructured/ingest/v2/processes/connectors/opensearch.py new file mode 100644 index 000000000..f331d499c --- /dev/null +++ b/unstructured/ingest/v2/processes/connectors/opensearch.py @@ -0,0 +1,118 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, Optional + +from unstructured.ingest.enhanced_dataclass import EnhancedDataClassJsonMixin, enhanced_field +from unstructured.ingest.error import DestinationConnectionError +from unstructured.ingest.v2.interfaces import ( + AccessConfig, + ConnectionConfig, +) +from unstructured.ingest.v2.logger import logger +from unstructured.ingest.v2.processes.connector_registry import ( + DestinationRegistryEntry, + add_destination_entry, +) +from unstructured.ingest.v2.processes.connectors.elasticsearch import ( + ElasticsearchUploader, + ElasticsearchUploaderConfig, + ElasticsearchUploadStager, + ElasticsearchUploadStagerConfig, +) +from unstructured.utils import requires_dependencies + +if TYPE_CHECKING: + from opensearchpy import OpenSearch + +CONNECTOR_TYPE = "opensearch" + +"""Since the actual OpenSearch project is a fork of Elasticsearch, we are relying +heavily on the Elasticsearch connector code, inheriting the functionality as much as possible.""" + + +@dataclass +class OpenSearchAccessConfig(AccessConfig): + password: Optional[str] = enhanced_field(default=None, sensitive=True) + use_ssl: bool = False + verify_certs: bool = False + ssl_show_warn: bool = False + ca_certs: Optional[str] = None + client_cert: Optional[str] = None + client_key: Optional[str] = None + + +@dataclass +class OpenSearchClientInput(EnhancedDataClassJsonMixin): + http_auth: Optional[tuple[str, str]] = enhanced_field(sensitive=True, default=None) + hosts: Optional[list[str]] = None + use_ssl: bool = False + verify_certs: bool = False + ssl_show_warn: bool = False + ca_certs: Optional[str] = None + client_cert: Optional[str] = None + client_key: Optional[str] = None + + +@dataclass +class OpenSearchConnectionConfig(ConnectionConfig): + hosts: Optional[list[str]] = None + username: Optional[str] = None + access_config: OpenSearchAccessConfig = enhanced_field(sensitive=True) + + def get_client_kwargs(self) -> dict: + # Update auth related fields to conform to what the SDK expects based on the + # supported methods: + # https://github.com/opensearch-project/opensearch-py/blob/main/opensearchpy/client/__init__.py + client_input = OpenSearchClientInput() + if self.hosts: + client_input.hosts = self.hosts + if self.access_config.use_ssl: + client_input.use_ssl = self.access_config.use_ssl + if self.access_config.verify_certs: + client_input.verify_certs = self.access_config.verify_certs + if self.access_config.ssl_show_warn: + client_input.ssl_show_warn = self.access_config.ssl_show_warn + if self.access_config.ca_certs: + client_input.ca_certs = self.access_config.ca_certs + if self.access_config.client_cert: + client_input.client_cert = self.access_config.client_cert + if self.access_config.client_key: + client_input.client_key = self.access_config.client_key + if self.username and self.access_config.password: + client_input.http_auth = (self.username, self.access_config.password) + logger.debug( + f"OpenSearch client inputs mapped to: {client_input.to_dict(redact_sensitive=True)}" + ) + client_kwargs = client_input.to_dict(redact_sensitive=False) + client_kwargs = {k: v for k, v in client_kwargs.items() if v is not None} + return client_kwargs + + @DestinationConnectionError.wrap + @requires_dependencies(["opensearchpy"], extras="opensearch") + def get_client(self) -> "OpenSearch": + from opensearchpy import OpenSearch + + return OpenSearch(**self.get_client_kwargs()) + + +@dataclass +class OpenSearchUploader(ElasticsearchUploader): + connection_config: OpenSearchConnectionConfig + connector_type: str = CONNECTOR_TYPE + + @requires_dependencies(["opensearchpy"], extras="opensearch") + def load_parallel_bulk(self): + from opensearchpy.helpers import parallel_bulk + + return parallel_bulk + + +add_destination_entry( + destination_type=CONNECTOR_TYPE, + entry=DestinationRegistryEntry( + connection_config=OpenSearchConnectionConfig, + upload_stager_config=ElasticsearchUploadStagerConfig, + upload_stager=ElasticsearchUploadStager, + uploader_config=ElasticsearchUploaderConfig, + uploader=OpenSearchUploader, + ), +)