From b8af2f18bb314173651c6f4d0211d5831f5fdfef Mon Sep 17 00:00:00 2001 From: Roman Isecke <136338424+rbiseck3@users.noreply.github.com> Date: Thu, 16 Nov 2023 14:40:22 -0800 Subject: [PATCH] add mongo db destination connector (#2068) ### Description This adds the basic implementation of pushing the generated json output of partition to mongodb. None of this code provisions the mongo db instance so things like adding a search index around the embedding content must be done by the user. Any sort of schema validation would also have to take place via user-specific configuration on the database. This update makes no assumptions about the configuration of the database itself. --- .github/workflows/ci.yml | 2 + CHANGELOG.md | 1 + .../ingest/destination_connectors/mongodb.rst | 68 ++++++++ requirements/base.txt | 2 +- requirements/dev.txt | 6 +- requirements/extra-paddleocr.txt | 2 +- requirements/extra-pdf-image.txt | 2 +- .../ingest/azure-cognitive-search.txt | 16 +- requirements/ingest/azure.txt | 2 +- requirements/ingest/confluence.txt | 2 +- requirements/ingest/delta-table.txt | 2 +- requirements/ingest/elasticsearch.txt | 2 +- requirements/ingest/embed-aws-bedrock.txt | 8 +- requirements/ingest/embed-huggingface.txt | 8 +- requirements/ingest/embed-openai.txt | 12 +- requirements/ingest/gcs.txt | 2 +- requirements/ingest/github.txt | 2 +- requirements/ingest/google-drive.txt | 2 +- requirements/ingest/jira.txt | 2 +- requirements/ingest/mongodb.in | 3 + requirements/ingest/mongodb.txt | 10 ++ requirements/ingest/notion.txt | 2 +- requirements/ingest/s3.txt | 2 +- requirements/test.txt | 8 +- setup.py | 1 + .../dest/azure-cognitive-search.sh | 4 - test_unstructured_ingest/dest/delta-table.sh | 4 - test_unstructured_ingest/dest/dropbox.sh | 3 - test_unstructured_ingest/dest/gcs.sh | 4 - test_unstructured_ingest/dest/mongodb.sh | 69 ++++++++ test_unstructured_ingest/dest/s3.sh | 3 - .../python/test-ingest-mongodb.py | 150 ++++++++++++++++++ 
test_unstructured_ingest/test-ingest-dest.sh | 1 + unstructured/ingest/cli/cmds/__init__.py | 2 + unstructured/ingest/cli/cmds/mongodb.py | 60 +++++++ unstructured/ingest/connector/mongodb.py | 111 +++++++++++++ .../ingest/runner/writers/__init__.py | 2 + unstructured/ingest/runner/writers/mongodb.py | 29 ++++ 38 files changed, 542 insertions(+), 69 deletions(-) create mode 100644 docs/source/ingest/destination_connectors/mongodb.rst create mode 100644 requirements/ingest/mongodb.in create mode 100644 requirements/ingest/mongodb.txt create mode 100755 test_unstructured_ingest/dest/mongodb.sh create mode 100755 test_unstructured_ingest/python/test-ingest-mongodb.py create mode 100644 unstructured/ingest/cli/cmds/mongodb.py create mode 100644 unstructured/ingest/connector/mongodb.py create mode 100644 unstructured/ingest/runner/writers/mongodb.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3882fc494..28b1ac8db 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -446,6 +446,8 @@ jobs: DROPBOX_REFRESH_TOKEN: ${{ secrets.DROPBOX_REFRESH_TOKEN }} GCP_INGEST_SERVICE_KEY: ${{ secrets.GCP_INGEST_SERVICE_KEY }} OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + MONGODB_URI: ${{ secrets.MONGODB_URI }} + MONGODB_DATABASE_NAME: ${{ secrets.MONGODB_DATABASE_NAME }} TABLE_OCR: "tesseract" OCR_AGENT: "tesseract" CI: "true" diff --git a/CHANGELOG.md b/CHANGELOG.md index ca7541077..9b2883f56 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ ### Features * **Add ad-hoc fields to ElementMetadata instance.** End-users can now add their own metadata fields simply by assigning to an element-metadata attribute-name of their choice, like `element.metadata.coefficient = 0.58`. These fields will round-trip through JSON and can be accessed with dotted notation. +* **MongoDB Destination Connector** New destination connector added to all CLI ingest commands to support writing partitioned json output to mongodb. 
### Fixes diff --git a/docs/source/ingest/destination_connectors/mongodb.rst b/docs/source/ingest/destination_connectors/mongodb.rst new file mode 100644 index 000000000..a5552c5bf --- /dev/null +++ b/docs/source/ingest/destination_connectors/mongodb.rst @@ -0,0 +1,68 @@ +MongoDB +====================== + +Batch process all your records using ``unstructured-ingest`` to store structured outputs locally on your filesystem and upload those local files to a MongoDB collection. + +First you'll need to install the MongoDB dependencies as shown here. + +.. code:: shell + + pip install "unstructured[mongodb]" + +Run Locally +----------- +The upstream connector can be any of the ones supported, but for convenience this example shows a sample command using the +upstream local connector. + +.. tabs:: + + .. tab:: Shell + + .. code:: shell + + unstructured-ingest \ + local \ + --input-path example-docs/fake-memo.pdf \ + --anonymous \ + --output-dir local-output-to-mongo \ + --num-processes 2 \ + --verbose \ + --strategy fast \ + mongodb \ + --uri "$MONGODB_URI" \ + --database "$MONGODB_DATABASE_NAME" \ + --collection "$DESTINATION_MONGO_COLLECTION" + + .. tab:: Python + + .. code:: python + + import os + + from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig + from unstructured.ingest.runner import LocalRunner + + if __name__ == "__main__": + runner = LocalRunner( + processor_config=ProcessorConfig( + verbose=True, + output_dir="local-output-to-mongo", + num_processes=2, + ), + read_config=ReadConfig(), + partition_config=PartitionConfig(), + writer_type="mongodb", + writer_kwargs={ + "uri": os.getenv("MONGODB_URI"), + "database": os.getenv("MONGODB_DATABASE_NAME"), + "collection": os.getenv("DESTINATION_MONGO_COLLECTION") + } + ) + runner.run( + input_path="example-docs/fake-memo.pdf", + ) + + +For a full list of the options the CLI accepts check ``unstructured-ingest mongodb --help``. 
+ +NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide <https://unstructured-io.github.io/unstructured/installing.html>`_. diff --git a/requirements/base.txt b/requirements/base.txt index d4109e61d..0d1c07379 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -18,7 +18,7 @@ charset-normalizer==3.3.2 # via requests click==8.1.7 # via nltk -dataclasses-json==0.6.1 +dataclasses-json==0.6.2 # via -r base.in emoji==2.8.0 # via -r base.in diff --git a/requirements/dev.txt b/requirements/dev.txt index 8efebfc2c..a365ddf30 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -180,7 +180,7 @@ jupyterlab==4.0.8 # via notebook jupyterlab-pygments==0.2.2 # via nbconvert -jupyterlab-server==2.25.0 +jupyterlab-server==2.25.1 # via # jupyterlab # notebook @@ -253,7 +253,7 @@ pre-commit==3.5.0 # via -r dev.in prometheus-client==0.18.0 # via jupyter-server -prompt-toolkit==3.0.39 +prompt-toolkit==3.0.40 # via # ipython # jupyter-console @@ -340,7 +340,7 @@ soupsieve==2.5 # beautifulsoup4 stack-data==0.6.3 # via ipython -terminado==0.17.1 +terminado==0.18.0 # via # jupyter-server # jupyter-server-terminals diff --git a/requirements/extra-paddleocr.txt b/requirements/extra-paddleocr.txt index 9729cf319..cc03af060 100644 --- a/requirements/extra-paddleocr.txt +++ b/requirements/extra-paddleocr.txt @@ -8,7 +8,7 @@ attrdict==2.0.1 # via unstructured-paddleocr babel==2.13.1 # via flask-babel -bce-python-sdk==0.8.92 +bce-python-sdk==0.8.96 # via visualdl blinker==1.7.0 # via flask diff --git a/requirements/extra-pdf-image.txt b/requirements/extra-pdf-image.txt index 8ce9b92be..38bdb49b7 100644 --- a/requirements/extra-pdf-image.txt +++ b/requirements/extra-pdf-image.txt @@ -154,7 +154,7 @@ pyparsing==3.0.9 # via # -c constraints.in # matplotlib -pypdfium2==4.23.1 +pypdfium2==4.24.0 # via pdfplumber 
pytesseract==0.3.10 # via layoutparser diff --git a/requirements/ingest/azure-cognitive-search.txt b/requirements/ingest/azure-cognitive-search.txt index a9ad0a1f9..df25b29f6 100644 --- a/requirements/ingest/azure-cognitive-search.txt +++ b/requirements/ingest/azure-cognitive-search.txt @@ -7,16 +7,13 @@ azure-common==1.1.28 # via azure-search-documents azure-core==1.29.5 - # via - # azure-search-documents - # msrest -azure-search-documents==11.3.0 + # via azure-search-documents +azure-search-documents==11.4.0 # via -r ingest/azure-cognitive-search.in certifi==2023.7.22 # via # -c ingest/../base.txt # -c ingest/../constraints.in - # msrest # requests charset-normalizer==3.3.2 # via @@ -27,19 +24,11 @@ idna==3.4 # -c ingest/../base.txt # requests isodate==0.6.1 - # via msrest -msrest==0.7.1 # via azure-search-documents -oauthlib==3.2.2 - # via requests-oauthlib requests==2.31.0 # via # -c ingest/../base.txt # azure-core - # msrest - # requests-oauthlib -requests-oauthlib==1.3.1 - # via msrest six==1.16.0 # via # -c ingest/../base.txt @@ -49,7 +38,6 @@ typing-extensions==4.8.0 # via # -c ingest/../base.txt # azure-core - # azure-search-documents urllib3==1.26.18 # via # -c ingest/../base.txt diff --git a/requirements/ingest/azure.txt b/requirements/ingest/azure.txt index d1b63ee55..bdb526fb7 100644 --- a/requirements/ingest/azure.txt +++ b/requirements/ingest/azure.txt @@ -23,7 +23,7 @@ azure-datalake-store==0.0.53 # via adlfs azure-identity==1.15.0 # via adlfs -azure-storage-blob==12.18.3 +azure-storage-blob==12.19.0 # via adlfs certifi==2023.7.22 # via diff --git a/requirements/ingest/confluence.txt b/requirements/ingest/confluence.txt index f09703c97..8e316456e 100644 --- a/requirements/ingest/confluence.txt +++ b/requirements/ingest/confluence.txt @@ -41,5 +41,5 @@ urllib3==1.26.18 # -c ingest/../base.txt # -c ingest/../constraints.in # requests -wrapt==1.15.0 +wrapt==1.16.0 # via deprecated diff --git a/requirements/ingest/delta-table.txt 
b/requirements/ingest/delta-table.txt index 6838715d6..1203ed11d 100644 --- a/requirements/ingest/delta-table.txt +++ b/requirements/ingest/delta-table.txt @@ -15,5 +15,5 @@ numpy==1.24.4 # -c ingest/../base.txt # -c ingest/../constraints.in # pyarrow -pyarrow==14.0.0 +pyarrow==14.0.1 # via deltalake diff --git a/requirements/ingest/elasticsearch.txt b/requirements/ingest/elasticsearch.txt index e2fb8052a..3817fe427 100644 --- a/requirements/ingest/elasticsearch.txt +++ b/requirements/ingest/elasticsearch.txt @@ -11,7 +11,7 @@ certifi==2023.7.22 # elastic-transport elastic-transport==8.10.0 # via elasticsearch -elasticsearch==8.10.1 +elasticsearch==8.11.0 # via -r ingest/elasticsearch.in jq==1.6.0 # via -r ingest/elasticsearch.in diff --git a/requirements/ingest/embed-aws-bedrock.txt b/requirements/ingest/embed-aws-bedrock.txt index 218e7f90b..7a555ac5d 100644 --- a/requirements/ingest/embed-aws-bedrock.txt +++ b/requirements/ingest/embed-aws-bedrock.txt @@ -37,7 +37,7 @@ charset-normalizer==3.3.2 # -c ingest/../base.txt # aiohttp # requests -dataclasses-json==0.6.1 +dataclasses-json==0.6.2 # via # -c ingest/../base.txt # langchain @@ -47,8 +47,6 @@ frozenlist==1.4.0 # via # aiohttp # aiosignal -greenlet==3.0.1 - # via sqlalchemy idna==3.4 # via # -c ingest/../base.txt @@ -63,9 +61,9 @@ jsonpatch==1.33 # via langchain jsonpointer==2.4 # via jsonpatch -langchain==0.0.331 +langchain==0.0.335 # via -r ingest/embed-aws-bedrock.in -langsmith==0.0.60 +langsmith==0.0.63 # via langchain marshmallow==3.20.1 # via diff --git a/requirements/ingest/embed-huggingface.txt b/requirements/ingest/embed-huggingface.txt index 4ba742ead..ba46d9c8b 100644 --- a/requirements/ingest/embed-huggingface.txt +++ b/requirements/ingest/embed-huggingface.txt @@ -32,7 +32,7 @@ click==8.1.7 # via # -c ingest/../base.txt # nltk -dataclasses-json==0.6.1 +dataclasses-json==0.6.2 # via # -c ingest/../base.txt # langchain @@ -52,8 +52,6 @@ fsspec==2023.9.1 # -c ingest/../constraints.in # 
huggingface-hub # torch -greenlet==3.0.1 - # via sqlalchemy huggingface==0.0.1 # via -r ingest/embed-huggingface.in huggingface-hub==0.17.3 @@ -78,9 +76,9 @@ jsonpatch==1.33 # via langchain jsonpointer==2.4 # via jsonpatch -langchain==0.0.331 +langchain==0.0.335 # via -r ingest/embed-huggingface.in -langsmith==0.0.60 +langsmith==0.0.63 # via langchain markupsafe==2.1.3 # via jinja2 diff --git a/requirements/ingest/embed-openai.txt b/requirements/ingest/embed-openai.txt index 63985cccd..e5761692d 100644 --- a/requirements/ingest/embed-openai.txt +++ b/requirements/ingest/embed-openai.txt @@ -32,7 +32,7 @@ charset-normalizer==3.3.2 # -c ingest/../base.txt # aiohttp # requests -dataclasses-json==0.6.1 +dataclasses-json==0.6.2 # via # -c ingest/../base.txt # langchain @@ -44,11 +44,9 @@ frozenlist==1.4.0 # via # aiohttp # aiosignal -greenlet==3.0.1 - # via sqlalchemy h11==0.14.0 # via httpcore -httpcore==1.0.1 +httpcore==1.0.2 # via httpx httpx==0.25.1 # via openai @@ -63,9 +61,9 @@ jsonpatch==1.33 # via langchain jsonpointer==2.4 # via jsonpatch -langchain==0.0.331 +langchain==0.0.335 # via -r ingest/embed-openai.in -langsmith==0.0.60 +langsmith==0.0.63 # via langchain marshmallow==3.20.1 # via @@ -84,7 +82,7 @@ numpy==1.24.4 # -c ingest/../base.txt # -c ingest/../constraints.in # langchain -openai==1.1.1 +openai==1.2.3 # via -r ingest/embed-openai.in packaging==23.2 # via diff --git a/requirements/ingest/gcs.txt b/requirements/ingest/gcs.txt index 13df85bf8..bf0bf89f6 100644 --- a/requirements/ingest/gcs.txt +++ b/requirements/ingest/gcs.txt @@ -43,7 +43,7 @@ fsspec==2023.9.1 # gcsfs gcsfs==2023.9.1 # via -r ingest/gcs.in -google-api-core==2.12.0 +google-api-core==2.14.0 # via # google-cloud-core # google-cloud-storage diff --git a/requirements/ingest/github.txt b/requirements/ingest/github.txt index d049dad62..943332830 100644 --- a/requirements/ingest/github.txt +++ b/requirements/ingest/github.txt @@ -53,5 +53,5 @@ urllib3==1.26.18 # -c ingest/../constraints.in # 
pygithub # requests -wrapt==1.15.0 +wrapt==1.16.0 # via deprecated diff --git a/requirements/ingest/google-drive.txt b/requirements/ingest/google-drive.txt index 4875f6a21..ec3447086 100644 --- a/requirements/ingest/google-drive.txt +++ b/requirements/ingest/google-drive.txt @@ -15,7 +15,7 @@ charset-normalizer==3.3.2 # via # -c ingest/../base.txt # requests -google-api-core==2.12.0 +google-api-core==2.14.0 # via google-api-python-client google-api-python-client==2.107.0 # via -r ingest/google-drive.in diff --git a/requirements/ingest/jira.txt b/requirements/ingest/jira.txt index cd010707a..80825a87f 100644 --- a/requirements/ingest/jira.txt +++ b/requirements/ingest/jira.txt @@ -41,5 +41,5 @@ urllib3==1.26.18 # -c ingest/../base.txt # -c ingest/../constraints.in # requests -wrapt==1.15.0 +wrapt==1.16.0 # via deprecated diff --git a/requirements/ingest/mongodb.in b/requirements/ingest/mongodb.in new file mode 100644 index 000000000..e2af65a14 --- /dev/null +++ b/requirements/ingest/mongodb.in @@ -0,0 +1,3 @@ +-c ../constraints.in +-c ../base.txt +pymongo diff --git a/requirements/ingest/mongodb.txt b/requirements/ingest/mongodb.txt new file mode 100644 index 000000000..e193ef4e0 --- /dev/null +++ b/requirements/ingest/mongodb.txt @@ -0,0 +1,10 @@ +# +# This file is autogenerated by pip-compile with Python 3.8 +# by the following command: +# +# pip-compile --output-file=ingest/mongodb.txt ingest/mongodb.in +# +dnspython==2.4.2 + # via pymongo +pymongo==4.6.0 + # via -r ingest/mongodb.in diff --git a/requirements/ingest/notion.txt b/requirements/ingest/notion.txt index 4bef09675..85dd17036 100644 --- a/requirements/ingest/notion.txt +++ b/requirements/ingest/notion.txt @@ -20,7 +20,7 @@ h11==0.14.0 # via httpcore htmlbuilder==1.0.0 # via -r ingest/notion.in -httpcore==1.0.1 +httpcore==1.0.2 # via httpx httpx==0.25.1 # via notion-client diff --git a/requirements/ingest/s3.txt b/requirements/ingest/s3.txt index 133027b7f..01dc011ca 100644 --- 
a/requirements/ingest/s3.txt +++ b/requirements/ingest/s3.txt @@ -62,7 +62,7 @@ urllib3==1.26.18 # -c ingest/../base.txt # -c ingest/../constraints.in # botocore -wrapt==1.15.0 +wrapt==1.16.0 # via aiobotocore yarl==1.9.2 # via aiohttp diff --git a/requirements/test.txt b/requirements/test.txt index 66a7b1550..c71473d89 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -8,7 +8,7 @@ appdirs==1.4.4 # via label-studio-tools autoflake==2.2.1 # via -r test.in -black==23.10.1 +black==23.11.0 # via -r test.in certifi==2023.7.22 # via @@ -56,7 +56,7 @@ mccabe==0.7.0 # via flake8 multidict==6.0.4 # via yarl -mypy==1.6.1 +mypy==1.7.0 # via -r test.in mypy-extensions==1.0.0 # via @@ -103,7 +103,7 @@ requests==2.31.0 # via # -c base.txt # label-studio-sdk -ruff==0.1.4 +ruff==0.1.5 # via -r test.in six==1.16.0 # via @@ -140,7 +140,7 @@ urllib3==1.26.18 # vcrpy vcrpy==5.1.0 # via -r test.in -wrapt==1.15.0 +wrapt==1.16.0 # via vcrpy yarl==1.9.2 # via vcrpy diff --git a/setup.py b/setup.py index 82eb196d9..c742aecd7 100644 --- a/setup.py +++ b/setup.py @@ -160,6 +160,7 @@ setup( "embed-huggingface": load_requirements("requirements/ingest/embed-huggingface.in"), "openai": load_requirements("requirements/ingest/embed-openai.in"), "bedrock": load_requirements("requirements/ingest/embed-aws-bedrock.in"), + "mongodb": load_requirements("requirements/ingest/mongodb.in"), }, package_dir={"unstructured": "unstructured"}, package_data={"unstructured": ["nlp/*.txt"]}, diff --git a/test_unstructured_ingest/dest/azure-cognitive-search.sh b/test_unstructured_ingest/dest/azure-cognitive-search.sh index 1c5d69416..5a22e1a78 100755 --- a/test_unstructured_ingest/dest/azure-cognitive-search.sh +++ b/test_unstructured_ingest/dest/azure-cognitive-search.sh @@ -11,7 +11,6 @@ WORK_DIR=$OUTPUT_ROOT/workdir/$OUTPUT_FOLDER_NAME OUTPUT_FOLDER_NAME=azure-cog-search-dest max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")} 
-DOWNLOAD_DIR=$SCRIPT_DIR/download/$OUTPUT_FOLDER_NAME DESTINATION_INDEX="utic-test-ingest-fixtures-output-$(uuidgen)" # The vector configs on the schema currently only exist on versions: # 2023-07-01-Preview, 2021-04-30-Preview, 2020-06-30-Preview @@ -42,9 +41,6 @@ function cleanup { # Local file cleanup cleanup_dir "$WORK_DIR" cleanup_dir "$OUTPUT_DIR" - if [ "$CI" == "true" ]; then - cleanup_dir "$DOWNLOAD_DIR" - fi } trap cleanup EXIT diff --git a/test_unstructured_ingest/dest/delta-table.sh b/test_unstructured_ingest/dest/delta-table.sh index 2f9a00fc1..0b222b2a9 100755 --- a/test_unstructured_ingest/dest/delta-table.sh +++ b/test_unstructured_ingest/dest/delta-table.sh @@ -8,7 +8,6 @@ cd "$SCRIPT_DIR"/.. || exit 1 OUTPUT_FOLDER_NAME=delta-table-dest OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME WORK_DIR=$SCRIPT_DIR/workdir/$OUTPUT_FOLDER_NAME -DOWNLOAD_DIR=$SCRIPT_DIR/download/$OUTPUT_FOLDER_NAME DESTINATION_TABLE=$SCRIPT_DIR/delta-table-dest max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")} CI=${CI:-"false"} @@ -20,9 +19,6 @@ function cleanup() { cleanup_dir "$DESTINATION_TABLE" cleanup_dir "$OUTPUT_DIR" cleanup_dir "$WORK_DIR" - if [ "$CI" == "true" ]; then - cleanup_dir "$DOWNLOAD_DIR" - fi } trap cleanup EXIT diff --git a/test_unstructured_ingest/dest/dropbox.sh b/test_unstructured_ingest/dest/dropbox.sh index da856bee4..3db7ca30b 100755 --- a/test_unstructured_ingest/dest/dropbox.sh +++ b/test_unstructured_ingest/dest/dropbox.sh @@ -28,9 +28,6 @@ source "$SCRIPT_DIR"/cleanup.sh function cleanup() { cleanup_dir "$OUTPUT_DIR" cleanup_dir "$WORK_DIR" - if [ "$CI" == "true" ]; then - cleanup_dir "$DOWNLOAD_DIR" - fi echo "deleting test folder $DESTINATION_DROPBOX" curl -X POST https://api.dropboxapi.com/2/files/delete_v2 \ diff --git a/test_unstructured_ingest/dest/gcs.sh b/test_unstructured_ingest/dest/gcs.sh index d298ee08d..f399257c8 100755 --- a/test_unstructured_ingest/dest/gcs.sh +++ 
b/test_unstructured_ingest/dest/gcs.sh @@ -12,7 +12,6 @@ WORK_DIR=$OUTPUT_ROOT/workdir/$OUTPUT_FOLDER_NAME max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")} BUCKET="utic-test-ingest-fixtures-output" DIRECTORY=$(uuidgen) -DIRECTORY="test" DESTINATION_GCS="gs://$BUCKET/$DIRECTORY" CI=${CI:-"false"} @@ -30,9 +29,6 @@ source "$SCRIPT_DIR"/cleanup.sh function cleanup() { cleanup_dir "$OUTPUT_DIR" cleanup_dir "$WORK_DIR" - if [ "$CI" == "true" ]; then - cleanup_dir "$DOWNLOAD_DIR" - fi python "$SCRIPT_DIR"/python/test-gcs-output.py down \ --service-account-file "$GCP_INGEST_SERVICE_KEY_FILE" \ diff --git a/test_unstructured_ingest/dest/mongodb.sh b/test_unstructured_ingest/dest/mongodb.sh new file mode 100755 index 000000000..53496c347 --- /dev/null +++ b/test_unstructured_ingest/dest/mongodb.sh @@ -0,0 +1,69 @@ +#!/usr/bin/env bash + +set -e + +DEST_PATH=$(dirname "$(realpath "$0")") +SCRIPT_DIR=$(dirname "$DEST_PATH") +cd "$SCRIPT_DIR"/.. || exit 1 +OUTPUT_FOLDER_NAME=mongodb-dest +OUTPUT_ROOT=${OUTPUT_ROOT:-$SCRIPT_DIR} +OUTPUT_DIR=$OUTPUT_ROOT/structured-output/$OUTPUT_FOLDER_NAME +WORK_DIR=$OUTPUT_ROOT/workdir/$OUTPUT_FOLDER_NAME +max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")} +DESTINATION_MONGO_COLLECTION="utic-test-ingest-fixtures-output-$(uuidgen)" +CI=${CI:-"false"} + +if [ -z "$MONGODB_URI" ] && [ -z "$MONGODB_DATABASE_NAME" ]; then + echo "Skipping MongoDB destination ingest test because the MONGODB_URI and MONGODB_DATABASE_NAME env var are not set." 
+ exit 0 +fi + + +# shellcheck disable=SC1091 +source "$SCRIPT_DIR"/cleanup.sh +function cleanup() { + cleanup_dir "$OUTPUT_DIR" + cleanup_dir "$WORK_DIR" + + python "$SCRIPT_DIR"/python/test-ingest-mongodb.py \ + --uri "$MONGODB_URI" \ + --database "$MONGODB_DATABASE_NAME" \ + --collection "$DESTINATION_MONGO_COLLECTION" down + +} + +trap cleanup EXIT + +python "$SCRIPT_DIR"/python/test-ingest-mongodb.py \ +--uri "$MONGODB_URI" \ +--database "$MONGODB_DATABASE_NAME" \ +--collection "$DESTINATION_MONGO_COLLECTION" up + +RUN_SCRIPT=${RUN_SCRIPT:-./unstructured/ingest/main.py} +PYTHONPATH=${PYTHONPATH:-.} "$RUN_SCRIPT" \ + local \ + --num-processes "$max_processes" \ + --output-dir "$OUTPUT_DIR" \ + --strategy fast \ + --verbose \ + --reprocess \ + --input-path example-docs/fake-memo.pdf \ + --work-dir "$WORK_DIR" \ + --embedding-provider "langchain-huggingface" \ + mongodb \ + --uri "$MONGODB_URI" \ + --database "$MONGODB_DATABASE_NAME" \ + --collection "$DESTINATION_MONGO_COLLECTION" + +python "$SCRIPT_DIR"/python/test-ingest-mongodb.py \ +--uri "$MONGODB_URI" \ +--database "$MONGODB_DATABASE_NAME" \ +--collection "$DESTINATION_MONGO_COLLECTION" \ +check --expected-records 5 + +python "$SCRIPT_DIR"/python/test-ingest-mongodb.py \ +--uri "$MONGODB_URI" \ +--database "$MONGODB_DATABASE_NAME" \ +--collection "$DESTINATION_MONGO_COLLECTION" \ +check-vector \ +--output-json "$OUTPUT_ROOT"/structured-output/$OUTPUT_FOLDER_NAME/example-docs/fake-memo.pdf.json diff --git a/test_unstructured_ingest/dest/s3.sh b/test_unstructured_ingest/dest/s3.sh index 1185b47b1..fc0bbf14c 100755 --- a/test_unstructured_ingest/dest/s3.sh +++ b/test_unstructured_ingest/dest/s3.sh @@ -18,9 +18,6 @@ source "$SCRIPT_DIR"/cleanup.sh function cleanup() { cleanup_dir "$OUTPUT_DIR" cleanup_dir "$WORK_DIR" - if [ "$CI" == "true" ]; then - cleanup_dir "$DOWNLOAD_DIR" - fi if aws s3 ls "$DESTINATION_S3" --region us-east-2; then echo "deleting destination s3 location: $DESTINATION_S3" diff --git 
a/test_unstructured_ingest/python/test-ingest-mongodb.py b/test_unstructured_ingest/python/test-ingest-mongodb.py new file mode 100755 index 000000000..4948defc8 --- /dev/null +++ b/test_unstructured_ingest/python/test-ingest-mongodb.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python +import json +import time + +import click +from pymongo.mongo_client import MongoClient +from pymongo.operations import SearchIndexModel + + +def get_client(uri: str) -> MongoClient: + client = MongoClient(uri) + client.admin.command("ping") + print("Successfully connected to MongoDB") + return client + + +@click.group(name="mongo-ingest") +@click.option("--uri", type=str, required=True) +@click.option("--database", type=str, required=True) +@click.option("--collection", type=str, required=True) +@click.pass_context +def cli(ctx, uri: str, database: str, collection: str): + # ensure that ctx.obj exists and is a dict (in case `cli()` is called + # by means other than the `if` block below) + ctx.ensure_object(dict) + + ctx.obj["client"] = get_client(uri) + + +@cli.command() +@click.pass_context +def up(ctx): + client = ctx.obj["client"] + collection_name = ctx.parent.params["collection"] + db = client[ctx.parent.params["database"]] + print(f"creating collection {collection_name}") + collection = db.create_collection(name=collection_name) + print(f"successfully created collection: {collection_name}") + if "embeddings" in [c["name"] for c in collection.list_search_indexes()]: + print("search index already exists, skipping creation") + return + + search_index_name = collection.create_search_index( + model=SearchIndexModel( + name="embeddings", + definition={ + "mappings": { + "dynamic": True, + "fields": { + "embeddings": [ + {"type": "knnVector", "dimensions": 384, "similarity": "euclidean"} + ] + }, + } + }, + ) + ) + print(f"Added search index: {search_index_name}") + + +@cli.command() +@click.pass_context +def down(ctx): + collection_name = ctx.parent.params["collection"] + client = 
ctx.obj["client"] + db = client[ctx.parent.params["database"]] + if collection_name not in db.list_collection_names(): + print( + "collection name {} does not exist amongst those in database: {}, " + "skipping deletion".format(collection_name, ", ".join(db.list_collection_names())) + ) + return + print(f"deleting collection: {collection_name}") + collection = db[collection_name] + collection.drop() + print(f"successfully deleted collection: {collection}") + + +@cli.command() +@click.option("--expected-records", type=int, required=True) +@click.pass_context +def check(ctx, expected_records: int): + client = ctx.obj["client"] + db = client[ctx.parent.params["database"]] + collection = db[ctx.parent.params["collection"]] + count = collection.count_documents(filter={}) + print(f"checking the count in the db ({count}) matches what's expected: {expected_records}") + assert ( + count == expected_records + ), f"expected count ({expected_records}) does not match how many records were found: {count}" + print("successfully checked that the expected number of records was found in the db!") + + +@cli.command() +@click.option("--output-json", type=click.File()) +@click.pass_context +def check_vector(ctx, output_json): + """ + Checks the functionality of the vector search index by getting a score based on the + exact result of one of the embeddings. Makes sure that the search index itself has finished + indexing before running a query, then validated that the first item in the returned sorted + list has a score of 1.0 given that the exact embedding is used as a match, and all others + have a score less than 1.0. 
+ """ + # Get the first embedding from the output file + json_content = json.load(output_json) + exact_embedding = json_content[0]["embeddings"] + client = ctx.obj["client"] + db = client[ctx.parent.params["database"]] + collection = db[ctx.parent.params["collection"]] + vector_index_name = "embeddings" + status = [ind for ind in collection.list_search_indexes() if ind["name"] == vector_index_name][ + 0 + ].get("status") + max_attempts = 30 + attempts = 0 + wait_seconds = 5 + while status != "READY" and attempts < max_attempts: + print( + f"status of search index: {status}, waiting another {wait_seconds} " + f"seconds for it to be ready" + ) + attempts += 1 + time.sleep(wait_seconds) + status = [ + ind for ind in collection.list_search_indexes() if ind["name"] == vector_index_name + ][0].get("status") + print(f"search index is ready to go ({status}), checking vector content") + pipeline = [ + { + "$vectorSearch": { + "index": "embeddings", + "path": "embeddings", + "queryVector": exact_embedding, + "numCandidates": 150, + "limit": 10, + }, + }, + {"$project": {"_id": 0, "text": 1, "score": {"$meta": "vectorSearchScore"}}}, + ] + result = list(collection.aggregate(pipeline=pipeline)) + print(f"vector query result: {result}") + assert result[0]["score"] == 1.0, "score detected should be 1: {}".format(result[0]["score"]) + for r in result[1:]: + assert r["score"] < 1.0, "score detected should be less than 1: {}".format(r["score"]) + print("successfully validated vector content!") + + +if __name__ == "__main__": + cli() diff --git a/test_unstructured_ingest/test-ingest-dest.sh b/test_unstructured_ingest/test-ingest-dest.sh index 421702e2b..f64e1f89d 100755 --- a/test_unstructured_ingest/test-ingest-dest.sh +++ b/test_unstructured_ingest/test-ingest-dest.sh @@ -15,6 +15,7 @@ all_tests=( 'delta-table.sh' 'dropbox.sh' 'gcs.sh' + 'mongodb.sh' 's3.sh' ) diff --git a/unstructured/ingest/cli/cmds/__init__.py b/unstructured/ingest/cli/cmds/__init__.py index 
7a75a5666..9b7507943 100644 --- a/unstructured/ingest/cli/cmds/__init__.py +++ b/unstructured/ingest/cli/cmds/__init__.py @@ -28,6 +28,7 @@ from .gitlab import get_base_src_cmd as gitlab_base_src_cmd from .google_drive import get_base_src_cmd as google_drive_base_src_cmd from .jira import get_base_src_cmd as jira_base_src_cmd from .local import get_base_src_cmd as local_base_src_cmd +from .mongodb import get_base_dest_cmd as mongo_base_dest_cmd from .notion import get_base_src_cmd as notion_base_src_cmd from .onedrive import get_base_src_cmd as onedrive_base_src_cmd from .outlook import get_base_src_cmd as outlook_base_src_cmd @@ -89,6 +90,7 @@ base_dest_cmd_fns: t.List[t.Callable[[], "BaseDestCmd"]] = [ s3_base_dest_cmd, azure_cognitive_search_base_dest_cmd, delta_table_dest_cmd, + mongo_base_dest_cmd, ] # Make sure there are not overlapping names diff --git a/unstructured/ingest/cli/cmds/mongodb.py b/unstructured/ingest/cli/cmds/mongodb.py new file mode 100644 index 000000000..35d84a27c --- /dev/null +++ b/unstructured/ingest/cli/cmds/mongodb.py @@ -0,0 +1,60 @@ +import typing as t +from dataclasses import dataclass + +import click + +from unstructured.ingest.cli.interfaces import CliConfig, DelimitedString, Dict +from unstructured.ingest.connector.mongodb import MongoDBWriteConfig, SimpleMongoDBStorageConfig + +CMD_NAME = "mongodb" + + +@dataclass +class MongoDBCliConfig(SimpleMongoDBStorageConfig, CliConfig): + @staticmethod + def get_cli_options() -> t.List[click.Option]: + options = [ + click.Option( + ["--uri"], + help="URI to user when connecting", + ), + click.Option( + ["--host"], + type=DelimitedString(), + help="hostname or IP address or Unix domain socket path of a single mongod or " + "mongos instance to connect to, or a list of hostnames", + ), + click.Option(["--port"], type=int, default=27017), + click.Option( + ["--client-params"], + type=Dict(), + help="additional parameters to use when creating mongo client", + ), + ] + return options + + 
+@dataclass
+class MongoDBCliWriteConfig(MongoDBWriteConfig, CliConfig):
+    @staticmethod
+    def get_cli_options() -> t.List[click.Option]:
+        options = [
+            click.Option(
+                ["--database"], type=str, required=True, help="database name to connect to"
+            ),
+            click.Option(
+                ["--collection"], required=True, type=str, help="collection name to connect to"
+            ),
+        ]
+        return options
+
+
+def get_base_dest_cmd():
+    from unstructured.ingest.cli.base.dest import BaseDestCmd
+
+    cmd_cls = BaseDestCmd(
+        cmd_name=CMD_NAME,
+        cli_config=MongoDBCliConfig,
+        additional_cli_options=[MongoDBCliWriteConfig],
+    )
+    return cmd_cls
diff --git a/unstructured/ingest/connector/mongodb.py b/unstructured/ingest/connector/mongodb.py
new file mode 100644
index 000000000..39892e4e8
--- /dev/null
+++ b/unstructured/ingest/connector/mongodb.py
@@ -0,0 +1,146 @@
+import json
+import typing as t
+from dataclasses import dataclass, field
+
+from unstructured.ingest.error import DestinationConnectionError, WriteError
+from unstructured.ingest.interfaces import (
+    BaseConnectorConfig,
+    BaseDestinationConnector,
+    BaseIngestDoc,
+    WriteConfig,
+)
+from unstructured.ingest.logger import logger
+from unstructured.utils import requires_dependencies
+
+if t.TYPE_CHECKING:
+    from pymongo import MongoClient
+
+
+SERVER_API_VERSION = "1"
+
+
+@dataclass
+class SimpleMongoDBStorageConfig(BaseConnectorConfig):
+    """Connection settings used to build the MongoDB client.
+
+    When `uri` is set it is passed to the client; otherwise `host` and `port` are used.
+    `client_params` is forwarded as extra keyword arguments to `MongoClient`.
+    """
+
+    uri: t.Optional[str] = None
+    host: t.Optional[str] = None
+    port: int = 27017
+    client_params: t.Dict[str, t.Any] = field(default_factory=dict)
+
+
+@dataclass
+class MongoDBWriteConfig(WriteConfig):
+    """Names of the database and collection that elements are inserted into."""
+
+    database: str
+    collection: str
+
+
+@dataclass
+class MongoDBDestinationConnector(BaseDestinationConnector):
+    """Destination connector that inserts json elements into a MongoDB collection."""
+
+    write_config: MongoDBWriteConfig
+    connector_config: SimpleMongoDBStorageConfig
+    _client: t.Optional["MongoClient"] = field(init=False, default=None)
+
+    @requires_dependencies(["pymongo"], extras="mongodb")
+    def generate_client(self) -> "MongoClient":
+        """Create a new `MongoClient` pinned to `SERVER_API_VERSION`.
+
+        The uri takes precedence when set; otherwise the client is built from host and port.
+        """
+        from pymongo import MongoClient
+        from pymongo.server_api import ServerApi
+
+        server_api = ServerApi(version=SERVER_API_VERSION)
+        if self.connector_config.uri:
+            return MongoClient(
+                self.connector_config.uri,
+                server_api=server_api,
+                **self.connector_config.client_params,
+            )
+        return MongoClient(
+            host=self.connector_config.host,
+            port=self.connector_config.port,
+            server_api=server_api,
+            **self.connector_config.client_params,
+        )
+
+    @property
+    def client(self) -> "MongoClient":
+        """The client, created on first access and reused afterwards."""
+        if self._client is None:
+            self._client = self.generate_client()
+        return self._client
+
+    @requires_dependencies(["pymongo"], extras="mongodb")
+    def check_connection(self):
+        """Ping the server to validate the connection.
+
+        Raises:
+            DestinationConnectionError: if the ping fails for any reason.
+        """
+        try:
+            self.client.admin.command("ping")
+        except Exception as e:
+            logger.error(f"failed to validate connection: {e}", exc_info=True)
+            raise DestinationConnectionError(f"failed to validate connection: {e}") from e
+
+    def initialize(self):
+        """Create the client up front so that later calls reuse it."""
+        _ = self.client
+
+    def conform_dict(self, data: dict) -> None:
+        """Hook for adjusting an element before it is written; currently a no-op."""
+
+    def get_collection(self):
+        """Return the collection named by the write config."""
+        database = self.client[self.write_config.database]
+        return database.get_collection(name=self.write_config.collection)
+
+    @requires_dependencies(["pymongo"], extras="mongodb")
+    def write_dict(self, *args, elements_dict: t.List[t.Dict[str, t.Any]], **kwargs) -> None:
+        """Insert the given elements into the configured collection.
+
+        Raises:
+            WriteError: if the insert fails for any reason.
+        """
+        if not elements_dict:
+            # insert_many rejects an empty list, which would otherwise surface as a
+            # WriteError even though there is simply nothing to write.
+            logger.info("no documents to write, skipping insert")
+            return
+
+        logger.info(
+            f"writing {len(elements_dict)} documents to destination "
+            f"database {self.write_config.database}, at collection {self.write_config.collection}",
+        )
+
+        collection = self.get_collection()
+        try:
+            collection.insert_many(elements_dict)
+        except Exception as e:
+            logger.error(f"failed to write records: {e}", exc_info=True)
+            raise WriteError(f"failed to write records: {e}") from e
+
+    def write(self, docs: t.List[BaseIngestDoc]) -> None:
+        """Load the json output of every doc and insert all elements with one call."""
+        json_list: t.List[t.Dict[str, t.Any]] = []
+        for doc in docs:
+            local_path = doc._output_filename
+            # json is utf-8 by spec, so don't depend on the platform's default encoding
+            with open(local_path, encoding="utf-8") as json_file:
+                json_content = json.load(json_file)
+            for content in json_content:
+                self.conform_dict(data=content)
+            logger.info(
+                f"appending {len(json_content)} json elements from content in {local_path}",
+            )
+            json_list.extend(json_content)
+        self.write_dict(elements_dict=json_list)
diff --git a/unstructured/ingest/runner/writers/__init__.py b/unstructured/ingest/runner/writers/__init__.py
index 701d77dbe..e2a8d9de1 100644
--- a/unstructured/ingest/runner/writers/__init__.py
+++ b/unstructured/ingest/runner/writers/__init__.py
@@ -6,6 +6,7 @@ from .box import box_writer
 from .delta_table import delta_table_writer
 from .dropbox import dropbox_writer
 from .gcs import gcs_writer
+from .mongodb import mongodb_writer
 from .s3 import s3_writer
 
 writer_map: t.Dict[str, t.Callable] = {
@@ -15,6 +16,7 @@ writer_map: t.Dict[str, t.Callable] = {
     "delta_table": delta_table_writer,
     "dropbox": dropbox_writer,
     "gcs": gcs_writer,
+    "mongodb": mongodb_writer,
     "s3": s3_writer,
 }
 
diff --git a/unstructured/ingest/runner/writers/mongodb.py b/unstructured/ingest/runner/writers/mongodb.py
new file mode 100644
index 000000000..2312c4d4c
--- /dev/null
+++ b/unstructured/ingest/runner/writers/mongodb.py
@@ -0,0 +1,32 @@
+import typing as t
+
+from unstructured.ingest.interfaces import BaseDestinationConnector
+
+
+def mongodb_writer(
+    database: str,
+    collection: str,
+    upsert: bool = False,
+    uri: t.Optional[str] = None,
+    host: t.Optional[str] = None,
+    port: int = 27017,
+    client_params: t.Optional[t.Dict[str, t.Any]] = None,
+    verbose: bool = False,
+    **kwargs,
+) -> BaseDestinationConnector:
+    """Build the MongoDB destination connector from the given options.
+
+    `upsert`, `verbose` and any extra keyword arguments are accepted but not used here.
+    """
+    from unstructured.ingest.connector.mongodb import (
+        MongoDBDestinationConnector,
+        MongoDBWriteConfig,
+        SimpleMongoDBStorageConfig,
+    )
+
+    return MongoDBDestinationConnector(
+        write_config=MongoDBWriteConfig(database=database, collection=collection),
+        connector_config=SimpleMongoDBStorageConfig(
+            uri=uri, host=host, port=port, client_params=client_params or {}
+        ),
+    )