diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5835d781d..f354707b7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -287,6 +287,7 @@ jobs: AZURE_SEARCH_ENDPOINT: ${{ secrets.AZURE_SEARCH_ENDPOINT }} AZURE_SEARCH_API_KEY: ${{ secrets.AZURE_SEARCH_API_KEY }} OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + PINECONE_API_KEY: ${{ secrets.PINECONE_API_KEY }} TABLE_OCR: "tesseract" OCR_AGENT: "tesseract" CI: "true" @@ -348,6 +349,7 @@ jobs: MONGODB_URI: ${{ secrets.MONGODB_URI }} MONGODB_DATABASE_NAME: ${{ secrets.MONGODB_DATABASE_NAME }} AZURE_DEST_CONNECTION_STR: ${{ secrets.AZURE_DEST_CONNECTION_STR }} + PINECONE_API_KEY: ${{ secrets.PINECONE_API_KEY }} TABLE_OCR: "tesseract" OCR_AGENT: "tesseract" CI: "true" diff --git a/CHANGELOG.md b/CHANGELOG.md index 89552af62..a8fe9f233 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,15 @@ +## 0.11.2-dev0 + +### Enhancements + +### Features + +* **Add Pinecone destination connector.** Problem: After ingesting data from a source, users might want to produce embeddings for their data and write these into a vector DB. Pinecone is one such vector database. Feature: Added a Pinecone destination connector so documents from any supported source can be ingested, embedded, and written, along with their embeddings, into Pinecone. + +### Fixes + +* **Process chunking parameter names in ingest correctly** Fixes a bug where chunking parameters were not being processed and used by the ingest CLI, by correcting the faulty parameter names and prefixes; also adds the relevant parameters to the ingest Pinecone test to verify that they are functional. + ## 0.11.1 ### Enhancements diff --git a/Makefile b/Makefile index d9135f36b..16a07faf0 100644 --- a/Makefile +++ b/Makefile @@ -211,6 +211,10 @@ install-ingest-jira: install-ingest-hubspot: python3 -m pip install -r requirements/ingest-hubspot.txt +.PHONY: install-ingest-pinecone +install-ingest-pinecone: + python3 -m pip install -r requirements/ingest-pinecone.txt + .PHONY: install-embed-huggingface install-embed-huggingface: python3 -m pip install -r requirements/ingest/embed-huggingface.txt diff --git a/docs/source/core/chunking.rst b/docs/source/core/chunking.rst index d6b31a0f1..cd393b589 100644 --- a/docs/source/core/chunking.rst +++ b/docs/source/core/chunking.rst @@ -26,11 +26,11 @@ that span between pages. This kwarg is ``True`` by default. not split elements, it is possible for a section to exceed that lenght, for example if a ``NarrativeText`` elements exceeds ``1500`` characters on its on. -Similarly, sections under ``combine_under_n_chars`` will be combined if they +Similarly, sections under ``combine_text_under_n_chars`` will be combined if they do not exceed the specified threshold, which defaults to ``500``. This will combine a series of ``Title`` elements that occur one after another, which sometimes happens in lists that are not detected as ``ListItem`` elements. Set -``combine_under_n_chars=0`` to turn off this behavior. +``combine_text_under_n_chars=0`` to turn off this behavior. The following shows an example of how to use ``chunk_by_title``. You will see the document chunked into sections instead of elements.
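For completeness, here is a minimal sketch of ``chunk_by_title`` with the renamed kwarg (assuming the standard ``unstructured`` partition and chunking imports and the repo's ``example-docs/fake-text.txt`` sample; defaults may differ by version):

.. code:: python

    from unstructured.chunking.title import chunk_by_title
    from unstructured.partition.text import partition_text

    # Partition a document into elements, then chunk the elements by title.
    # combine_text_under_n_chars merges small consecutive sections (e.g. runs of
    # Title elements); pass combine_text_under_n_chars=0 to turn that behavior off.
    elements = partition_text(filename="example-docs/fake-text.txt")
    chunks = chunk_by_title(
        elements,
        multipage_sections=True,
        combine_text_under_n_chars=500,
        new_after_n_chars=1500,
        max_characters=1500,
    )
    for chunk in chunks:
        print(chunk)
        print("\n" + "-" * 80)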
diff --git a/docs/source/ingest/configs/chunking_config.rst b/docs/source/ingest/configs/chunking_config.rst index bba180bf3..efc6b57a9 100644 --- a/docs/source/ingest/configs/chunking_config.rst +++ b/docs/source/ingest/configs/chunking_config.rst @@ -16,4 +16,5 @@ Configs * ``chunk_elements (default False)``: Boolean flag whether to run chunking as part of the ingest process. * ``multipage_sections (default True)``: If True, sections can span multiple pages. * ``combine_text_under_n_chars (default 500)``: Combines elements (for example a series of titles) until a section reaches a length of n characters. Defaults to `max_characters` which combines chunks whenever space allows. Specifying 0 for this argument suppresses combining of small chunks. Note this value is "capped" at the `new_after_n_chars` value since a value higher than that would not change this parameter's effect. +* ``new_after_n_chars (default 1500)``: Cuts off new sections once they reach a length of n characters (soft max). Defaults to `max_characters` when not specified, which effectively disables any soft window. Specifying 0 for this argument causes each element to appear in a chunk by itself (although an element with text longer than `max_characters` will still be split into two or more chunks). * ``max_characters (default 1500)``: Chunks elements text and text_as_html (if present) into chunks of length n characters (hard max) diff --git a/docs/source/ingest/configs/embedding_config.rst b/docs/source/ingest/configs/embedding_config.rst index 14db889bc..843219916 100644 --- a/docs/source/ingest/configs/embedding_config.rst +++ b/docs/source/ingest/configs/embedding_config.rst @@ -10,5 +10,6 @@ the dataset. Configs --------------------- -* ``api_key``: If an api key is required to generate the embeddings via an api (i.e. OpenAI) -* ``model_name``: The model to use for the embedder. +* ``embedding_provider``: The unstructured embedding provider to use when generating embeddings. A few examples: langchain-openai, langchain-huggingface, langchain-aws-bedrock. +* ``embedding_api_key``: If an API key is required to generate the embeddings via an API (e.g. OpenAI) +* ``embedding_model_name``: The model to use for the embedder, if necessary. diff --git a/docs/source/ingest/destination_connectors.rst b/docs/source/ingest/destination_connectors.rst index d9055d207..9f9d44ede 100644 --- a/docs/source/ingest/destination_connectors.rst +++ b/docs/source/ingest/destination_connectors.rst @@ -11,3 +11,5 @@ in our community `Slack. `_ destination_connectors/azure_cognitive_search destination_connectors/delta_table destination_connectors/mongodb + destination_connectors/pinecone + destination_connectors/s3 diff --git a/docs/source/ingest/destination_connectors/pinecone.rst b/docs/source/ingest/destination_connectors/pinecone.rst new file mode 100644 index 000000000..ac8888d5e --- /dev/null +++ b/docs/source/ingest/destination_connectors/pinecone.rst @@ -0,0 +1,79 @@ +Pinecone +=========== + +Batch process all your records using ``unstructured-ingest`` to store structured outputs and embeddings locally on your filesystem and upload those to a Pinecone index. + +First you'll need to install the Pinecone dependencies as shown here. + +.. code:: shell + + pip install "unstructured[pinecone]" + +Run Locally +----------- +The upstream connector can be any of the ones supported, but for convenience here we show a sample command using the +upstream local connector. This will create new files on your local filesystem. + +.. tabs:: + + .. tab:: Shell + + ..
code:: shell + + unstructured-ingest \ + local \ + --input-path example-docs/book-war-and-peace-1225p.txt \ + --output-dir local-to-pinecone \ + --strategy fast \ + --chunk-elements \ + --embedding-provider "<an unstructured embedding provider, e.g. langchain-huggingface>" \ + --num-processes 2 \ + --verbose \ + --work-dir "<directory for intermediate outputs>" \ + pinecone \ + --api-key "<your Pinecone API key>" \ + --index-name "<your Pinecone index name>" \ + --environment "<your Pinecone environment, e.g. us-east1-gcp>" \ + --batch-size "<batch size, e.g. 80>" \ + --num-processes "<number of write processes, e.g. 2>" + + .. tab:: Python + + .. code:: python + + import os + + from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig, ChunkingConfig, EmbeddingConfig + from unstructured.ingest.runner import LocalRunner + if __name__ == "__main__": + runner = LocalRunner( + processor_config=ProcessorConfig( + verbose=True, + output_dir="local-output-to-pinecone", + num_processes=2, + ), + read_config=ReadConfig(), + partition_config=PartitionConfig(), + chunking_config=ChunkingConfig( + chunk_elements=True + ), + embedding_config=EmbeddingConfig( + provider="langchain-huggingface", + ), + writer_type="pinecone", + writer_kwargs={ + "api_key": os.getenv("PINECONE_API_KEY"), + "index_name": os.getenv("PINECONE_INDEX_NAME"), + "environment": os.getenv("PINECONE_ENVIRONMENT_NAME"), + "batch_size": 80, + "num_processes": 2, + } + ) + runner.run( + input_path="example-docs/fake-memo.pdf", + ) + + +For a full list of the options the CLI accepts, check ``unstructured-ingest pinecone --help``. + +NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide `_. diff --git a/docs/source/ingest/destination_connectors/s3.rst b/docs/source/ingest/destination_connectors/s3.rst new file mode 100644 index 000000000..d6593895e --- /dev/null +++ b/docs/source/ingest/destination_connectors/s3.rst @@ -0,0 +1,73 @@ +S3 +=========== + +Batch process all your records using ``unstructured-ingest`` to store structured outputs locally on your filesystem and upload those local files to an S3 bucket. + +First you'll need to install the S3 dependencies as shown here. + +.. code:: shell + + pip install "unstructured[s3]" + +Run Locally +----------- +The upstream connector can be any of the ones supported, but for convenience here we show a sample command using the +upstream local connector. This will create new files on your local filesystem. + +.. tabs:: + + .. tab:: Shell + + .. code:: shell + + unstructured-ingest \ + local \ + --input-path example-docs/book-war-and-peace-1225p.txt \ + --output-dir local-to-s3 \ + --strategy fast \ + --chunk-elements \ + --embedding-provider "<an unstructured embedding provider, e.g. langchain-huggingface>" \ + --num-processes 2 \ + --verbose \ + --work-dir "<directory for intermediate outputs>" \ + s3 \ + --anonymous \ + --remote-url "<your destination path, e.g. s3://my-bucket/output/>" + + .. tab:: Python + + ..
code:: python + + import os + + from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig, ChunkingConfig, EmbeddingConfig + from unstructured.ingest.runner import LocalRunner + if __name__ == "__main__": + runner = LocalRunner( + processor_config=ProcessorConfig( + verbose=True, + output_dir="local-output-to-s3", + num_processes=2, + ), + read_config=ReadConfig(), + partition_config=PartitionConfig(), + chunking_config=ChunkingConfig( + chunk_elements=True + ), + embedding_config=EmbeddingConfig( + provider="langchain-huggingface", + ), + writer_type="s3", + writer_kwargs={ + "anonymous": True, + "remote_url": "<your destination path, e.g. s3://my-bucket/output/>", + } + ) + runner.run( + input_path="example-docs/book-war-and-peace-1225p.txt", + ) + + +For a full list of the options the CLI accepts, check ``unstructured-ingest s3 --help``. + +NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide `_. diff --git a/examples/ingest/pinecone/ingest.sh b/examples/ingest/pinecone/ingest.sh new file mode 100755 index 000000000..aabce6a1f --- /dev/null +++ b/examples/ingest/pinecone/ingest.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash + +# Processes the example-docs/book-war-and-peace-1225p.txt document, +# embeds the processed document, and writes the results to a Pinecone index. + +# Structured outputs are stored in local-to-pinecone/ + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +cd "$SCRIPT_DIR"/../../.. || exit 1 + + +# As an example we're using the local source connector, +# however ingesting from any supported source connector is possible. +# shellcheck disable=2094 +PYTHONPATH=. 
./unstructured/ingest/main.py \ + local \ + --input-path example-docs/book-war-and-peace-1225p.txt \ + --output-dir local-to-pinecone \ + --strategy fast \ + --chunk-elements \ + --embedding-provider "<an unstructured embedding provider, e.g. langchain-huggingface>" \ + --num-processes 2 \ + --verbose \ + --work-dir "<directory for intermediate outputs>" \ + pinecone \ + --api-key "<your Pinecone API key>" \ + --index-name "<your Pinecone index name>" \ + --environment "<your Pinecone environment, e.g. us-east1-gcp>" \ + --batch-size "<batch size, e.g. 80>" \ + --num-processes "<number of write processes, e.g. 2>" diff --git a/requirements/ingest/pinecone.in b/requirements/ingest/pinecone.in new file mode 100644 index 000000000..939f61e6d --- /dev/null +++ b/requirements/ingest/pinecone.in @@ -0,0 +1,3 @@ +-c constraints.in +-c base.txt +pinecone-client diff --git a/requirements/ingest/pinecone.txt b/requirements/ingest/pinecone.txt new file mode 100644 index 000000000..19c30cdef --- /dev/null +++ b/requirements/ingest/pinecone.txt @@ -0,0 +1,56 @@ +# +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: +# +# pip-compile requirements/ingest-pinecone.in +# +certifi==2023.7.22 + # via + # -c requirements/base.txt + # -c requirements/constraints.in + # requests +charset-normalizer==3.3.0 + # via + # -c requirements/base.txt + # requests +dnspython==2.4.2 + # via pinecone-client +idna==3.4 + # via + # -c requirements/base.txt + # requests +loguru==0.7.2 + # via pinecone-client +numpy==1.24.4 + # via + # -c requirements/base.txt + # -c requirements/constraints.in + # pinecone-client +pinecone-client==2.2.4 + # via -r requirements/ingest-pinecone.in +python-dateutil==2.8.2 + # via pinecone-client +pyyaml==6.0.1 + # via pinecone-client +requests==2.31.0 + # via + # -c requirements/base.txt + # pinecone-client +six==1.16.0 + # via + # -c requirements/base.txt + # python-dateutil +tqdm==4.66.1 + # via + # -c requirements/base.txt + # pinecone-client +typing-extensions==4.8.0 + # via + # -c requirements/base.txt + # pinecone-client +urllib3==1.26.18 + # via + # -c requirements/base.txt + # -c requirements/constraints.in + # pinecone-client + # requests diff --git a/setup.py b/setup.py index 3799f30f1..ea92216dc 100644 --- a/setup.py +++ b/setup.py @@ -127,33 +127,34 @@ setup( "tsv": tsv_reqs, "xlsx": xlsx_reqs, # Extra requirements for data connectors - "s3": load_requirements("requirements/ingest/s3.in"), + "airtable": load_requirements("requirements/ingest/airtable.in"), "azure": load_requirements("requirements/ingest/azure.in"), "azure-cognitive-search": load_requirements( "requirements/ingest/azure-cognitive-search.in", ), "biomed": load_requirements("requirements/ingest/biomed.in"), + "box": load_requirements("requirements/ingest/box.in"), + "confluence": load_requirements("requirements/ingest/confluence.in"), + "delta-table": load_requirements("requirements/ingest/delta-table.in"), "discord": load_requirements("requirements/ingest/discord.in"), + "dropbox": load_requirements("requirements/ingest/dropbox.in"), + "elasticsearch": load_requirements("requirements/ingest/elasticsearch.in"), + "gcs": load_requirements("requirements/ingest/gcs.in"), "github": load_requirements("requirements/ingest/github.in"), "gitlab": load_requirements("requirements/ingest/gitlab.in"), - "reddit": load_requirements("requirements/ingest/reddit.in"), - "notion": load_requirements("requirements/ingest/notion.in"), - "slack": load_requirements("requirements/ingest/slack.in"), - "wikipedia": load_requirements("requirements/ingest/wikipedia.in"), "google-drive": load_requirements("requirements/ingest/google-drive.in"), - "gcs": load_requirements("requirements/ingest/gcs.in"), - "elasticsearch": 
load_requirements("requirements/ingest/elasticsearch.in"), - "dropbox": load_requirements("requirements/ingest/dropbox.in"), - "box": load_requirements("requirements/ingest/box.in"), + "hubspot": load_requirements("requirements/ingest/hubspot.in"), + "jira": load_requirements("requirements/ingest/jira.in"), + "notion": load_requirements("requirements/ingest/notion.in"), "onedrive": load_requirements("requirements/ingest/onedrive.in"), "outlook": load_requirements("requirements/ingest/outlook.in"), - "confluence": load_requirements("requirements/ingest/confluence.in"), - "airtable": load_requirements("requirements/ingest/airtable.in"), + "pinecone": load_requirements("requirements/ingest/pinecone.in"), + "reddit": load_requirements("requirements/ingest/reddit.in"), + "s3": load_requirements("requirements/ingest/s3.in"), "sharepoint": load_requirements("requirements/ingest/sharepoint.in"), - "delta-table": load_requirements("requirements/ingest/delta-table.in"), "salesforce": load_requirements("requirements/ingest/salesforce.in"), - "jira": load_requirements("requirements/ingest/jira.in"), - "hubspot": load_requirements("requirements/ingest/hubspot.in"), + "slack": load_requirements("requirements/ingest/slack.in"), + "wikipedia": load_requirements("requirements/ingest/wikipedia.in"), # Legacy extra requirements "huggingface": load_requirements("requirements/huggingface.in"), "local-inference": all_doc_reqs, diff --git a/test_unstructured_ingest/dest/pinecone.sh b/test_unstructured_ingest/dest/pinecone.sh new file mode 100755 index 000000000..d5e45c90b --- /dev/null +++ b/test_unstructured_ingest/dest/pinecone.sh @@ -0,0 +1,133 @@ +#!/usr/bin/env bash + +set -e + +DEST_PATH=$(dirname "$(realpath "$0")") +SCRIPT_DIR=$(dirname "$DEST_PATH") +cd "$SCRIPT_DIR"/.. || exit 1 +OUTPUT_FOLDER_NAME=s3-pinecone-dest +OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME +WORK_DIR=$SCRIPT_DIR/workdir/$OUTPUT_FOLDER_NAME +max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")} +writer_processes=$(( (max_processes - 1) > 1 ? (max_processes - 1) : 2 )) + +if [ -z "$PINECONE_API_KEY" ]; then + echo "Skipping Pinecone ingest test because PINECONE_API_KEY env var is not set." + exit 0 +fi + +RANDOM_SUFFIX=$((RANDOM % 100000 + 1)) + +# Set the variables with default values if they're not set in the environment +PINECONE_INDEX=${PINECONE_INDEX:-"ingest-test-$RANDOM_SUFFIX"} +PINECONE_ENVIRONMENT=${PINECONE_ENVIRONMENT:-"us-east1-gcp"} +PINECONE_PROJECT_ID=${PINECONE_PROJECT_ID:-"art8iaj"} + +# shellcheck disable=SC1091 +source "$SCRIPT_DIR"/cleanup.sh +function cleanup { + + # Get response code to check if index exists + response_code=$(curl \ + -s -o /dev/null \ + -w "%{http_code}" \ + --request GET \ + --url "https://controller.$PINECONE_ENVIRONMENT.pinecone.io/databases/$PINECONE_INDEX" \ + --header 'accept: application/json' \ + --header "Api-Key: $PINECONE_API_KEY") + + # Cleanup (delete) index if it exists + if [ "$response_code" == "200" ]; then + echo "" + echo "deleting index $PINECONE_INDEX" + curl --request DELETE \ + "https://controller.$PINECONE_ENVIRONMENT.pinecone.io/databases/$PINECONE_INDEX" \ + --header "Api-Key: $PINECONE_API_KEY" \ + --header 'content-type: application/json' + + else + echo "There was an error during index deletion for index $PINECONE_INDEX, with response code: $response_code. It might be that index $PINECONE_INDEX does not exist, so there is nothing to delete." 
+ fi + + # Local file cleanup + cleanup_dir "$WORK_DIR" + cleanup_dir "$OUTPUT_DIR" +} + +trap cleanup EXIT + +echo "Creating index $PINECONE_INDEX" +response_code=$(curl \ + -s -o /dev/null \ + -w "%{http_code}" \ + --request POST \ + --url "https://controller.$PINECONE_ENVIRONMENT.pinecone.io/databases" \ + --header "accept: text/plain" \ + --header "content-type: application/json" \ + --header "Api-Key: $PINECONE_API_KEY" \ + --data ' +{ + "name": "'"$PINECONE_INDEX"'", + "dimension": 384, + "metric": "cosine", + "pods": 1, + "replicas": 1, + "pod_type": "p1.x1" +} +') + +if [ "$response_code" -lt 400 ]; then + echo "Index creation success: $response_code" +else + echo "Index creation failure: $response_code" + exit 1 +fi + +PYTHONPATH=. ./unstructured/ingest/main.py \ + local \ + --num-processes "$max_processes" \ + --output-dir "$OUTPUT_DIR" \ + --strategy fast \ + --verbose \ + --reprocess \ + --input-path example-docs/book-war-and-peace-1225p.txt \ + --work-dir "$WORK_DIR" \ + --chunk-elements \ + --chunk-combine-text-under-n-chars 200 \ + --chunk-new-after-n-chars 2500 \ + --chunk-max-characters 38000 \ + --chunk-multipage-sections \ + --embedding-provider "langchain-huggingface" \ + pinecone \ + --api-key "$PINECONE_API_KEY" \ + --index-name "$PINECONE_INDEX" \ + --environment "$PINECONE_ENVIRONMENT" \ + --batch-size 80 \ + --num-processes "$writer_processes" + +# It can take some time for the index to catch up with the content that was written; this check sleeps between attempts +# to give it time to process the writes, and gives up after a few attempts. +num_of_vectors_remote=0 +attempt=1 +sleep_amount=8 +while [ "$num_of_vectors_remote" -eq 0 ] && [ "$attempt" -lt 4 ]; do + echo "attempt $attempt: sleeping $sleep_amount seconds to let index finish catching up after writes" + sleep $sleep_amount + + num_of_vectors_remote=$(curl --request POST \ + -s \ + --url "https://$PINECONE_INDEX-$PINECONE_PROJECT_ID.svc.$PINECONE_ENVIRONMENT.pinecone.io/describe_index_stats" \ + --header "accept: application/json" \ + --header "content-type: application/json" \ + --header "Api-Key: $PINECONE_API_KEY" | jq -r '.totalVectorCount') + + echo "vector count in Pinecone: $num_of_vectors_remote" + attempt=$((attempt+1)) +done + +EXPECTED=1404 + +if [ "$num_of_vectors_remote" -ne $EXPECTED ]; then + echo "Number of vectors in Pinecone is $num_of_vectors_remote, but the expected number is $EXPECTED. Test failed." 
+ exit 1 +fi diff --git a/test_unstructured_ingest/test-ingest-dest.sh b/test_unstructured_ingest/test-ingest-dest.sh index c2e9cdb6b..43815c6ba 100755 --- a/test_unstructured_ingest/test-ingest-dest.sh +++ b/test_unstructured_ingest/test-ingest-dest.sh @@ -22,6 +22,7 @@ all_tests=( 'dropbox.sh' 'gcs.sh' 'mongodb.sh' + 'pinecone.sh' 's3.sh' 'sharepoint-embed-cog-index.sh' ) diff --git a/unstructured/__version__.py b/unstructured/__version__.py index 3541f8bc1..8bb035ab7 100644 --- a/unstructured/__version__.py +++ b/unstructured/__version__.py @@ -1 +1 @@ -__version__ = "0.11.1" # pragma: no cover +__version__ = "0.11.2-dev0" # pragma: no cover diff --git a/unstructured/ingest/cli/cmds/__init__.py b/unstructured/ingest/cli/cmds/__init__.py index ecdc80ab6..eb18dbfb3 100644 --- a/unstructured/ingest/cli/cmds/__init__.py +++ b/unstructured/ingest/cli/cmds/__init__.py @@ -33,6 +33,7 @@ from .mongodb import get_base_dest_cmd as mongo_base_dest_cmd from .notion import get_base_src_cmd as notion_base_src_cmd from .onedrive import get_base_src_cmd as onedrive_base_src_cmd from .outlook import get_base_src_cmd as outlook_base_src_cmd +from .pinecone import get_base_dest_cmd as pinecone_base_dest_cmd from .reddit import get_base_src_cmd as reddit_base_src_cmd from .s3 import get_base_dest_cmd as s3_base_dest_cmd from .s3 import get_base_src_cmd as s3_base_src_cmd @@ -93,6 +94,7 @@ base_dest_cmd_fns: t.List[t.Callable[[], "BaseDestCmd"]] = [ azure_cognitive_search_base_dest_cmd, delta_table_dest_cmd, mongo_base_dest_cmd, + pinecone_base_dest_cmd, ] # Make sure there are not overlapping names diff --git a/unstructured/ingest/cli/cmds/pinecone.py b/unstructured/ingest/cli/cmds/pinecone.py new file mode 100644 index 000000000..d6ba6a7b2 --- /dev/null +++ b/unstructured/ingest/cli/cmds/pinecone.py @@ -0,0 +1,66 @@ +import typing as t +from dataclasses import dataclass + +import click + +from unstructured.ingest.cli.interfaces import ( + CliConfig, +) +from unstructured.ingest.connector.pinecone import SimplePineconeConfig + + +@dataclass +class PineconeCliWriteConfig(SimplePineconeConfig, CliConfig): + api_key: str + index_name: str + environment: str + batch_size: int + num_processes: int + + @staticmethod + def get_cli_options() -> t.List[click.Option]: + options = [ + click.Option( + ["--api-key"], + required=True, + type=str, + help="API key used for authenticating to a Pinecone instance.", + envvar="PINECONE_API_KEY", + show_envvar=True, + ), + click.Option( + ["--index-name"], + required=True, + type=str, + help="The name of the pinecone index to connect to.", + ), + click.Option( + ["--environment"], + required=True, + type=str, + help="The environment where the index lives. Eg. 
'gcp-starter' or 'us-east1-gcp'", + ), + click.Option( + ["--batch-size"], + default=50, + type=int, + help="Number of records per batch", + ), + click.Option( + ["--num-processes"], + default=2, + type=int, + help="Number of parallel processes with which to upload elements", + ), + ] + return options + + +def get_base_dest_cmd(): + from unstructured.ingest.cli.base.dest import BaseDestCmd + + cmd_cls = BaseDestCmd( + cmd_name="pinecone", + cli_config=PineconeCliWriteConfig, + ) + return cmd_cls diff --git a/unstructured/ingest/cli/interfaces.py b/unstructured/ingest/cli/interfaces.py index 2accfeed7..c883a8c07 100644 --- a/unstructured/ingest/cli/interfaces.py +++ b/unstructured/ingest/cli/interfaces.py @@ -468,7 +468,7 @@ class CliChunkingConfig(ChunkingConfig, CliMixin): default=False, ), click.Option( - ["--chunk-combine-under-n-chars"], + ["--chunk-combine-text-under-n-chars"], type=int, default=500, show_default=True, @@ -479,6 +479,12 @@ class CliChunkingConfig(ChunkingConfig, CliMixin): default=1500, show_default=True, ), + click.Option( + ["--chunk-max-characters"], + type=int, + default=1500, + show_default=True, + ), ] return options @@ -503,9 +509,9 @@ class CliChunkingConfig(ChunkingConfig, CliMixin): new_kvs["chunk_elements"] = chunk_elements new_kvs.update( { - k[len("chunking_") :]: v # noqa: E203 + k[len("chunk_") :]: v # noqa: E203 for k, v in kvs.items() - if k.startswith("chunking_") + if k.startswith("chunk_") }, ) if len(new_kvs.keys()) == 0: diff --git a/unstructured/ingest/connector/pinecone.py b/unstructured/ingest/connector/pinecone.py new file mode 100644 index 000000000..30a49efb5 --- /dev/null +++ b/unstructured/ingest/connector/pinecone.py @@ -0,0 +1,145 @@ +import json +import multiprocessing as mp +import typing as t +import uuid +from dataclasses import dataclass + +from unstructured.ingest.error import DestinationConnectionError, WriteError +from unstructured.ingest.interfaces import ( + BaseConnectorConfig, + BaseDestinationConnector, + BaseIngestDoc, + BaseSessionHandle, + ConfigSessionHandleMixin, + IngestDocSessionHandleMixin, + WriteConfig, +) +from unstructured.ingest.logger import logger +from unstructured.staging.base import flatten_dict +from unstructured.utils import requires_dependencies + +if t.TYPE_CHECKING: + from pinecone import Index as PineconeIndex + + +@dataclass +class PineconeSessionHandle(BaseSessionHandle): + service: "PineconeIndex" + + +@DestinationConnectionError.wrap +@requires_dependencies(["pinecone"], extras="pinecone") +def create_pinecone_object(api_key, index_name, environment): + import pinecone + + pinecone.init(api_key=api_key, environment=environment) + index = pinecone.Index(index_name) + logger.debug(f"Connected to index: {pinecone.describe_index(index_name)}") + return index + + +@dataclass +class SimplePineconeConfig(ConfigSessionHandleMixin, BaseConnectorConfig): + api_key: str + index_name: str + environment: str + + def create_session_handle(self) -> PineconeSessionHandle: + service = create_pinecone_object(self.api_key, self.index_name, self.environment) + return PineconeSessionHandle(service=service) + + +@dataclass +class PineconeWriteConfig(IngestDocSessionHandleMixin, WriteConfig): + connector_config: SimplePineconeConfig + batch_size: int = 50 + num_processes: int = 1 + + +@dataclass +class PineconeDestinationConnector(BaseDestinationConnector): + write_config: PineconeWriteConfig + connector_config: SimplePineconeConfig + + def initialize(self): + pass + + @DestinationConnectionError.wrap + def 
check_connection(self): + create_pinecone_object( + self.connector_config.api_key, + self.connector_config.index_name, + self.connector_config.environment, + ) + + @DestinationConnectionError.wrap + @requires_dependencies(["pinecone"], extras="pinecone") + def upsert_batch(self, batch): + import pinecone.core.client.exceptions + + self.write_config.global_session() + + index = self.write_config.session_handle.service + try: + response = index.upsert(batch) + except pinecone.core.client.exceptions.ApiException as api_error: + raise WriteError(f"http error: {api_error}") from api_error + logger.debug(f"results: {response}") + + def write_dict(self, *args, dict_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None: + logger.info( + f"Upserting {len(dict_list)} elements to destination " + f"index at {self.connector_config.index_name}", + ) + + pinecone_batch_size = self.write_config.batch_size + + logger.info(f"using {self.write_config.num_processes} processes to upload") + if self.write_config.num_processes == 1: + for i in range(0, len(dict_list), pinecone_batch_size): + self.upsert_batch(dict_list[i : i + pinecone_batch_size]) # noqa: E203 + + else: + with mp.Pool( + processes=self.write_config.num_processes, + ) as pool: + pool.map( + self.upsert_batch, + [ + dict_list[i : i + pinecone_batch_size] # noqa: E203 + for i in range(0, len(dict_list), pinecone_batch_size) + ], # noqa: E203 + ) + + def write(self, docs: t.List[BaseIngestDoc]) -> None: + dict_list: t.List[t.Dict[str, t.Any]] = [] + for doc in docs: + local_path = doc._output_filename + with open(local_path) as json_file: + dict_content = json.load(json_file) + + # we assign embeddings to "values", and other fields to "metadata" + dict_content = [ + # While flatten_dict enables indexing on various fields, + # element_serialized enables easily reloading the element object to memory. + # element_serialized is formed without text/embeddings to avoid data bloating. 
+ { + "id": str(uuid.uuid4()), + "values": element.pop("embeddings", None), + "metadata": { + "text": element.pop("text", None), + "element_serialized": json.dumps(element), + **flatten_dict( + element, + separator="-", + flatten_lists=True, + ), + }, + } + for element in dict_content + ] + logger.info( + f"appending {len(dict_content)} json elements from content in {local_path}", + ) + dict_list.extend(dict_content) + self.write_dict(dict_list=dict_list) diff --git a/unstructured/ingest/interfaces.py b/unstructured/ingest/interfaces.py index 88c2b5686..8d04077b9 100644 --- a/unstructured/ingest/interfaces.py +++ b/unstructured/ingest/interfaces.py @@ -191,6 +191,7 @@ class ChunkingConfig(BaseConfig): multipage_sections: bool = True combine_text_under_n_chars: int = 500 max_characters: int = 1500 + new_after_n_chars: t.Optional[int] = None def chunk(self, elements: t.List[Element]) -> t.List[Element]: if self.chunk_elements: @@ -199,6 +200,7 @@ class ChunkingConfig(BaseConfig): multipage_sections=self.multipage_sections, combine_text_under_n_chars=self.combine_text_under_n_chars, max_characters=self.max_characters, + new_after_n_chars=self.new_after_n_chars, ) else: return elements @@ -211,9 +213,25 @@ class PermissionsConfig(BaseConfig): tenant: t.Optional[str] +# module-level variable to store session handle +global_write_session_handle: t.Optional[BaseSessionHandle] = None + + @dataclass class WriteConfig(BaseConfig): - pass + def global_session(self): + try: + global global_write_session_handle + if isinstance(self, IngestDocSessionHandleMixin): + if global_write_session_handle is None: + # create via write_config.session_handle, which is a property that creates a + # session handle if one is not already defined + global_write_session_handle = self.session_handle + else: + self._session_handle = global_write_session_handle + except Exception as e: + print("Global session handle creation error") + raise (e) class BaseConnectorConfig(ABC): diff --git a/unstructured/ingest/pipeline/pipeline.py b/unstructured/ingest/pipeline/pipeline.py index 51fb207dd..7f587ca07 100644 --- a/unstructured/ingest/pipeline/pipeline.py +++ b/unstructured/ingest/pipeline/pipeline.py @@ -106,6 +106,10 @@ class Pipeline(DataClassJsonMixin): copier(iterable=partitioned_jsons) if self.write_node: + logger.info( + f"uploading elements from {len(partitioned_jsons)} " + "document(s) to the destination" + ) self.write_node(iterable=partitioned_jsons) if self.permissions_node: diff --git a/unstructured/ingest/pipeline/reformat/chunking.py b/unstructured/ingest/pipeline/reformat/chunking.py index 2f8cac3c6..a6791b745 100644 --- a/unstructured/ingest/pipeline/reformat/chunking.py +++ b/unstructured/ingest/pipeline/reformat/chunking.py @@ -45,13 +45,13 @@ class Chunker(ReformatNode): and json_path.is_file() and json_path.stat().st_size ): - logger.debug(f"File exists: {json_path}, skipping embedding") + logger.debug(f"File exists: {json_path}, skipping chunking") return str(json_path) elements = elements_from_json(filename=elements_json) chunked_elements = self.chunking_config.chunk(elements=elements) elements_dict = convert_to_dict(chunked_elements) with open(json_path, "w", encoding="utf8") as output_f: - logger.info(f"writing embeddings content to {json_path}") + logger.info(f"writing chunking content to {json_path}") json.dump(elements_dict, output_f, ensure_ascii=False, indent=2) return str(json_path) except Exception as e: diff --git a/unstructured/ingest/pipeline/reformat/embedding.py 
b/unstructured/ingest/pipeline/reformat/embedding.py index 04b3682dc..8e9a58a33 100644 --- a/unstructured/ingest/pipeline/reformat/embedding.py +++ b/unstructured/ingest/pipeline/reformat/embedding.py @@ -58,7 +58,7 @@ class Embedder(ReformatNode): except Exception as e: if self.pipeline_context.raise_on_error: raise - logger.error(f"failed to chunk content from file {elements_json}, {e}", exc_info=True) + logger.error(f"failed to embed content from file {elements_json}, {e}", exc_info=True) return None def get_path(self) -> Path: diff --git a/unstructured/ingest/runner/writers/__init__.py b/unstructured/ingest/runner/writers/__init__.py index e2a8d9de1..e8df86864 100644 --- a/unstructured/ingest/runner/writers/__init__.py +++ b/unstructured/ingest/runner/writers/__init__.py @@ -7,6 +7,7 @@ from .delta_table import delta_table_writer from .dropbox import dropbox_writer from .gcs import gcs_writer from .mongodb import mongodb_writer +from .pinecone import pinecone_writer from .s3 import s3_writer writer_map: t.Dict[str, t.Callable] = { @@ -18,6 +19,7 @@ writer_map: t.Dict[str, t.Callable] = { "gcs": gcs_writer, "mongodb": mongodb_writer, "s3": s3_writer, + "pinecone": pinecone_writer, } __all__ = ["writer_map"] diff --git a/unstructured/ingest/runner/writers/pinecone.py b/unstructured/ingest/runner/writers/pinecone.py new file mode 100644 index 000000000..6c57d0835 --- /dev/null +++ b/unstructured/ingest/runner/writers/pinecone.py @@ -0,0 +1,33 @@ +from unstructured.ingest.interfaces import BaseDestinationConnector +from unstructured.utils import requires_dependencies + + +@requires_dependencies(["pinecone"], extras="pinecone") +def pinecone_writer( + api_key: str, + index_name: str, + environment: str, + batch_size: int, + num_processes: int, + **kwargs, +) -> BaseDestinationConnector: + from unstructured.ingest.connector.pinecone import ( + PineconeDestinationConnector, + PineconeWriteConfig, + SimplePineconeConfig, + ) + + connector_config = SimplePineconeConfig( + api_key=api_key, + index_name=index_name, + environment=environment, + ) + + return PineconeDestinationConnector( + connector_config=connector_config, + write_config=PineconeWriteConfig( + connector_config=connector_config, + batch_size=batch_size, + num_processes=num_processes, + ), + ) diff --git a/unstructured/staging/base.py b/unstructured/staging/base.py index 3222b366b..6835da587 100644 --- a/unstructured/staging/base.py +++ b/unstructured/staging/base.py @@ -176,7 +176,9 @@ def elements_from_json( return dict_to_elements(element_dict) -def flatten_dict(dictionary, parent_key="", separator="_", keys_to_omit: List[str] = None): +def flatten_dict( + dictionary, parent_key="", separator="_", flatten_lists=False, keys_to_omit: List[str] = None +): keys_to_omit = keys_to_omit if keys_to_omit else [] flattened_dict = {} for key, value in dictionary.items(): @@ -185,8 +187,19 @@ def flatten_dict(dictionary, parent_key="", separator="_", keys_to_omit: List[st flattened_dict[new_key] = value elif isinstance(value, dict): flattened_dict.update( - flatten_dict(value, new_key, separator, keys_to_omit=keys_to_omit), + flatten_dict(value, new_key, separator, flatten_lists, keys_to_omit=keys_to_omit), ) + elif isinstance(value, list) and flatten_lists: + for index, item in enumerate(value): + flattened_dict.update( + flatten_dict( + {f"{new_key}{separator}{index}": item}, + "", + separator, + flatten_lists, + keys_to_omit=keys_to_omit, + ) + ) else: flattened_dict[new_key] = value return flattened_dict
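A quick sketch of the new ``flatten_lists`` behavior in ``flatten_dict`` shown above, which is how the Pinecone connector flattens nested element metadata before upserting (the input dict here is illustrative, not taken from the test fixtures):

.. code:: python

    from unstructured.staging.base import flatten_dict

    # Nested metadata with a list value, similar in shape to element metadata.
    nested = {"metadata": {"languages": ["eng", "fra"], "coordinates": {"x": 1}}}

    # With flatten_lists=True, each list item gets its index appended to the key.
    print(flatten_dict(nested, separator="-", flatten_lists=True))
    # {'metadata-languages-0': 'eng', 'metadata-languages-1': 'fra', 'metadata-coordinates-x': 1}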