diff --git a/CHANGELOG.md b/CHANGELOG.md index b3bd8e630..4b0f850d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.14.10-dev5 +## 0.14.10-dev6 ### Enhancements diff --git a/test_unstructured_ingest/dest/mongodb.sh b/test_unstructured_ingest/dest/mongodb.sh index 9ff96ff47..aa28090d3 100755 --- a/test_unstructured_ingest/dest/mongodb.sh +++ b/test_unstructured_ingest/dest/mongodb.sh @@ -1,4 +1,5 @@ #!/usr/bin/env bash +# shellcheck disable=SC2012 set -e @@ -67,9 +68,10 @@ python "$SCRIPT_DIR"/python/test-ingest-mongodb.py \ --collection "$DESTINATION_MONGO_COLLECTION" \ check --expected-records 5 +stage_file=$(ls -1 "$WORK_DIR"/upload_stage | head -n 1) python "$SCRIPT_DIR"/python/test-ingest-mongodb.py \ --uri "$MONGODB_URI" \ --database "$MONGODB_DATABASE_NAME" \ --collection "$DESTINATION_MONGO_COLLECTION" \ check-vector \ - --output-json "$OUTPUT_ROOT"/structured-output/$OUTPUT_FOLDER_NAME/fake-memo.pdf.json + --output-json "$WORK_DIR"/upload_stage/"$stage_file" diff --git a/unstructured/__version__.py b/unstructured/__version__.py index b952b69f8..2e85380be 100644 --- a/unstructured/__version__.py +++ b/unstructured/__version__.py @@ -1 +1 @@ -__version__ = "0.14.10-dev5" # pragma: no cover +__version__ = "0.14.10-dev6" # pragma: no cover diff --git a/unstructured/ingest/v2/cli/cmds/__init__.py b/unstructured/ingest/v2/cli/cmds/__init__.py index 3946bdcf8..2d69dd16b 100644 --- a/unstructured/ingest/v2/cli/cmds/__init__.py +++ b/unstructured/ingest/v2/cli/cmds/__init__.py @@ -13,6 +13,7 @@ from .fsspec.s3 import s3_dest_cmd, s3_src_cmd from .fsspec.sftp import sftp_dest_cmd, sftp_src_cmd from .google_drive import google_drive_src_cmd from .local import local_dest_cmd, local_src_cmd +from .mongodb import mongodb_dest_cmd from .onedrive import onedrive_drive_src_cmd from .opensearch import opensearch_dest_cmd, opensearch_src_cmd from .pinecone import pinecone_dest_cmd @@ -55,6 +56,7 @@ dest_cmds = [ s3_dest_cmd, sftp_dest_cmd, weaviate_dest_cmd, + mongodb_dest_cmd, ] duplicate_dest_names = [ diff --git a/unstructured/ingest/v2/cli/cmds/mongodb.py b/unstructured/ingest/v2/cli/cmds/mongodb.py new file mode 100644 index 000000000..49ad3e53d --- /dev/null +++ b/unstructured/ingest/v2/cli/cmds/mongodb.py @@ -0,0 +1,62 @@ +from dataclasses import dataclass + +import click + +from unstructured.ingest.v2.cli.base import DestCmd +from unstructured.ingest.v2.cli.interfaces import CliConfig +from unstructured.ingest.v2.processes.connectors.mongodb import CONNECTOR_TYPE + + +@dataclass +class MongoDBCliConnectionConfig(CliConfig): + @staticmethod + def get_cli_options() -> list[click.Option]: + options = [ + click.Option( + ["--uri"], + help="URI to user when connecting", + ), + click.Option( + ["--host"], + help="hostname or IP address or Unix domain socket path of a single mongod or " + "mongos instance to connect to, or a list of hostnames", + ), + click.Option(["--port"], type=int, default=27017), + click.Option( + ["--database"], type=str, required=True, help="database name to connect to" + ), + click.Option( + ["--collection"], required=True, type=str, help="collection name to connect to" + ), + ] + return options + + +@dataclass +class MongoDBCliUploaderConfig(CliConfig): + @staticmethod + def get_cli_options() -> list[click.Option]: + options = [ + click.Option( + ["--batch-size"], + default=100, + type=int, + help="Number of records per batch", + ) + ] + return options + + +@dataclass +class MongoDBCliUploadStagerConfig(CliConfig): + @staticmethod + def get_cli_options() -> list[click.Option]: + return [] + + +mongodb_dest_cmd = DestCmd( + cmd_name=CONNECTOR_TYPE, + connection_config=MongoDBCliConnectionConfig, + uploader_config=MongoDBCliUploaderConfig, + upload_stager_config=MongoDBCliUploadStagerConfig, +) diff --git a/unstructured/ingest/v2/examples/example_mongodb.py b/unstructured/ingest/v2/examples/example_mongodb.py new file mode 100644 index 000000000..4ef562ae6 --- /dev/null +++ b/unstructured/ingest/v2/examples/example_mongodb.py @@ -0,0 +1,52 @@ +import random +from pathlib import Path + +from unstructured.ingest.v2.interfaces import ProcessorConfig +from unstructured.ingest.v2.logger import logger +from unstructured.ingest.v2.pipeline.pipeline import Pipeline +from unstructured.ingest.v2.processes.chunker import ChunkerConfig +from unstructured.ingest.v2.processes.connectors.local import ( + LocalConnectionConfig, + LocalDownloaderConfig, + LocalIndexerConfig, +) +from unstructured.ingest.v2.processes.connectors.mongodb import ( + MongoDBAccessConfig, + MongoDBConnectionConfig, + MongoDBUploaderConfig, + MongoDBUploadStagerConfig, +) +from unstructured.ingest.v2.processes.embedder import EmbedderConfig +from unstructured.ingest.v2.processes.partitioner import PartitionerConfig + +base_path = Path(__file__).parent.parent.parent.parent.parent +docs_path = base_path / "example-docs" +work_dir = base_path / "tmp_ingest" +output_path = work_dir / "output" +download_path = work_dir / "download" + +if __name__ == "__main__": + logger.info(f"Writing all content in: {work_dir.resolve()}") + Pipeline.from_configs( + context=ProcessorConfig(work_dir=str(work_dir.resolve())), + indexer_config=LocalIndexerConfig(input_path=str(docs_path.resolve()) + "/multisimple/"), + downloader_config=LocalDownloaderConfig(download_dir=download_path), + source_connection_config=LocalConnectionConfig(), + partitioner_config=PartitionerConfig(strategy="fast"), + chunker_config=ChunkerConfig( + chunking_strategy="by_title", + chunk_include_orig_elements=False, + chunk_max_characters=1500, + chunk_multipage_sections=True, + ), + embedder_config=EmbedderConfig(embedding_provider="langchain-huggingface"), + destination_connection_config=MongoDBConnectionConfig( + access_config=MongoDBAccessConfig(uri=None), + host="localhost", + port=27017, + collection=f"test-collection-{random.randint(1000,9999)}", + database="testDatabase", + ), + stager_config=MongoDBUploadStagerConfig(), + uploader_config=MongoDBUploaderConfig(batch_size=10), + ).run() diff --git a/unstructured/ingest/v2/processes/connectors/mongodb.py b/unstructured/ingest/v2/processes/connectors/mongodb.py new file mode 100644 index 000000000..cbf7ddfe1 --- /dev/null +++ b/unstructured/ingest/v2/processes/connectors/mongodb.py @@ -0,0 +1,141 @@ +import json +from dataclasses import dataclass, field +from pathlib import Path +from typing import TYPE_CHECKING, Any, Optional + +from unstructured.__version__ import __version__ as unstructured_version +from unstructured.ingest.enhanced_dataclass import enhanced_field +from unstructured.ingest.utils.data_prep import chunk_generator +from unstructured.ingest.v2.interfaces import ( + AccessConfig, + ConnectionConfig, + FileData, + UploadContent, + Uploader, + UploaderConfig, + UploadStager, + UploadStagerConfig, +) +from unstructured.ingest.v2.logger import logger +from unstructured.ingest.v2.processes.connector_registry import ( + DestinationRegistryEntry, + add_destination_entry, +) +from unstructured.utils import requires_dependencies + +if TYPE_CHECKING: + from pymongo import MongoClient + +CONNECTOR_TYPE = "mongodb" +SERVER_API_VERSION = "1" + + +@dataclass +class MongoDBAccessConfig(AccessConfig): + uri: Optional[str] = None + + +@dataclass +class MongoDBConnectionConfig(ConnectionConfig): + access_config: MongoDBAccessConfig = enhanced_field( + sensitive=True, default_factory=MongoDBAccessConfig + ) + host: Optional[str] = None + database: Optional[str] = None + collection: Optional[str] = None + port: int = 27017 + batch_size: int = 100 + connector_type: str = CONNECTOR_TYPE + + +@dataclass +class MongoDBUploadStagerConfig(UploadStagerConfig): + pass + + +@dataclass +class MongoDBUploadStager(UploadStager): + upload_stager_config: MongoDBUploadStagerConfig = field( + default_factory=lambda: MongoDBUploadStagerConfig() + ) + + def run( + self, + elements_filepath: Path, + file_data: FileData, + output_dir: Path, + output_filename: str, + **kwargs: Any, + ) -> Path: + with open(elements_filepath) as elements_file: + elements_contents = json.load(elements_file) + + output_path = Path(output_dir) / Path(f"{output_filename}.json") + with open(output_path, "w") as output_file: + json.dump(elements_contents, output_file) + return output_path + + +@dataclass +class MongoDBUploaderConfig(UploaderConfig): + batch_size: int = 100 + + +@dataclass +class MongoDBUploader(Uploader): + upload_config: MongoDBUploaderConfig + connection_config: MongoDBConnectionConfig + client: Optional["MongoClient"] = field(init=False) + connector_type: str = CONNECTOR_TYPE + + def __post_init__(self): + self.client = self.create_client() + + @requires_dependencies(["pymongo"], extras="mongodb") + def create_client(self) -> "MongoClient": + from pymongo import MongoClient + from pymongo.driver_info import DriverInfo + from pymongo.server_api import ServerApi + + if self.connection_config.access_config.uri: + return MongoClient( + self.connection_config.access_config.uri, + server_api=ServerApi(version=SERVER_API_VERSION), + driver=DriverInfo(name="unstructured", version=unstructured_version), + ) + else: + return MongoClient( + host=self.connection_config.host, + port=self.connection_config.port, + server_api=ServerApi(version=SERVER_API_VERSION), + ) + + def run(self, contents: list[UploadContent], **kwargs: Any) -> None: + elements_dict = [] + for content in contents: + with open(content.path) as elements_file: + elements = json.load(elements_file) + elements_dict.extend(elements) + + logger.info( + f"writing {len(elements_dict)} objects to destination " + f"db, {self.connection_config.database}, " + f"collection {self.connection_config.collection} " + f"at {self.connection_config.host}", + ) + db = self.client[self.connection_config.database] + collection = db[self.connection_config.collection] + for chunk in chunk_generator(elements_dict, self.upload_config.batch_size): + collection.insert_many(chunk) + + +add_destination_entry( + destination_type=CONNECTOR_TYPE, + entry=DestinationRegistryEntry( + connection_config=MongoDBConnectionConfig, + uploader=MongoDBUploader, + uploader_config=MongoDBUploaderConfig, + upload_stager=MongoDBUploadStager, + upload_stager_config=MongoDBUploadStagerConfig, + ), +)