rfctr: implement mongodb v2 destination connector (#3313)

This PR provides support for V2 mongodb destination connector.
This commit is contained in:
Nathan Van Gheem 2024-07-02 11:40:51 -05:00 committed by GitHub
parent c28deffbc4
commit da29242dbd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 262 additions and 3 deletions

View File

@ -1,4 +1,4 @@
## 0.14.10-dev5
## 0.14.10-dev6
### Enhancements

View File

@ -1,4 +1,5 @@
#!/usr/bin/env bash
# shellcheck disable=SC2012
set -e
@ -67,9 +68,10 @@ python "$SCRIPT_DIR"/python/test-ingest-mongodb.py \
--collection "$DESTINATION_MONGO_COLLECTION" \
check --expected-records 5
stage_file=$(ls -1 "$WORK_DIR"/upload_stage | head -n 1)
python "$SCRIPT_DIR"/python/test-ingest-mongodb.py \
--uri "$MONGODB_URI" \
--database "$MONGODB_DATABASE_NAME" \
--collection "$DESTINATION_MONGO_COLLECTION" \
check-vector \
--output-json "$OUTPUT_ROOT"/structured-output/$OUTPUT_FOLDER_NAME/fake-memo.pdf.json
--output-json "$WORK_DIR"/upload_stage/"$stage_file"

View File

@ -1 +1 @@
__version__ = "0.14.10-dev5" # pragma: no cover
__version__ = "0.14.10-dev6" # pragma: no cover

View File

@ -13,6 +13,7 @@ from .fsspec.s3 import s3_dest_cmd, s3_src_cmd
from .fsspec.sftp import sftp_dest_cmd, sftp_src_cmd
from .google_drive import google_drive_src_cmd
from .local import local_dest_cmd, local_src_cmd
from .mongodb import mongodb_dest_cmd
from .onedrive import onedrive_drive_src_cmd
from .opensearch import opensearch_dest_cmd, opensearch_src_cmd
from .pinecone import pinecone_dest_cmd
@ -55,6 +56,7 @@ dest_cmds = [
s3_dest_cmd,
sftp_dest_cmd,
weaviate_dest_cmd,
mongodb_dest_cmd,
]
duplicate_dest_names = [

View File

@ -0,0 +1,62 @@
from dataclasses import dataclass
import click
from unstructured.ingest.v2.cli.base import DestCmd
from unstructured.ingest.v2.cli.interfaces import CliConfig
from unstructured.ingest.v2.processes.connectors.mongodb import CONNECTOR_TYPE
@dataclass
class MongoDBCliConnectionConfig(CliConfig):
    """CLI options describing how to connect to a MongoDB deployment."""

    @staticmethod
    def get_cli_options() -> list[click.Option]:
        """Return the click options for MongoDB connection settings.

        Either --uri or --host (plus optional --port) identifies the server;
        --database and --collection are always required.
        """
        options = [
            click.Option(
                ["--uri"],
                # fixed typo: was "URI to user when connecting"
                help="URI to use when connecting",
            ),
            click.Option(
                ["--host"],
                help="hostname or IP address or Unix domain socket path of a single mongod or "
                "mongos instance to connect to, or a list of hostnames",
            ),
            click.Option(
                ["--port"],
                type=int,
                default=27017,
                help="port of the mongod or mongos instance to connect to",
            ),
            click.Option(
                ["--database"], type=str, required=True, help="database name to connect to"
            ),
            click.Option(
                ["--collection"], required=True, type=str, help="collection name to connect to"
            ),
        ]
        return options
@dataclass
class MongoDBCliUploaderConfig(CliConfig):
    """CLI options controlling how staged records are written to MongoDB."""

    @staticmethod
    def get_cli_options() -> list[click.Option]:
        """Return the click options for the uploader step."""
        batch_size_option = click.Option(
            ["--batch-size"],
            default=100,
            type=int,
            help="Number of records per batch",
        )
        return [batch_size_option]
@dataclass
class MongoDBCliUploadStagerConfig(CliConfig):
    # The MongoDB upload stager takes no configuration, so no CLI options are exposed.
    @staticmethod
    def get_cli_options() -> list[click.Option]:
        return []
# Destination command wiring: exposes the "mongodb" destination on the CLI,
# combining the connection, uploader, and upload-stager option groups above.
mongodb_dest_cmd = DestCmd(
    cmd_name=CONNECTOR_TYPE,
    connection_config=MongoDBCliConnectionConfig,
    uploader_config=MongoDBCliUploaderConfig,
    upload_stager_config=MongoDBCliUploadStagerConfig,
)

View File

@ -0,0 +1,52 @@
import random
from pathlib import Path
from unstructured.ingest.v2.interfaces import ProcessorConfig
from unstructured.ingest.v2.logger import logger
from unstructured.ingest.v2.pipeline.pipeline import Pipeline
from unstructured.ingest.v2.processes.chunker import ChunkerConfig
from unstructured.ingest.v2.processes.connectors.local import (
LocalConnectionConfig,
LocalDownloaderConfig,
LocalIndexerConfig,
)
from unstructured.ingest.v2.processes.connectors.mongodb import (
MongoDBAccessConfig,
MongoDBConnectionConfig,
MongoDBUploaderConfig,
MongoDBUploadStagerConfig,
)
from unstructured.ingest.v2.processes.embedder import EmbedderConfig
from unstructured.ingest.v2.processes.partitioner import PartitionerConfig
# Resolve paths relative to the repository root (five levels up from this file).
base_path = Path(__file__).parent.parent.parent.parent.parent
docs_path = base_path / "example-docs"
# All intermediate pipeline artifacts and outputs live under tmp_ingest.
work_dir = base_path / "tmp_ingest"
output_path = work_dir / "output"
download_path = work_dir / "download"
if __name__ == "__main__":
    # Example end-to-end run: partition, chunk, embed local docs, then write
    # the results to a randomly-named MongoDB test collection.
    logger.info(f"Writing all content in: {work_dir.resolve()}")
    chunker_config = ChunkerConfig(
        chunking_strategy="by_title",
        chunk_include_orig_elements=False,
        chunk_max_characters=1500,
        chunk_multipage_sections=True,
    )
    destination_config = MongoDBConnectionConfig(
        access_config=MongoDBAccessConfig(uri=None),
        host="localhost",
        port=27017,
        collection=f"test-collection-{random.randint(1000,9999)}",
        database="testDatabase",
    )
    pipeline = Pipeline.from_configs(
        context=ProcessorConfig(work_dir=str(work_dir.resolve())),
        indexer_config=LocalIndexerConfig(input_path=str(docs_path.resolve()) + "/multisimple/"),
        downloader_config=LocalDownloaderConfig(download_dir=download_path),
        source_connection_config=LocalConnectionConfig(),
        partitioner_config=PartitionerConfig(strategy="fast"),
        chunker_config=chunker_config,
        embedder_config=EmbedderConfig(embedding_provider="langchain-huggingface"),
        destination_connection_config=destination_config,
        stager_config=MongoDBUploadStagerConfig(),
        uploader_config=MongoDBUploaderConfig(batch_size=10),
    )
    pipeline.run()

View File

@ -0,0 +1,141 @@
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional
from unstructured.__version__ import __version__ as unstructured_version
from unstructured.ingest.enhanced_dataclass import enhanced_field
from unstructured.ingest.utils.data_prep import chunk_generator
from unstructured.ingest.v2.interfaces import (
AccessConfig,
ConnectionConfig,
FileData,
UploadContent,
Uploader,
UploaderConfig,
UploadStager,
UploadStagerConfig,
)
from unstructured.ingest.v2.logger import logger
from unstructured.ingest.v2.processes.connector_registry import (
DestinationRegistryEntry,
add_destination_entry,
)
from unstructured.utils import requires_dependencies
if TYPE_CHECKING:
from pymongo import MongoClient
# Identifier used to register and look up this connector in the registry.
CONNECTOR_TYPE = "mongodb"
# Version string passed to pymongo's ServerApi when building clients.
SERVER_API_VERSION = "1"
@dataclass
class MongoDBAccessConfig(AccessConfig):
    # Full MongoDB connection URI; when set it takes precedence over host/port
    # in MongoDBUploader.create_client.
    uri: Optional[str] = None
@dataclass
class MongoDBConnectionConfig(ConnectionConfig):
    # The URI may carry credentials, so it lives in the access config and is
    # marked sensitive.
    access_config: MongoDBAccessConfig = enhanced_field(
        sensitive=True, default_factory=MongoDBAccessConfig
    )
    # host/port are used only when access_config.uri is not set.
    host: Optional[str] = None
    database: Optional[str] = None
    collection: Optional[str] = None
    port: int = 27017
    # NOTE(review): batch_size appears unused on the connection config — the
    # uploader reads MongoDBUploaderConfig.batch_size instead; confirm before
    # relying on or removing this field.
    batch_size: int = 100
    connector_type: str = CONNECTOR_TYPE
@dataclass
class MongoDBUploadStagerConfig(UploadStagerConfig):
    # The MongoDB stager requires no configuration.
    pass
@dataclass
class MongoDBUploadStager(UploadStager):
    """Stages partitioned-element JSON for upload by copying it into the
    staging directory under the requested output filename."""

    upload_stager_config: MongoDBUploadStagerConfig = field(
        default_factory=MongoDBUploadStagerConfig
    )

    def run(
        self,
        elements_filepath: Path,
        file_data: FileData,
        output_dir: Path,
        output_filename: str,
        **kwargs: Any,
    ) -> Path:
        """Re-serialize the elements file into output_dir and return its path."""
        destination = Path(output_dir) / Path(f"{output_filename}.json")
        with open(elements_filepath) as source_file:
            staged_elements = json.load(source_file)
        with open(destination, "w") as destination_file:
            json.dump(staged_elements, destination_file)
        return destination
@dataclass
class MongoDBUploaderConfig(UploaderConfig):
    # Number of element documents sent per insert_many call.
    batch_size: int = 100
@dataclass
class MongoDBUploader(Uploader):
    """Writes staged element JSON files into a MongoDB collection."""

    upload_config: MongoDBUploaderConfig
    connection_config: MongoDBConnectionConfig
    # Created eagerly in __post_init__ rather than supplied by callers.
    client: Optional["MongoClient"] = field(init=False)
    connector_type: str = CONNECTOR_TYPE

    def __post_init__(self):
        self.client = self.create_client()

    @requires_dependencies(["pymongo"], extras="mongodb")
    def create_client(self) -> "MongoClient":
        """Build a MongoClient from the connection config.

        A full URI (access_config.uri) takes precedence; otherwise host/port
        are used. Fix: driver info identifying unstructured is now attached on
        both branches (previously only the URI branch set it), so the server
        sees consistent driver metadata regardless of connection style.
        """
        from pymongo import MongoClient
        from pymongo.driver_info import DriverInfo
        from pymongo.server_api import ServerApi

        driver_info = DriverInfo(name="unstructured", version=unstructured_version)
        if self.connection_config.access_config.uri:
            return MongoClient(
                self.connection_config.access_config.uri,
                server_api=ServerApi(version=SERVER_API_VERSION),
                driver=driver_info,
            )
        return MongoClient(
            host=self.connection_config.host,
            port=self.connection_config.port,
            server_api=ServerApi(version=SERVER_API_VERSION),
            driver=driver_info,
        )

    def run(self, contents: list[UploadContent], **kwargs: Any) -> None:
        """Read every staged JSON file and insert its elements in batches.

        An empty contents list results in no writes; insert errors from
        pymongo propagate to the caller.
        """
        elements_dict = []
        for content in contents:
            with open(content.path) as elements_file:
                elements = json.load(elements_file)
            elements_dict.extend(elements)
        logger.info(
            f"writing {len(elements_dict)} objects to destination "
            f"db, {self.connection_config.database}, "
            f"collection {self.connection_config.collection} "
            f"at {self.connection_config.host}",
        )
        db = self.client[self.connection_config.database]
        collection = db[self.connection_config.collection]
        for chunk in chunk_generator(elements_dict, self.upload_config.batch_size):
            collection.insert_many(chunk)
# Register the mongodb destination so the pipeline can resolve it by
# connector type at runtime.
add_destination_entry(
    destination_type=CONNECTOR_TYPE,
    entry=DestinationRegistryEntry(
        connection_config=MongoDBConnectionConfig,
        uploader=MongoDBUploader,
        uploader_config=MongoDBUploaderConfig,
        upload_stager=MongoDBUploadStager,
        upload_stager_config=MongoDBUploadStagerConfig,
    ),
)