feat: add pinecone destination connector (#1774)

Closes https://github.com/Unstructured-IO/unstructured/issues/1414
Closes #2039 

This PR:
- Uses the Pinecone Python client to implement a destination connector for
Pinecone and satisfies the ingest readme requirements
[(here)](https://github.com/Unstructured-IO/unstructured/tree/main/unstructured/ingest#the-checklist)
for the connector (a minimal sketch of the client flow follows this list)
- Updates documentation for the s3 destination connector
- Alphabetically sorts setup.py contents
- Updates logs for the chunking node in the ingest pipeline
- Adds a baseline session handle implementation for destination
connectors, so that their operations can be parallelized
- Tests the
[solution](https://github.com/Unstructured-IO/unstructured/pull/1893)
to the
[bug](https://github.com/Unstructured-IO/unstructured/issues/1892)
about persisting element data in ingest embedding nodes, via this PR's
ingest test
- Fixes a bug in ingest chunking params with a [bugfix on chunking params
and a related
test](69e1949a6f)
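
For reference, the core write path added by this PR boils down to the pinecone-client 2.x flow sketched below. This is a minimal sketch, not the connector itself; the index name and the sample record are placeholders.

```python
# Minimal sketch of the pinecone-client 2.x flow the new destination connector
# builds on (see unstructured/ingest/connector/pinecone.py in this diff).
# "ingest-test" and the sample record are placeholders.
import os

import pinecone

pinecone.init(
    api_key=os.environ["PINECONE_API_KEY"],
    environment=os.environ["PINECONE_ENVIRONMENT"],
)
index = pinecone.Index("ingest-test")

# Each record carries the element's embedding as "values" and everything else
# (text, serialized element, flattened metadata) as "metadata".
index.upsert(
    [
        {
            "id": "example-element-id",
            "values": [0.1, 0.2, 0.3],  # embedding; dimension must match the index
            "metadata": {"text": "example element text"},
        }
    ]
)
```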

---------

Co-authored-by: Roman Isecke <136338424+rbiseck3@users.noreply.github.com>
Ahmet Melek 2023-11-29 22:37:32 +00:00 committed by GitHub
parent 341f0f428c
commit ed08773de7
27 changed files with 715 additions and 28 deletions

View File

@ -287,6 +287,7 @@ jobs:
AZURE_SEARCH_ENDPOINT: ${{ secrets.AZURE_SEARCH_ENDPOINT }}
AZURE_SEARCH_API_KEY: ${{ secrets.AZURE_SEARCH_API_KEY }}
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
PINECONE_API_KEY: ${{ secrets.PINECONE_API_KEY }}
TABLE_OCR: "tesseract"
OCR_AGENT: "tesseract"
CI: "true"
@ -348,6 +349,7 @@ jobs:
MONGODB_URI: ${{ secrets.MONGODB_URI }}
MONGODB_DATABASE_NAME: ${{ secrets.MONGODB_DATABASE_NAME }}
AZURE_DEST_CONNECTION_STR: ${{ secrets.AZURE_DEST_CONNECTION_STR }}
PINECONE_API_KEY: ${{ secrets.PINECONE_API_KEY }}
TABLE_OCR: "tesseract"
OCR_AGENT: "tesseract"
CI: "true"

View File

@ -1,3 +1,15 @@
## 0.11.2-dev0
### Enhancements
### Features
* **Add Pinecone destination connector.** Problem: after ingesting data from a source, users may want to produce embeddings for their data and write them into a vector DB, and Pinecone is one such vector database. Feature: adds a Pinecone destination connector so that documents from any supported source can be ingested, embedded, and written into Pinecone.
### Fixes
* **Process chunking parameter names in ingest correctly** Fixes a bug where chunking parameters weren't being processed and used by the ingest CLI, by renaming the faulty parameter names and prefixes; adds the relevant parameters to the ingest Pinecone test to verify that they are functional.
## 0.11.1
### Enhancements

View File

@ -211,6 +211,10 @@ install-ingest-jira:
install-ingest-hubspot:
python3 -m pip install -r requirements/ingest-hubspot.txt
.PHONY: install-ingest-pinecone
install-ingest-pinecone:
python3 -m pip install -r requirements/ingest-pinecone.txt
.PHONY: install-embed-huggingface
install-embed-huggingface:
python3 -m pip install -r requirements/ingest/embed-huggingface.txt

View File

@ -26,11 +26,11 @@ that span between pages. This kwarg is ``True`` by default.
not split elements, it is possible for a section to exceed that length, for
example if a ``NarrativeText`` element exceeds ``1500`` characters on its own.
Similarly, sections under ``combine_under_n_chars`` will be combined if they
Similarly, sections under ``combine_text_under_n_chars`` will be combined if they
do not exceed the specified threshold, which defaults to ``500``. This will combine
a series of ``Title`` elements that occur one after another, which sometimes
happens in lists that are not detected as ``ListItem`` elements. Set
``combine_under_n_chars=0`` to turn off this behavior.
``combine_text_under_n_chars=0`` to turn off this behavior.
The following shows an example of how to use ``chunk_by_title``. You will
see the document chunked into sections instead of elements.
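
Since the example itself falls outside this hunk, here is a minimal sketch of that usage, assuming a placeholder input file:

```python
# Minimal sketch of chunk_by_title with the renamed combine_text_under_n_chars
# parameter; "example-docs/example.pdf" is a placeholder input.
from unstructured.chunking.title import chunk_by_title
from unstructured.partition.auto import partition

elements = partition(filename="example-docs/example.pdf")
chunks = chunk_by_title(
    elements,
    multipage_sections=True,
    combine_text_under_n_chars=500,
    new_after_n_chars=1500,
    max_characters=1500,
)
```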

View File

@ -16,4 +16,5 @@ Configs
* ``chunk_elements (default False)``: Boolean flag whether to run chunking as part of the ingest process.
* ``multipage_sections (default True)``: If True, sections can span multiple pages.
* ``combine_text_under_n_chars (default 500)``: Combines elements (for example a series of titles) until a section reaches a length of n characters. Defaults to `max_characters` which combines chunks whenever space allows. Specifying 0 for this argument suppresses combining of small chunks. Note this value is "capped" at the `new_after_n_chars` value since a value higher than that would not change this parameter's effect.
* ``new_after_n_chars (default 1500)``: Cuts off new sections once they reach a length of n characters (soft max). Defaults to `max_characters` when not specified, which effectively disables any soft window. Specifying 0 for this argument causes each element to appear in a chunk by itself (although an element with text longer than `max_characters` will still be split into two or more chunks).
* ``max_characters (default 1500)``: Chunks elements text and text_as_html (if present) into chunks of length n characters (hard max)
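
A minimal sketch of how the configs above map onto the ingest ``ChunkingConfig`` (values are illustrative):

```python
# Illustrative only: field names mirror the configs listed above and the
# ChunkingConfig dataclass touched elsewhere in this PR
# (unstructured/ingest/interfaces.py).
from unstructured.ingest.interfaces import ChunkingConfig

chunking_config = ChunkingConfig(
    chunk_elements=True,
    multipage_sections=True,
    combine_text_under_n_chars=500,
    new_after_n_chars=1500,
    max_characters=1500,
)
```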

View File

@ -10,5 +10,6 @@ the dataset.
Configs
---------------------
* ``api_key``: If an api key is required to generate the embeddings via an api (i.e. OpenAI)
* ``model_name``: The model to use for the embedder.
* ``embedding_provider``: An unstructured embedding provider to use while doing embedding. A few examples: langchain-openai, langchain-huggingface, langchain-aws-bedrock.
* ``embedding_api_key``: If an api key is required to generate the embeddings via an api (i.e. OpenAI)
* ``embedding_model_name``: The model to use for the embedder, if necessary.
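
For completeness, a minimal sketch of the corresponding ``EmbeddingConfig``; only ``provider`` appears in this PR's examples, so the other fields are left as comments rather than guessed:

```python
from unstructured.ingest.interfaces import EmbeddingConfig

# "langchain-huggingface" matches the examples elsewhere in this PR.
# For providers that need them, an API key / model name would also be set here
# (field names assumed to mirror the embedding_api_key / embedding_model_name
# options above, so treat them as hypothetical).
embedding_config = EmbeddingConfig(provider="langchain-huggingface")
```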

View File

@ -11,3 +11,5 @@ in our community `Slack. <https://short.unstructured.io/pzw05l7>`_
destination_connectors/azure_cognitive_search
destination_connectors/delta_table
destination_connectors/mongodb
destination_connectors/pinecone
destination_connectors/s3

View File

@ -0,0 +1,79 @@
Pinecone
===========
Batch process all your records using ``unstructured-ingest`` to store structured outputs and embeddings locally on your filesystem and upload those to a Pinecone index.
First you'll need to install the Pinecone dependencies as shown here.
.. code:: shell
pip install "unstructured[pinecone]"
Run Locally
-----------
The upstream connector can be any of the ones supported, but for convenience this example shows a sample command using the
upstream local connector. This will create new files on your local filesystem.
.. tabs::
.. tab:: Shell
.. code:: shell
unstructured-ingest \
local \
--input-path example-docs/book-war-and-peace-1225p.txt \
--output-dir local-to-pinecone \
--strategy fast \
--chunk-elements \
--embedding-provider <an unstructured embedding provider, ie. langchain-huggingface> \
--num-processes 2 \
--verbose \
--work-dir "<directory for intermediate outputs to be saved>" \
pinecone \
--api-key <your pinecone api key here> \
--index-name <your index name here, ie. ingest-test> \
--environment <your environment name here, ie. gcp-starter> \
--batch-size <number of elements to be uploaded per batch, ie. 80> \
--num-processes <number of processes to be used to upload, ie. 2>
.. tab:: Python
.. code:: python
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig, ChunkingConfig, EmbeddingConfig
from unstructured.ingest.runner import LocalRunner
if __name__ == "__main__":
runner = LocalRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="local-output-to-pinecone",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
chunking_config=ChunkingConfig(
chunk_elements=True
),
embedding_config=EmbeddingConfig(
provider="langchain-huggingface",
),
writer_type="pinecone",
writer_kwargs={
"api_key": os.getenv("PINECONE_API_KEY"),
"index_name": os.getenv("PINECONE_INDEX_NAME"),
"environment": os.getenv("PINECONE_ENVIRONMENT_NAME"),
"batch_size": 80,
"num_processes": 2,
}
)
runner.run(
input_path="example-docs/fake-memo.pdf",
)
For a full list of the options the CLI accepts, check ``unstructured-ingest <upstream connector> pinecone --help``.
NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide <https://unstructured-io.github.io/unstructured/installing.html>`_.
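
After an ingest run you may want to confirm that the vectors landed in the index. A minimal sketch using pinecone-client 2.x; the environment variable names match the Python example above, and the stats object is simply printed because its exact attributes can vary by client version:

```python
import os

import pinecone

pinecone.init(
    api_key=os.environ["PINECONE_API_KEY"],
    environment=os.environ["PINECONE_ENVIRONMENT_NAME"],
)
index = pinecone.Index(os.environ["PINECONE_INDEX_NAME"])

# Vector counts can lag behind writes for a short while; the ingest test added
# in this PR polls for the same reason.
print(index.describe_index_stats())
```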

View File

@ -0,0 +1,73 @@
S3
===========
Batch process all your records using ``unstructured-ingest`` to store structured outputs locally on your filesystem and upload those local files to an S3 bucket.
First you'll need to install the S3 dependencies as shown here.
.. code:: shell
pip install "unstructured[s3]"
Run Locally
-----------
The upstream connector can be any of the ones supported, but for convenience this example shows a sample command using the
upstream local connector. This will create new files on your local filesystem.
.. tabs::
.. tab:: Shell
.. code:: shell
unstructured-ingest \
local \
--input-path example-docs/book-war-and-peace-1225p.txt \
--output-dir local-to-s3 \
--strategy fast \
--chunk-elements \
--embedding-provider <an unstructured embedding provider, ie. langchain-huggingface> \
--num-processes 2 \
--verbose \
--work-dir "<directory for intermediate outputs to be saved>" \
s3 \
--anonymous \
--remote-url "<your destination path here, ie 's3://unstructured/war-and-peace-output'>"
.. tab:: Python
.. code:: python
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig, ChunkingConfig, EmbeddingConfig
from unstructured.ingest.runner import LocalRunner
if __name__ == "__main__":
runner = LocalRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="local-output-to-s3",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
chunking_config=ChunkingConfig(
chunk_elements=True
),
embedding_config=EmbeddingConfig(
provider="langchain-huggingface",
),
writer_type="s3",
writer_kwargs={
"anonymous": True,
"--remote-url": "<your destination path here, ie 's3://unstructured/war-and-peace-output'>",
}
)
runner.run(
input_path="example-docs/book-war-and-peace-1225p.txt",
)
For a full list of the options the CLI accepts, check ``unstructured-ingest <upstream connector> s3 --help``.
NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide <https://unstructured-io.github.io/unstructured/installing.html>`_.

View File

@ -0,0 +1,30 @@
#!/usr/bin/env bash
# Processes the example document example-docs/book-war-and-peace-1225p.txt,
# embeds the processed document, and writes the results to a Pinecone index.
# Structured outputs are stored in local-to-pinecone/
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
cd "$SCRIPT_DIR"/../../.. || exit 1
# As an example we're using the local source connector,
# however ingesting from any supported source connector is possible.
# shellcheck disable=2094
PYTHONPATH=. ./unstructured/ingest/main.py \
local \
--input-path example-docs/book-war-and-peace-1225p.txt \
--output-dir local-to-pinecone \
--strategy fast \
--chunk-elements \
--embedding-provider <an unstructured embedding provider, ie. langchain-huggingface> \
--num-processes 2 \
--verbose \
--work-dir "<directory for intermediate outputs to be saved>" \
pinecone \
--api-key "<Pinecone API Key to write into a Pinecone index>" \
--index-name "<Pinecone index name, ie: ingest-test>" \
--environment "<Pinecone index name, ie: ingest-test>" \
--batch-size "<Number of elements to be uploaded per batch, ie. 80>" \
--num-processes "<Number of processes to be used to upload, ie. 2>"

View File

@ -0,0 +1,3 @@
-c constraints.in
-c base.txt
pinecone-client

View File

@ -0,0 +1,56 @@
#
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile requirements/ingest-pinecone.in
#
certifi==2023.7.22
# via
# -c requirements/base.txt
# -c requirements/constraints.in
# requests
charset-normalizer==3.3.0
# via
# -c requirements/base.txt
# requests
dnspython==2.4.2
# via pinecone-client
idna==3.4
# via
# -c requirements/base.txt
# requests
loguru==0.7.2
# via pinecone-client
numpy==1.24.4
# via
# -c requirements/base.txt
# -c requirements/constraints.in
# pinecone-client
pinecone-client==2.2.4
# via -r requirements/ingest-pinecone.in
python-dateutil==2.8.2
# via pinecone-client
pyyaml==6.0.1
# via pinecone-client
requests==2.31.0
# via
# -c requirements/base.txt
# pinecone-client
six==1.16.0
# via
# -c requirements/base.txt
# python-dateutil
tqdm==4.66.1
# via
# -c requirements/base.txt
# pinecone-client
typing-extensions==4.8.0
# via
# -c requirements/base.txt
# pinecone-client
urllib3==1.26.18
# via
# -c requirements/base.txt
# -c requirements/constraints.in
# pinecone-client
# requests

View File

@ -127,33 +127,34 @@ setup(
"tsv": tsv_reqs,
"xlsx": xlsx_reqs,
# Extra requirements for data connectors
"s3": load_requirements("requirements/ingest/s3.in"),
"airtable": load_requirements("requirements/ingest/airtable.in"),
"azure": load_requirements("requirements/ingest/azure.in"),
"azure-cognitive-search": load_requirements(
"requirements/ingest/azure-cognitive-search.in",
),
"biomed": load_requirements("requirements/ingest/biomed.in"),
"box": load_requirements("requirements/ingest/box.in"),
"confluence": load_requirements("requirements/ingest/confluence.in"),
"delta-table": load_requirements("requirements/ingest/delta-table.in"),
"discord": load_requirements("requirements/ingest/discord.in"),
"dropbox": load_requirements("requirements/ingest/dropbox.in"),
"elasticsearch": load_requirements("requirements/ingest/elasticsearch.in"),
"gcs": load_requirements("requirements/ingest/gcs.in"),
"github": load_requirements("requirements/ingest/github.in"),
"gitlab": load_requirements("requirements/ingest/gitlab.in"),
"reddit": load_requirements("requirements/ingest/reddit.in"),
"notion": load_requirements("requirements/ingest/notion.in"),
"slack": load_requirements("requirements/ingest/slack.in"),
"wikipedia": load_requirements("requirements/ingest/wikipedia.in"),
"google-drive": load_requirements("requirements/ingest/google-drive.in"),
"gcs": load_requirements("requirements/ingest/gcs.in"),
"elasticsearch": load_requirements("requirements/ingest/elasticsearch.in"),
"dropbox": load_requirements("requirements/ingest/dropbox.in"),
"box": load_requirements("requirements/ingest/box.in"),
"hubspot": load_requirements("requirements/ingest/hubspot.in"),
"jira": load_requirements("requirements/ingest/jira.in"),
"notion": load_requirements("requirements/ingest/notion.in"),
"onedrive": load_requirements("requirements/ingest/onedrive.in"),
"outlook": load_requirements("requirements/ingest/outlook.in"),
"confluence": load_requirements("requirements/ingest/confluence.in"),
"airtable": load_requirements("requirements/ingest/airtable.in"),
"pinecone": load_requirements("requirements/ingest/pinecone.in"),
"reddit": load_requirements("requirements/ingest/reddit.in"),
"s3": load_requirements("requirements/ingest/s3.in"),
"sharepoint": load_requirements("requirements/ingest/sharepoint.in"),
"delta-table": load_requirements("requirements/ingest/delta-table.in"),
"salesforce": load_requirements("requirements/ingest/salesforce.in"),
"jira": load_requirements("requirements/ingest/jira.in"),
"hubspot": load_requirements("requirements/ingest/hubspot.in"),
"slack": load_requirements("requirements/ingest/slack.in"),
"wikipedia": load_requirements("requirements/ingest/wikipedia.in"),
# Legacy extra requirements
"huggingface": load_requirements("requirements/huggingface.in"),
"local-inference": all_doc_reqs,

View File

@ -0,0 +1,133 @@
#!/usr/bin/env bash
set -e
DEST_PATH=$(dirname "$(realpath "$0")")
SCRIPT_DIR=$(dirname "$DEST_PATH")
cd "$SCRIPT_DIR"/.. || exit 1
OUTPUT_FOLDER_NAME=s3-pinecone-dest
OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME
WORK_DIR=$SCRIPT_DIR/workdir/$OUTPUT_FOLDER_NAME
max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")}
writer_processes=$(( (max_processes - 1) > 1 ? (max_processes - 1) : 2 ))
if [ -z "$PINECONE_API_KEY" ]; then
echo "Skipping Pinecone ingest test because PINECONE_API_KEY env var is not set."
exit 0
fi
RANDOM_SUFFIX=$((RANDOM % 100000 + 1))
# Set the variables with default values if they're not set in the environment
PINECONE_INDEX=${PINECONE_INDEX:-"ingest-test-$RANDOM_SUFFIX"}
PINECONE_ENVIRONMENT=${PINECONE_ENVIRONMENT:-"us-east1-gcp"}
PINECONE_PROJECT_ID=${PINECONE_PROJECT_ID:-"art8iaj"}
# shellcheck disable=SC1091
source "$SCRIPT_DIR"/cleanup.sh
function cleanup {
# Get response code to check if index exists
response_code=$(curl \
-s -o /dev/null \
-w "%{http_code}" \
--request GET \
--url "https://controller.$PINECONE_ENVIRONMENT.pinecone.io/databases/$PINECONE_INDEX" \
--header 'accept: application/json' \
--header "Api-Key: $PINECONE_API_KEY")
# Cleanup (delete) index if it exists
if [ "$response_code" == "200" ]; then
echo ""
echo "deleting index $PINECONE_INDEX"
curl --request DELETE \
"https://controller.$PINECONE_ENVIRONMENT.pinecone.io/databases/$PINECONE_INDEX" \
--header "Api-Key: $PINECONE_API_KEY" \
--header 'content-type: application/json'
else
echo "There was an error during index deletion for index $PINECONE_INDEX, with response code: $response_code. It might be that index $PINECONE_INDEX does not exist, so there is nothing to delete."
fi
# Local file cleanup
cleanup_dir "$WORK_DIR"
cleanup_dir "$OUTPUT_DIR"
}
trap cleanup EXIT
echo "Creating index $PINECONE_INDEX"
response_code=$(curl \
-s -o /dev/null \
-w "%{http_code}" \
--request POST \
--url "https://controller.$PINECONE_ENVIRONMENT.pinecone.io/databases" \
--header "accept: text/plain" \
--header "content-type: application/json" \
--header "Api-Key: $PINECONE_API_KEY" \
--data '
{
"name": "'"$PINECONE_INDEX"'",
"dimension": 384,
"metric": "cosine",
"pods": 1,
"replicas": 1,
"pod_type": "p1.x1"
}
')
if [ "$response_code" -lt 400 ]; then
echo "Index creation success: $response_code"
else
echo "Index creation failure: $response_code"
exit 1
fi
PYTHONPATH=. ./unstructured/ingest/main.py \
local \
--num-processes "$max_processes" \
--output-dir "$OUTPUT_DIR" \
--strategy fast \
--verbose \
--reprocess \
--input-path example-docs/book-war-and-peace-1225p.txt \
--work-dir "$WORK_DIR" \
--chunk-elements \
--chunk-combine-text-under-n-chars 200 \
--chunk-new-after-n-chars 2500 \
--chunk-max-characters 38000 \
--chunk-multipage-sections \
--embedding-provider "langchain-huggingface" \
pinecone \
--api-key "$PINECONE_API_KEY" \
--index-name "$PINECONE_INDEX" \
--environment "$PINECONE_ENVIRONMENT" \
--batch-size 80 \
--num-processes "$writer_processes"
# It can take some time for the index to catch up with the content that was written. This check sleeps 8s between
# attempts to give it time to process the writes, and gives up after three attempts.
num_of_vectors_remote=0
attempt=1
sleep_amount=8
while [ "$num_of_vectors_remote" -eq 0 ] && [ "$attempt" -lt 4 ]; do
echo "attempt $attempt: sleeping $sleep_amount seconds to let index finish catching up after writes"
sleep $sleep_amount
num_of_vectors_remote=$(curl --request POST \
-s \
--url "https://$PINECONE_INDEX-$PINECONE_PROJECT_ID.svc.$PINECONE_ENVIRONMENT.pinecone.io/describe_index_stats" \
--header "accept: application/json" \
--header "content-type: application/json" \
--header "Api-Key: $PINECONE_API_KEY" | jq -r '.totalVectorCount')
echo "vector count in Pinecone: $num_of_vectors_remote"
attempt=$((attempt+1))
done
EXPECTED=1404
if [ "$num_of_vectors_remote" -ne $EXPECTED ];then
echo "Number of vectors in Pinecone are $num_of_vectors_remote when the expected number is $EXPECTED. Test failed."
exit 1
fi

View File

@ -22,6 +22,7 @@ all_tests=(
'dropbox.sh'
'gcs.sh'
'mongodb.sh'
'pinecone.sh'
's3.sh'
'sharepoint-embed-cog-index.sh'
)

View File

@ -1 +1 @@
__version__ = "0.11.1" # pragma: no cover
__version__ = "0.11.2-dev0" # pragma: no cover

View File

@ -33,6 +33,7 @@ from .mongodb import get_base_dest_cmd as mongo_base_dest_cmd
from .notion import get_base_src_cmd as notion_base_src_cmd
from .onedrive import get_base_src_cmd as onedrive_base_src_cmd
from .outlook import get_base_src_cmd as outlook_base_src_cmd
from .pinecone import get_base_dest_cmd as pinecone_base_dest_cmd
from .reddit import get_base_src_cmd as reddit_base_src_cmd
from .s3 import get_base_dest_cmd as s3_base_dest_cmd
from .s3 import get_base_src_cmd as s3_base_src_cmd
@ -93,6 +94,7 @@ base_dest_cmd_fns: t.List[t.Callable[[], "BaseDestCmd"]] = [
azure_cognitive_search_base_dest_cmd,
delta_table_dest_cmd,
mongo_base_dest_cmd,
pinecone_base_dest_cmd,
]
# Make sure there are not overlapping names

View File

@ -0,0 +1,66 @@
import typing as t
from dataclasses import dataclass
import click
from unstructured.ingest.cli.interfaces import (
CliConfig,
)
from unstructured.ingest.connector.pinecone import SimplePineconeConfig
@dataclass
class PineconeCliWriteConfig(SimplePineconeConfig, CliConfig):
api_key: str
index_name: str
environment: str
batch_size: int
num_processes: int
@staticmethod
def get_cli_options() -> t.List[click.Option]:
options = [
click.Option(
["--api-key"],
required=True,
type=str,
help="API key used for authenticating to a Pinecone instance.",
envvar="PINECONE_API_KEY",
show_envvar=True,
),
click.Option(
["--index-name"],
required=True,
type=str,
help="The name of the pinecone index to connect to.",
),
click.Option(
["--environment"],
required=True,
type=str,
help="The environment where the index lives. Eg. 'gcp-starter' or 'us-east1-gcp'",
),
click.Option(
["--batch-size"],
default=50,
type=int,
help="Number of records per batch",
),
click.Option(
["--num-processes"],
default=2,
type=int,
help="Number of parallel processes with which to upload elements",
),
]
return options
def get_base_dest_cmd():
from unstructured.ingest.cli.base.dest import BaseDestCmd
cmd_cls = BaseDestCmd(
cmd_name="pinecone",
cli_config=PineconeCliWriteConfig,
)
return cmd_cls

View File

@ -468,7 +468,7 @@ class CliChunkingConfig(ChunkingConfig, CliMixin):
default=False,
),
click.Option(
["--chunk-combine-under-n-chars"],
["--chunk-combine-text-under-n-chars"],
type=int,
default=500,
show_default=True,
@ -479,6 +479,12 @@ class CliChunkingConfig(ChunkingConfig, CliMixin):
default=1500,
show_default=True,
),
click.Option(
["--chunk-max-characters"],
type=int,
default=1500,
show_default=True,
),
]
return options
@ -503,9 +509,9 @@ class CliChunkingConfig(ChunkingConfig, CliMixin):
new_kvs["chunk_elements"] = chunk_elements
new_kvs.update(
{
k[len("chunking_") :]: v # noqa: E203
k[len("chunk_") :]: v # noqa: E203
for k, v in kvs.items()
if k.startswith("chunking_")
if k.startswith("chunk_")
},
)
if len(new_kvs.keys()) == 0:
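
In plain terms, the block above now strips the ``chunk_`` prefix (previously the mismatched ``chunking_`` prefix) from the CLI-derived kwargs so they line up with the ``ChunkingConfig`` field names. A standalone illustration, not the actual CLI wiring:

```python
# Standalone illustration of the prefix stripping above: --chunk-* CLI options
# arrive as chunk_* kwargs; chunk_elements is handled separately, and the rest
# are mapped onto ChunkingConfig field names by dropping the "chunk_" prefix.
cli_kvs = {
    "chunk_combine_text_under_n_chars": 200,
    "chunk_new_after_n_chars": 2500,
    "chunk_max_characters": 38000,
}
config_kwargs = {
    k[len("chunk_"):]: v for k, v in cli_kvs.items() if k.startswith("chunk_")
}
# -> {"combine_text_under_n_chars": 200,
#     "new_after_n_chars": 2500,
#     "max_characters": 38000}
```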

View File

@ -0,0 +1,145 @@
import json
import multiprocessing as mp
import typing as t
import uuid
from dataclasses import dataclass
from unstructured.ingest.error import DestinationConnectionError, WriteError
from unstructured.ingest.interfaces import (
BaseConnectorConfig,
BaseDestinationConnector,
BaseIngestDoc,
BaseSessionHandle,
ConfigSessionHandleMixin,
IngestDocSessionHandleMixin,
WriteConfig,
)
from unstructured.ingest.logger import logger
from unstructured.staging.base import flatten_dict
from unstructured.utils import requires_dependencies
if t.TYPE_CHECKING:
from pinecone import Index as PineconeIndex
@dataclass
class PineconeSessionHandle(BaseSessionHandle):
service: "PineconeIndex"
@DestinationConnectionError.wrap
@requires_dependencies(["pinecone"], extras="pinecone")
def create_pinecone_object(api_key, index_name, environment):
import pinecone
pinecone.init(api_key=api_key, environment=environment)
index = pinecone.Index(index_name)
logger.debug(f"Connected to index: {pinecone.describe_index(index_name)}")
return index
@dataclass
class SimplePineconeConfig(ConfigSessionHandleMixin, BaseConnectorConfig):
api_key: str
index_name: str
environment: str
def create_session_handle(self) -> PineconeSessionHandle:
service = create_pinecone_object(self.api_key, self.index_name, self.environment)
return PineconeSessionHandle(service=service)
@dataclass
class PineconeWriteConfig(IngestDocSessionHandleMixin, WriteConfig):
connector_config: SimplePineconeConfig
batch_size: int = 50
num_processes: int = 1
@dataclass
class PineconeDestinationConnector(BaseDestinationConnector):
write_config: PineconeWriteConfig
connector_config: SimplePineconeConfig
def initialize(self):
pass
@DestinationConnectionError.wrap
def check_connection(self):
create_pinecone_object(
self.connector_config.api_key,
self.connector_config.index_name,
self.connector_config.environment,
)
@DestinationConnectionError.wrap
@requires_dependencies(["pinecone"], extras="pinecone")
def upsert_batch(self, batch):
import pinecone.core.client.exceptions
self.write_config.global_session()
index = self.write_config.session_handle.service
try:
response = index.upsert(batch)
except pinecone.core.client.exceptions.ApiException as api_error:
raise WriteError(f"http error: {api_error}") from api_error
logger.debug(f"results: {response}")
def write_dict(self, *args, dict_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None:
logger.info(
f"Upserting {len(dict_list)} elements to destination "
f"index at {self.connector_config.index_name}",
)
pinecone_batch_size = self.write_config.batch_size
logger.info(f"using {self.write_config.num_processes} processes to upload")
if self.write_config.num_processes == 1:
for i in range(0, len(dict_list), pinecone_batch_size):
self.upsert_batch(dict_list[i : i + pinecone_batch_size]) # noqa: E203
else:
with mp.Pool(
processes=self.write_config.num_processes,
) as pool:
pool.map(
self.upsert_batch,
[
dict_list[i : i + pinecone_batch_size] # noqa: E203
for i in range(0, len(dict_list), pinecone_batch_size)
], # noqa: E203
)
def write(self, docs: t.List[BaseIngestDoc]) -> None:
dict_list: t.List[t.Dict[str, t.Any]] = []
for doc in docs:
local_path = doc._output_filename
with open(local_path) as json_file:
dict_content = json.load(json_file)
# we assign embeddings to "values", and other fields to "metadata"
dict_content = [
# While flatten_dict enables indexing on various fields,
# element_serialized enables easily reloading the element object to memory.
# element_serialized is formed without text/embeddings to avoid data bloating.
{
"id": str(uuid.uuid4()),
"values": element.pop("embeddings", None),
"metadata": {
"text": element.pop("text", None),
"element_serialized": json.dumps(element),
**flatten_dict(
element,
separator="-",
flatten_lists=True,
),
},
}
for element in dict_content
]
logger.info(
f"appending {len(dict_content)} json elements from content in {local_path}",
)
dict_list.extend(dict_content)
self.write_dict(dict_list=dict_list)

View File

@ -191,6 +191,7 @@ class ChunkingConfig(BaseConfig):
multipage_sections: bool = True
combine_text_under_n_chars: int = 500
max_characters: int = 1500
new_after_n_chars: t.Optional[int] = None
def chunk(self, elements: t.List[Element]) -> t.List[Element]:
if self.chunk_elements:
@ -199,6 +200,7 @@ class ChunkingConfig(BaseConfig):
multipage_sections=self.multipage_sections,
combine_text_under_n_chars=self.combine_text_under_n_chars,
max_characters=self.max_characters,
new_after_n_chars=self.new_after_n_chars,
)
else:
return elements
@ -211,9 +213,25 @@ class PermissionsConfig(BaseConfig):
tenant: t.Optional[str]
# module-level variable to store session handle
global_write_session_handle: t.Optional[BaseSessionHandle] = None
@dataclass
class WriteConfig(BaseConfig):
pass
def global_session(self):
try:
global global_write_session_handle
if isinstance(self, IngestDocSessionHandleMixin):
if global_write_session_handle is None:
# create via write_config.session_handle, which is a property that creates a
# session handle if one is not already defined
global_write_session_handle = self.session_handle
else:
self._session_handle = global_write_session_handle
except Exception as e:
print("Global session handle creation error")
raise (e)
class BaseConnectorConfig(ABC):
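
The intent of ``global_session()`` above is that each writer process creates one session handle (for Pinecone, a connected ``pinecone.Index``) and then reuses it for every batch it uploads. A minimal sketch of the consumer side, mirroring ``PineconeDestinationConnector.upsert_batch`` in this diff; ``write_config`` is assumed to be a ``WriteConfig`` subclass that mixes in ``IngestDocSessionHandleMixin``:

```python
def upload_batch(write_config, batch):
    # Reuse (or lazily create) the per-process session handle so each worker
    # process connects to the destination only once, then upload the batch
    # through the handle's underlying client.
    write_config.global_session()
    client = write_config.session_handle.service
    client.upsert(batch)
```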

View File

@ -106,6 +106,10 @@ class Pipeline(DataClassJsonMixin):
copier(iterable=partitioned_jsons)
if self.write_node:
logger.info(
f"uploading elements from {len(partitioned_jsons)} "
"document(s) to the destination"
)
self.write_node(iterable=partitioned_jsons)
if self.permissions_node:

View File

@ -45,13 +45,13 @@ class Chunker(ReformatNode):
and json_path.is_file()
and json_path.stat().st_size
):
logger.debug(f"File exists: {json_path}, skipping embedding")
logger.debug(f"File exists: {json_path}, skipping chunking")
return str(json_path)
elements = elements_from_json(filename=elements_json)
chunked_elements = self.chunking_config.chunk(elements=elements)
elements_dict = convert_to_dict(chunked_elements)
with open(json_path, "w", encoding="utf8") as output_f:
logger.info(f"writing embeddings content to {json_path}")
logger.info(f"writing chunking content to {json_path}")
json.dump(elements_dict, output_f, ensure_ascii=False, indent=2)
return str(json_path)
except Exception as e:

View File

@ -58,7 +58,7 @@ class Embedder(ReformatNode):
except Exception as e:
if self.pipeline_context.raise_on_error:
raise
logger.error(f"failed to chunk content from file {elements_json}, {e}", exc_info=True)
logger.error(f"failed to embed content from file {elements_json}, {e}", exc_info=True)
return None
def get_path(self) -> Path:

View File

@ -7,6 +7,7 @@ from .delta_table import delta_table_writer
from .dropbox import dropbox_writer
from .gcs import gcs_writer
from .mongodb import mongodb_writer
from .pinecone import pinecone_writer
from .s3 import s3_writer
writer_map: t.Dict[str, t.Callable] = {
@ -18,6 +19,7 @@ writer_map: t.Dict[str, t.Callable] = {
"gcs": gcs_writer,
"mongodb": mongodb_writer,
"s3": s3_writer,
"pinecone": pinecone_writer,
}
__all__ = ["writer_map"]

View File

@ -0,0 +1,33 @@
from unstructured.ingest.interfaces import BaseDestinationConnector
from unstructured.utils import requires_dependencies
@requires_dependencies(["pinecone"], extras="pinecone")
def pinecone_writer(
api_key: str,
index_name: str,
environment: str,
batch_size: int,
num_processes: int,
**kwargs,
) -> BaseDestinationConnector:
from unstructured.ingest.connector.pinecone import (
PineconeDestinationConnector,
PineconeWriteConfig,
SimplePineconeConfig,
)
connector_config = SimplePineconeConfig(
api_key=api_key,
index_name=index_name,
environment=environment,
)
return PineconeDestinationConnector(
connector_config=connector_config,
write_config=PineconeWriteConfig(
connector_config=connector_config,
batch_size=batch_size,
num_processes=num_processes,
),
)
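
For completeness, a sketch of how this writer gets picked up at runtime via ``writer_map``; the import path of the writers module is assumed from this diff's layout, and the values are placeholders:

```python
# Sketch: the ingest pipeline looks the writer up by name and calls it with the
# destination kwargs. The module path below is assumed; all values are placeholders.
import os

from unstructured.ingest.runner.writers import writer_map

destination_connector = writer_map["pinecone"](
    api_key=os.getenv("PINECONE_API_KEY"),
    index_name=os.getenv("PINECONE_INDEX_NAME"),
    environment=os.getenv("PINECONE_ENVIRONMENT_NAME"),
    batch_size=80,
    num_processes=2,
)
```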

View File

@ -176,7 +176,9 @@ def elements_from_json(
return dict_to_elements(element_dict)
def flatten_dict(dictionary, parent_key="", separator="_", keys_to_omit: List[str] = None):
def flatten_dict(
dictionary, parent_key="", separator="_", flatten_lists=False, keys_to_omit: List[str] = None
):
keys_to_omit = keys_to_omit if keys_to_omit else []
flattened_dict = {}
for key, value in dictionary.items():
@ -185,8 +187,19 @@ def flatten_dict(dictionary, parent_key="", separator="_", keys_to_omit: List[st
flattened_dict[new_key] = value
elif isinstance(value, dict):
flattened_dict.update(
flatten_dict(value, new_key, separator, keys_to_omit=keys_to_omit),
flatten_dict(value, new_key, separator, flatten_lists, keys_to_omit=keys_to_omit),
)
elif isinstance(value, list) and flatten_lists:
for index, item in enumerate(value):
flattened_dict.update(
flatten_dict(
{f"{new_key}{separator}{index}": item},
"",
separator,
flatten_lists,
keys_to_omit=keys_to_omit,
)
)
else:
flattened_dict[new_key] = value
return flattened_dict
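
A quick worked example of the new ``flatten_lists`` behaviour, which is what the Pinecone connector relies on when building vector metadata; the input dict is a made-up metadata fragment:

```python
from unstructured.staging.base import flatten_dict

metadata = {
    "filename": "doc.pdf",
    "languages": ["eng", "fra"],
    "coordinates": {"system": "PixelSpace"},
}
flatten_dict(metadata, separator="-", flatten_lists=True)
# -> {"filename": "doc.pdf",
#     "languages-0": "eng",
#     "languages-1": "fra",
#     "coordinates-system": "PixelSpace"}
```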