diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 8fbda89e6..3f16e05c8 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -427,6 +427,29 @@ jobs:
           tesseract --version
           ./test_unstructured_ingest/test-ingest-dest.sh
 
+  test_ingest_help:
+    environment: ci
+    strategy:
+      matrix:
+        python-version: ["3.9","3.10","3.11", "3.12"]
+    runs-on: ubuntu-latest
+    needs: [setup_ingest, lint]
+    steps:
+      - uses: 'actions/checkout@v4'
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ matrix.python-version }}
+      - name: Setup virtual environment
+        uses: ./.github/actions/base-ingest-cache
+        with:
+          python-version: ${{ matrix.python-version }}
+      - name: Validate --help
+        run: |
+          source .venv/bin/activate
+          ./test_unstructured_ingest/test-help.sh
+
+
   test_unstructured_api_unit:
     strategy:
       matrix:
diff --git a/test_unstructured_ingest/dest/elasticsearch.sh b/test_unstructured_ingest/dest/elasticsearch.sh
index 96cf67804..f30bfeba6 100755
--- a/test_unstructured_ingest/dest/elasticsearch.sh
+++ b/test_unstructured_ingest/dest/elasticsearch.sh
@@ -57,6 +57,6 @@ PYTHONPATH=. ./unstructured/ingest/main.py \
   --username "$ELASTIC_USER" \
   --password "$ELASTIC_PASSWORD" \
   --batch-size-bytes 15000000 \
-  --num-processes "$max_processes"
+  --num-threads "$max_processes"
 
 PYTHONPATH=. scripts/elasticsearch-test-helpers/destination_connector/test-ingest-elasticsearch-output.py
diff --git a/test_unstructured_ingest/test-help.sh b/test_unstructured_ingest/test-help.sh
new file mode 100755
index 000000000..7fb6266c6
--- /dev/null
+++ b/test_unstructured_ingest/test-help.sh
@@ -0,0 +1,16 @@
+#!/usr/bin/env bash
+
+set -u -o pipefail -e
+
+RUN_SCRIPT=${RUN_SCRIPT:-./unstructured/ingest/main.py}
+sources=$(PYTHONPATH=${PYTHONPATH:-.} "$RUN_SCRIPT" --help | sed -e '1,/Commands/ d' | awk '{NF=1}1')
+first_source=$(echo "$sources" | head -1)
+destinations=$(PYTHONPATH=${PYTHONPATH:-.} "$RUN_SCRIPT" "$first_source" --help | sed -e '1,/Destinations/ d' | awk '{NF=1}1')
+echo "Checking all source: $sources"
+echo "Checking all destinations: $destinations"
+for src in $sources; do
+  for dest in $destinations; do
+    echo "Checking $src -> $dest"
+    PYTHONPATH=${PYTHONPATH:-.} "$RUN_SCRIPT" "$src" "$dest" --help
+  done
+done
diff --git a/unstructured/ingest/cli/cmds/hubspot.py b/unstructured/ingest/cli/cmds/hubspot.py
index d6eaf45ce..219973cb7 100644
--- a/unstructured/ingest/cli/cmds/hubspot.py
+++ b/unstructured/ingest/cli/cmds/hubspot.py
@@ -11,6 +11,8 @@ OBJECT_TYPES = {t.value for t in HubSpotObjectTypes}
 
 
 def validate_custom_property(ctx, param, value) -> t.Dict[str, t.List[str]]:
+    if not value:
+        return value
     for k in value:
         if k not in OBJECT_TYPES:
             raise ValueError(f"Invalid object type: {k}, must be one of {OBJECT_TYPES}")
diff --git a/unstructured/ingest/v2/cli/cmds/__init__.py b/unstructured/ingest/v2/cli/cmds/__init__.py
index 71d3e8c3a..35163adf4 100644
--- a/unstructured/ingest/v2/cli/cmds/__init__.py
+++ b/unstructured/ingest/v2/cli/cmds/__init__.py
@@ -2,7 +2,7 @@ from collections import Counter
 
 import click
 
-from .elasticsearch import elasticsearch_src_cmd
+from .elasticsearch import elasticsearch_dest_cmd, elasticsearch_src_cmd
 from .fsspec.azure import azure_dest_cmd, azure_src_cmd
 from .fsspec.box import box_dest_cmd, box_src_cmd
 from .fsspec.dropbox import dropbox_dest_cmd, dropbox_src_cmd
@@ -36,6 +36,7 @@ dest_cmds = [
     azure_dest_cmd,
     box_dest_cmd,
     dropbox_dest_cmd,
+    elasticsearch_dest_cmd,
     gcs_dest_cmd,
     local_dest_cmd,
     s3_dest_cmd,
diff --git a/unstructured/ingest/v2/cli/cmds/elasticsearch.py b/unstructured/ingest/v2/cli/cmds/elasticsearch.py
index 16909e0d5..8c52c97f7 100644
--- a/unstructured/ingest/v2/cli/cmds/elasticsearch.py
+++ b/unstructured/ingest/v2/cli/cmds/elasticsearch.py
@@ -2,7 +2,7 @@ from dataclasses import dataclass
 
 import click
 
-from unstructured.ingest.v2.cli.base import SrcCmd
+from unstructured.ingest.v2.cli.base import DestCmd, SrcCmd
 from unstructured.ingest.v2.cli.interfaces import CliConfig
 from unstructured.ingest.v2.cli.utils import DelimitedString
 from unstructured.ingest.v2.processes.connectors.elasticsearch import CONNECTOR_TYPE
@@ -104,9 +104,56 @@ class ElasticsearchCliIndexerConfig(CliConfig):
         return options
 
 
+@dataclass
+class ElasticsearchCliUploadStagerConfig(CliConfig):
+    @staticmethod
+    def get_cli_options() -> list[click.Option]:
+        options = [
+            click.Option(
+                ["--index-name"],
+                required=True,
+                type=str,
+                help="Name of the Elasticsearch index to pull data from, or upload data to.",
+            ),
+        ]
+        return options
+
+
+@dataclass
+class ElasticsearchUploaderConfig(CliConfig):
+    @staticmethod
+    def get_cli_options() -> list[click.Option]:
+        options = [
+            click.Option(
+                ["--batch-size-bytes"],
+                required=False,
+                default=15_000_000,
+                type=int,
+                help="Size limit (in bytes) for each batch of items to be uploaded. Check"
+                " https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html"
+                "#_how_big_is_too_big for more information.",
+            ),
+            click.Option(
+                ["--num-threads"],
+                required=False,
+                default=1,
+                type=int,
+                help="Number of threads to be used while uploading content",
+            ),
+        ]
+        return options
+
+
 elasticsearch_src_cmd = SrcCmd(
     cmd_name=CONNECTOR_TYPE,
     connection_config=ElasticsearchCliConnectionConfig,
     indexer_config=ElasticsearchCliIndexerConfig,
     downloader_config=ElasticsearchCliDownloadConfig,
 )
+
+elasticsearch_dest_cmd = DestCmd(
+    cmd_name=CONNECTOR_TYPE,
+    connection_config=ElasticsearchCliConnectionConfig,
+    upload_stager_config=ElasticsearchCliUploadStagerConfig,
+    uploader_config=ElasticsearchUploaderConfig,
+)
diff --git a/unstructured/ingest/v2/processes/connectors/elasticsearch.py b/unstructured/ingest/v2/processes/connectors/elasticsearch.py
index 494474fbd..e39445d0b 100644
--- a/unstructured/ingest/v2/processes/connectors/elasticsearch.py
+++ b/unstructured/ingest/v2/processes/connectors/elasticsearch.py
@@ -1,5 +1,7 @@
 import hashlib
+import json
 import sys
+import uuid
 from dataclasses import dataclass, field
 from pathlib import Path
 from time import time
@@ -8,6 +10,7 @@ from typing import TYPE_CHECKING, Any, Generator, Optional
 from unstructured.documents.elements import DataSourceMetadata
 from unstructured.ingest.enhanced_dataclass import enhanced_field
 from unstructured.ingest.error import SourceConnectionNetworkError
+from unstructured.ingest.utils.data_prep import generator_batching_wbytes
 from unstructured.ingest.v2.interfaces import (
     AccessConfig,
     ConnectionConfig,
@@ -17,11 +20,18 @@ from unstructured.ingest.v2.interfaces import (
     FileData,
     Indexer,
     IndexerConfig,
+    UploadContent,
+    Uploader,
+    UploaderConfig,
+    UploadStager,
+    UploadStagerConfig,
     download_responses,
 )
 from unstructured.ingest.v2.logger import logger
 from unstructured.ingest.v2.processes.connector_registry import (
+    DestinationRegistryEntry,
     SourceRegistryEntry,
+    add_destination_entry,
     add_source_entry,
 )
 from unstructured.staging.base import flatten_dict
@@ -240,6 +250,88 @@ class ElasticsearchDownloader(Downloader):
         return download_responses
 
 
+@dataclass
+class ElasticsearchUploadStagerConfig(UploadStagerConfig):
+    index_name: str
+
+
+@dataclass
+class ElasticsearchUploadStager(UploadStager):
+    upload_stager_config: ElasticsearchUploadStagerConfig
+
+    def conform_dict(self, data: dict) -> dict:
+        resp = {
+            "_index": self.upload_stager_config.index_name,
+            "_id": str(uuid.uuid4()),
+            "_source": {
+                "element_id": data.pop("element_id", None),
+                "embeddings": data.pop("embeddings", None),
+                "text": data.pop("text", None),
+                "type": data.pop("type", None),
+            },
+        }
+        if "metadata" in data and isinstance(data["metadata"], dict):
+            resp["_source"]["metadata"] = flatten_dict(data["metadata"], separator="-")
+        return resp
+
+    def run(
+        self,
+        elements_filepath: Path,
+        file_data: FileData,
+        output_dir: Path,
+        output_filename: str,
+        **kwargs: Any,
+    ) -> Path:
+        with open(elements_filepath) as elements_file:
+            elements_contents = json.load(elements_file)
+        conformed_elements = [self.conform_dict(data=element) for element in elements_contents]
+        output_path = Path(output_dir) / Path(f"{output_filename}.json")
+        with open(output_path, "w") as output_file:
+            json.dump(conformed_elements, output_file)
+        return output_path
+
+
+@dataclass
+class ElasticsearchUploaderConfig(UploaderConfig):
+    index_name: str
+    batch_size_bytes: int = 15_000_000
+    thread_count: int = 4
+
+
+@dataclass
+class ElasticsearchUploader(Uploader):
+    upload_config: ElasticsearchUploaderConfig
+    connection_config: ElasticsearchConnectionConfig
+
+    def run(self, contents: list[UploadContent], **kwargs: Any) -> None:
+        elements_dict = []
+        for content in contents:
+            with open(content.path) as elements_file:
+                elements = json.load(elements_file)
+                elements_dict.extend(elements)
+        logger.info(
+            f"writing document batches to destination"
+            f" index named {self.upload_config.index_name}"
+            f" at {self.connection_config.hosts}"
+            f" with batch size (in bytes) {self.upload_config.batch_size_bytes}"
+            f" with {self.upload_config.thread_count} (number of) threads"
+        )
+        from elasticsearch.helpers import parallel_bulk
+
+        for batch in generator_batching_wbytes(
+            elements_dict, batch_size_limit_bytes=self.upload_config.batch_size_bytes
+        ):
+            for success, info in parallel_bulk(
+                self.connection_config.get_client(),
+                batch,
+                thread_count=self.upload_config.thread_count,
+            ):
+                if not success:
+                    logger.error(
+                        "upload failed for a batch in elasticsearch destination connector:", info
+                    )
+
+
 add_source_entry(
     source_type=CONNECTOR_TYPE,
     entry=SourceRegistryEntry(
@@ -250,3 +342,14 @@ add_source_entry(
         downloader_config=ElasticsearchDownloaderConfig,
     ),
 )
+
+add_destination_entry(
+    destination_type=CONNECTOR_TYPE,
+    entry=DestinationRegistryEntry(
+        connection_config=ElasticsearchConnectionConfig,
+        upload_stager_config=ElasticsearchUploadStagerConfig,
+        upload_stager=ElasticsearchUploadStager,
+        uploader_config=ElasticsearchUploaderConfig,
+        uploader=ElasticsearchUploader,
+    ),
+)
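
For local verification, a minimal sketch of invoking the new elasticsearch destination from the ingest CLI. The destination flags mirror the options added above and the updated dest/elasticsearch.sh; the local-source flags, host URL, and index name are illustrative placeholders, not values taken from this change:

    # Hypothetical run: local source -> elasticsearch destination (placeholder values)
    PYTHONPATH=. ./unstructured/ingest/main.py \
      local \
      --input-path example-docs \
      --output-dir /tmp/ingest-output \
      elasticsearch \
      --hosts "http://localhost:9200" \
      --index-name ingest-test-destination \
      --username "$ELASTIC_USER" \
      --password "$ELASTIC_PASSWORD" \
      --batch-size-bytes 15000000 \
      --num-threads 2

The new test_unstructured_ingest/test-help.sh exercises every registered source -> destination pairing in this way, but only via --help, so it catches CLI wiring errors (such as the empty-value case fixed in hubspot.py) without requiring a live Elasticsearch instance.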