feat: add Qdrant ingest destination connector (#2338)

This PR intends to add [Qdrant](https://qdrant.tech/) as a supported
ingestion destination.

- Implements CLI and programmatic usage.
- Documentation update
- Integration test script

---
Clone of #2315 to run with CI secrets

---------

Co-authored-by: Anush008 <anushshetty90@gmail.com>
Co-authored-by: Roman Isecke <136338424+rbiseck3@users.noreply.github.com>
This commit is contained in:
ryannikolaidis 2024-01-02 14:08:20 -08:00 committed by GitHub
parent 9459af435d
commit dd1443ab6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 603 additions and 9 deletions

View File

@ -1,4 +1,4 @@
## 0.11.7-dev3 ## 0.11.7-dev4
### Enhancements ### Enhancements
@ -7,6 +7,7 @@
### Features ### Features
* **Add Qdrant destination connector.** Adds support for writing documents and embeddings into a Qdrant collection.
* **Store base64 encoded image data in metadata fields.** Rather than saving to file, stores base64 encoded data of the image bytes and the mimetype for the image in metadata fields: `image_base64` and `image_mime_type` (if that is what the user specifies by some other param like `pdf_extract_to_payload`). This would allow the API to have parity with the library. * **Store base64 encoded image data in metadata fields.** Rather than saving to file, stores base64 encoded data of the image bytes and the mimetype for the image in metadata fields: `image_base64` and `image_mime_type` (if that is what the user specifies by some other param like `pdf_extract_to_payload`). This would allow the API to have parity with the library.
### Fixes ### Fixes

View File

@ -223,6 +223,10 @@ install-ingest-sftp:
install-ingest-pinecone: install-ingest-pinecone:
python3 -m pip install -r requirements/ingest/pinecone.txt python3 -m pip install -r requirements/ingest/pinecone.txt
.PHONY: install-ingest-qdrant
install-ingest-qdrant:
python3 -m pip install -r requirements/ingest/qdrant.txt
.PHONY: install-ingest-chroma .PHONY: install-ingest-chroma
install-ingest-chroma: install-ingest-chroma:
python3 -m pip install -r requirements/ingest/chroma.txt python3 -m pip install -r requirements/ingest/chroma.txt

View File

@ -17,6 +17,7 @@ in our community `Slack. <https://short.unstructured.io/pzw05l7>`_
destination_connectors/gcs destination_connectors/gcs
destination_connectors/mongodb destination_connectors/mongodb
destination_connectors/pinecone destination_connectors/pinecone
destination_connectors/qdrant
destination_connectors/s3 destination_connectors/s3
destination_connectors/weaviate destination_connectors/weaviate

View File

@ -0,0 +1,17 @@
#!/usr/bin/env bash

# Example: ingest a local document, chunk and embed it, and upload the result
# to a Qdrant collection served at localhost:6333.

# Embedding provider may be overridden via the environment.
EMBEDDING_PROVIDER=${EMBEDDING_PROVIDER:-"langchain-huggingface"}

unstructured-ingest \
  local \
  --input-path example-docs/book-war-and-peace-1225p.txt \
  --output-dir local-output-to-qdrant \
  --strategy fast \
  --chunk-elements \
  --embedding-provider "$EMBEDDING_PROVIDER" \
  --num-processes 2 \
  --verbose \
  qdrant \
  --collection-name "test" \
  --location "http://localhost:6333" \
  --batch-size 80

View File

@ -0,0 +1,46 @@
from unstructured.ingest.connector.local import SimpleLocalConfig
from unstructured.ingest.connector.qdrant import (
QdrantWriteConfig,
SimpleQdrantConfig,
)
from unstructured.ingest.interfaces import (
ChunkingConfig,
EmbeddingConfig,
PartitionConfig,
ProcessorConfig,
ReadConfig,
)
from unstructured.ingest.runner import LocalRunner
from unstructured.ingest.runner.writers.base_writer import Writer
from unstructured.ingest.runner.writers.qdrant import QdrantWriter
def get_writer() -> Writer:
    """Build a QdrantWriter targeting the ``test`` collection on localhost."""
    connector_config = SimpleQdrantConfig(
        location="http://localhost:6333",
        collection_name="test",
    )
    write_config = QdrantWriteConfig(batch_size=80)
    return QdrantWriter(connector_config=connector_config, write_config=write_config)
if __name__ == "__main__":
writer = get_writer()
runner = LocalRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="local-output-to-qdrant",
num_processes=2,
),
connector_config=SimpleLocalConfig(
input_path="example-docs/book-war-and-peace-1225p.txt",
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
chunking_config=ChunkingConfig(chunk_elements=True),
embedding_config=EmbeddingConfig(provider="langchain-huggingface"),
writer=writer,
writer_kwargs={},
)
runner.run()

View File

@ -0,0 +1,34 @@
Qdrant
===========
Batch process all your records using ``unstructured-ingest`` to store structured outputs and embeddings locally on your filesystem and upload those to a Qdrant collection.
First you'll need to install the Qdrant dependencies as shown here.
.. code:: shell
pip install "unstructured[qdrant]"
Create a Qdrant collection with the appropriate configurations. Find more information in the `Qdrant collections guide <https://qdrant.tech/documentation/concepts/collections/>`_.
Run Locally
-----------
The upstream connector can be any of the supported connectors; for convenience, the sample command below uses
the local upstream connector.
.. tabs::
.. tab:: Shell
.. literalinclude:: ./code/bash/qdrant.sh
:language: bash
.. tab:: Python
.. literalinclude:: ./code/python/qdrant.py
:language: python
For a full list of the options the CLI accepts check ``unstructured-ingest <upstream connector> qdrant --help``.
NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide <https://unstructured-io.github.io/unstructured/installing.html>`_.

View File

@ -69,7 +69,7 @@ A RAG workflow can be broken down into the following steps:
4. **Embedding**: After chunking, you will need to convert the text into a numerical representation (vector embedding) that a LLM can understand. OpenAI, Cohere, and Hugging Face all offer embedding models. 4. **Embedding**: After chunking, you will need to convert the text into a numerical representation (vector embedding) that a LLM can understand. OpenAI, Cohere, and Hugging Face all offer embedding models.
5. **Vector Database**: The next step is to choose a location for storing your chunked embeddings. There are lots of options to choose from for your vector database (Pinecone, Milvus, ChromaDD, Weaviate and more). 5. **Vector Database**: The next step is to choose a location for storing your chunked embeddings. There are lots of options to choose from for your vector database (ChromaDB, Milvus, Pinecone, Qdrant, Weaviate and more).
6. **User Prompt**: Take the user prompt and grab the most relevant chunks of information in the vector database via similarity search. 6. **User Prompt**: Take the user prompt and grab the most relevant chunks of information in the vector database via similarity search.

View File

@ -0,0 +1,22 @@
#!/usr/bin/env bash
# Uploads the structured output of the files within the given path to a Qdrant collection named 'test'.

# Run from the repository root regardless of where the script is invoked from.
SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
cd "$SCRIPT_DIR"/../../.. || exit 1

# Embedding provider may be overridden via the environment.
EMBEDDING_PROVIDER=${EMBEDDING_PROVIDER:-"langchain-huggingface"}

unstructured-ingest \
  local \
  --input-path example-docs/book-war-and-peace-1225p.txt \
  --output-dir local-output-to-qdrant \
  --strategy fast \
  --chunk-elements \
  --embedding-provider "$EMBEDDING_PROVIDER" \
  --num-processes 2 \
  --verbose \
  qdrant \
  --collection-name "test" \
  --location "http://localhost:6333" \
  --batch-size 80

View File

@ -0,0 +1,3 @@
-c ../constraints.in
-c ../base.txt
qdrant-client

View File

@ -0,0 +1,74 @@
#
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --output-file=ingest/qdrant.txt ingest/qdrant.in
#
anyio==3.7.1
# via
# -c ingest/../constraints.in
# httpx
certifi==2023.11.17
# via
# -c ingest/../base.txt
# -c ingest/../constraints.in
# httpcore
# httpx
exceptiongroup==1.2.0
# via anyio
grpcio==1.60.0
# via
# grpcio-tools
# qdrant-client
grpcio-tools==1.60.0
# via qdrant-client
h11==0.14.0
# via httpcore
h2==4.1.0
# via httpx
hpack==4.0.0
# via h2
httpcore==1.0.2
# via httpx
httpx[http2]==0.26.0
# via qdrant-client
hyperframe==6.0.1
# via h2
idna==3.6
# via
# -c ingest/../base.txt
# anyio
# httpx
numpy==1.24.4
# via
# -c ingest/../base.txt
# -c ingest/../constraints.in
# qdrant-client
portalocker==2.8.2
# via qdrant-client
protobuf==4.23.4
# via
# -c ingest/../constraints.in
# grpcio-tools
pydantic==1.10.13
# via
# -c ingest/../constraints.in
# qdrant-client
qdrant-client==1.7.0
# via -r ingest/qdrant.in
sniffio==1.3.0
# via
# anyio
# httpx
typing-extensions==4.8.0
# via
# -c ingest/../base.txt
# pydantic
urllib3==1.26.18
# via
# -c ingest/../base.txt
# -c ingest/../constraints.in
# qdrant-client
# The following packages are considered to be unsafe in a requirements file:
# setuptools

View File

@ -150,6 +150,7 @@ setup(
"onedrive": load_requirements("requirements/ingest/onedrive.in"), "onedrive": load_requirements("requirements/ingest/onedrive.in"),
"outlook": load_requirements("requirements/ingest/outlook.in"), "outlook": load_requirements("requirements/ingest/outlook.in"),
"pinecone": load_requirements("requirements/ingest/pinecone.in"), "pinecone": load_requirements("requirements/ingest/pinecone.in"),
"qdrant": load_requirements("requirements/ingest/qdrant.in"),
"reddit": load_requirements("requirements/ingest/reddit.in"), "reddit": load_requirements("requirements/ingest/reddit.in"),
"s3": load_requirements("requirements/ingest/s3.in"), "s3": load_requirements("requirements/ingest/s3.in"),
"sharepoint": load_requirements("requirements/ingest/sharepoint.in"), "sharepoint": load_requirements("requirements/ingest/sharepoint.in"),

View File

@ -0,0 +1,87 @@
#!/bin/bash

set -ex

# Integration test: spin up a throwaway Qdrant container, ingest an example
# document into a fresh collection, and assert the resulting point count.

DEST_PATH=$(dirname "$(realpath "$0")")
SCRIPT_DIR=$(dirname "$DEST_PATH")
cd "$SCRIPT_DIR"/.. || exit 1

OUTPUT_FOLDER_NAME=qdrant-dest
OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME
WORK_DIR=$SCRIPT_DIR/workdir/$OUTPUT_FOLDER_NAME
max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")}
writer_processes=$(((max_processes - 1) > 1 ? (max_processes - 1) : 2))

# NOTE: renamed from the misspelled CONTAINTER_NAME.
CONTAINER_NAME="qdrant_test"
QDRANT_PORT=6333
QDRANT_HOST=localhost:$QDRANT_PORT
# Unique collection per run so reruns never collide.
COLLECTION_NAME="qdrant-test-$(date +%s)"
EXPECTED_POINTS_COUNT=1404
RETRIES=5

function stop_docker() {
  docker stop $CONTAINER_NAME
}

docker run -d --rm \
  -p 6333:$QDRANT_PORT \
  --name $CONTAINER_NAME qdrant/qdrant:latest

# Ensure the container is cleaned up on interrupt or error.
trap stop_docker SIGINT
trap stop_docker ERR

# Wait (with bounded retries) for the Qdrant REST API to come up.
until curl --output /dev/null --silent --get --fail http://$QDRANT_HOST/collections; do
  RETRIES=$((RETRIES - 1))
  if [ "$RETRIES" -le 0 ]; then
    echo "Qdrant server failed to start"
    stop_docker
    exit 1
  fi
  printf 'Waiting for Qdrant server to start...'
  sleep 5
done

# Vector size 384 matches the dimensionality of the default
# langchain-huggingface embedding model used below.
curl -X PUT \
  http://$QDRANT_HOST/collections/"$COLLECTION_NAME" \
  -H 'Content-Type: application/json' \
  -d '{
      "vectors": {
        "size": 384,
        "distance": "Cosine"
      }
    }'

EMBEDDING_PROVIDER=${EMBEDDING_PROVIDER:-"langchain-huggingface"}

PYTHONPATH=. ./unstructured/ingest/main.py \
  local \
  --num-processes "$max_processes" \
  --output-dir "$OUTPUT_DIR" \
  --strategy fast \
  --verbose \
  --reprocess \
  --input-path example-docs/book-war-and-peace-1225p.txt \
  --work-dir "$WORK_DIR" \
  --chunk-elements \
  --chunk-combine-text-under-n-chars 200 --chunk-new-after-n-chars 2500 --chunk-max-characters 38000 --chunk-multipage-sections \
  --embedding-provider "langchain-huggingface" \
  qdrant \
  --collection-name "$COLLECTION_NAME" \
  --location "http://$QDRANT_HOST" \
  --batch-size 80 \
  --num-processes "$writer_processes"

# Assert the exact number of points written to the collection.
response=$(curl -s -X POST \
  $QDRANT_HOST/collections/"$COLLECTION_NAME"/points/count \
  -H 'Content-Type: application/json' \
  -d '{
      "exact": true
    }')

count=$(echo "$response" | jq -r '.result.count')

if [ "$count" -ne $EXPECTED_POINTS_COUNT ]; then
  # BUGFIX: previously interpolated the undefined variable $EXPECTED.
  echo "Points count assertion failed. Expected: $EXPECTED_POINTS_COUNT. Got: $count. Test failed."
  stop_docker
  exit 1
fi

stop_docker

View File

@ -25,6 +25,7 @@ all_tests=(
'gcs.sh' 'gcs.sh'
'mongodb.sh' 'mongodb.sh'
'pinecone.sh' 'pinecone.sh'
'qdrant.sh'
's3.sh' 's3.sh'
'weaviate.sh' 'weaviate.sh'
'sharepoint-embed-cog-index.sh' 'sharepoint-embed-cog-index.sh'

View File

@ -1 +1 @@
__version__ = "0.11.7-dev3" # pragma: no cover __version__ = "0.11.7-dev4" # pragma: no cover

View File

@ -39,6 +39,7 @@ from .notion import get_base_src_cmd as notion_base_src_cmd
from .onedrive import get_base_src_cmd as onedrive_base_src_cmd from .onedrive import get_base_src_cmd as onedrive_base_src_cmd
from .outlook import get_base_src_cmd as outlook_base_src_cmd from .outlook import get_base_src_cmd as outlook_base_src_cmd
from .pinecone import get_base_dest_cmd as pinecone_base_dest_cmd from .pinecone import get_base_dest_cmd as pinecone_base_dest_cmd
from .qdrant import get_base_dest_cmd as qdrant_base_dest_cmd
from .reddit import get_base_src_cmd as reddit_base_src_cmd from .reddit import get_base_src_cmd as reddit_base_src_cmd
from .salesforce import get_base_src_cmd as salesforce_base_src_cmd from .salesforce import get_base_src_cmd as salesforce_base_src_cmd
from .sharepoint import get_base_src_cmd as sharepoint_base_src_cmd from .sharepoint import get_base_src_cmd as sharepoint_base_src_cmd
@ -103,6 +104,7 @@ base_dest_cmd_fns: t.List[t.Callable[[], "BaseDestCmd"]] = [
weaviate_dest_cmd, weaviate_dest_cmd,
mongo_base_dest_cmd, mongo_base_dest_cmd,
pinecone_base_dest_cmd, pinecone_base_dest_cmd,
qdrant_base_dest_cmd,
] ]
# Make sure there are not overlapping names # Make sure there are not overlapping names

View File

@ -0,0 +1,124 @@
import typing as t
from dataclasses import dataclass
import click
from unstructured.ingest.cli.interfaces import (
CliConfig,
)
from unstructured.ingest.connector.qdrant import QdrantWriteConfig, SimpleQdrantConfig
@dataclass
class QdrantCliConfig(SimpleQdrantConfig, CliConfig):
    """CLI adapter exposing `SimpleQdrantConfig` connection fields as click options."""

    @staticmethod
    def get_cli_options() -> t.List[click.Option]:
        """Return click options for configuring the Qdrant client connection.

        Option names/defaults mirror the fields of `SimpleQdrantConfig`.
        """
        options = [
            click.Option(
                ["--collection-name"],
                required=True,
                type=str,
                help="The name of the Qdrant collection to use.",
            ),
            click.Option(
                ["--location"],
                type=str,
                help="The location of the Qdrant cluster.",
            ),
            click.Option(
                ["--url"],
                # FIX: help previously duplicated --location's text verbatim.
                type=str,
                help="URL of the Qdrant instance to connect to.",
            ),
            click.Option(
                ["--port"],
                type=int,
                default=6333,
                help="Port of the REST API interface. Default: 6333.",
            ),
            click.Option(
                ["--grpc-port"],
                type=int,
                default=6334,
                help="Port of the gRPC interface. Default: 6334.",
            ),
            click.Option(
                ["--prefer-grpc"],
                type=bool,
                is_flag=True,
                # FIX: typo "gPRC" -> "gRPC".
                help="Whether to use gRPC interface whenever possible in methods. Default: False.",
            ),
            click.Option(
                ["--https"],
                type=bool,
                is_flag=True,
                help="Whether to use HTTPS(SSL) protocol. Default: False.",
            ),
            click.Option(
                ["--prefix"],
                type=str,
                # FIX: grammar — "add to the REST API endpoints".
                help="Prefix to add to the REST API endpoints.",
            ),
            click.Option(
                ["--timeout"],
                # FIX: the config field is Optional[float] and the documented
                # default is 5.0 seconds; int would reject fractional timeouts.
                type=float,
                help="Timeout for operations. Default: 5.0 seconds for REST, unlimited for gRPC.",
            ),
            click.Option(
                ["--host"],
                type=str,
                help="Host name of the Qdrant service.",
            ),
            click.Option(
                ["--path"],
                type=str,
                help="Persistence path for QdrantLocal.",
            ),
            click.Option(
                ["--force-disable-check-same-thread"],
                type=bool,
                is_flag=True,
                help="Whether to force disable check same thread for QdrantLocal.",
            ),
            click.Option(
                ["--api-key"],
                type=str,
                help="API key for authentication in Qdrant Cloud. Default: None.",
                envvar="QDRANT_API_KEY",
                show_envvar=True,
            ),
        ]
        return options
@dataclass
class QdrantCliWriteConfig(QdrantWriteConfig, CliConfig):
    """CLI adapter exposing `QdrantWriteConfig` upload settings as click options."""

    @staticmethod
    def get_cli_options() -> t.List[click.Option]:
        """Return click options controlling how points are uploaded."""
        return [
            click.Option(
                ["--batch-size"],
                default=50,
                type=int,
                help="Number of points to upload per batch",
            ),
            click.Option(
                ["--num-processes"],
                default=2,
                type=int,
                help="Number of parallel processes with which to upload",
            ),
        ]
def get_base_dest_cmd():
    """Build the `qdrant` destination command for the ingest CLI."""
    from unstructured.ingest.cli.base.dest import BaseDestCmd

    return BaseDestCmd(
        cmd_name="qdrant",
        cli_config=QdrantCliConfig,
        additional_cli_options=[QdrantCliWriteConfig],
        write_config=QdrantWriteConfig,
    )

View File

@ -0,0 +1,156 @@
import json
import multiprocessing as mp
import typing as t
import uuid
from dataclasses import dataclass
from unstructured.ingest.enhanced_dataclass import enhanced_field
from unstructured.ingest.error import DestinationConnectionError, WriteError
from unstructured.ingest.interfaces import (
AccessConfig,
BaseConnectorConfig,
BaseDestinationConnector,
BaseIngestDoc,
ConfigSessionHandleMixin,
IngestDocSessionHandleMixin,
WriteConfig,
)
from unstructured.ingest.logger import logger
from unstructured.ingest.utils.data_prep import chunk_generator
from unstructured.staging.base import flatten_dict
from unstructured.utils import requires_dependencies
if t.TYPE_CHECKING:
from qdrant_client import QdrantClient
@dataclass
class QdrantAccessConfig(AccessConfig):
    """Credentials for Qdrant; `api_key` is flagged sensitive so it can be redacted."""

    # API key for Qdrant Cloud authentication; None for unauthenticated instances.
    api_key: t.Optional[str] = enhanced_field(sensitive=True)
@dataclass
class SimpleQdrantConfig(ConfigSessionHandleMixin, BaseConnectorConfig):
    """Connection settings for a Qdrant instance.

    Fields mirror the `qdrant_client.QdrantClient` constructor arguments and are
    forwarded unchanged by `QdrantDestinationConnector.create_client`.
    """

    # Target collection; the connector writes to it but never creates it.
    collection_name: str
    # Presumably one of location/url/host/path selects how the client connects —
    # all are passed straight through to QdrantClient (confirm against its docs).
    location: t.Optional[str] = None
    url: t.Optional[str] = None
    port: t.Optional[int] = 6333  # REST API port
    grpc_port: t.Optional[int] = 6334  # gRPC port
    prefer_grpc: t.Optional[bool] = False  # use gRPC where possible
    https: t.Optional[bool] = None
    prefix: t.Optional[str] = None  # path prefix for REST endpoints
    timeout: t.Optional[float] = None  # per-operation timeout in seconds
    host: t.Optional[str] = None
    path: t.Optional[str] = None  # persistence path for a local (embedded) Qdrant
    force_disable_check_same_thread: t.Optional[bool] = False
    # Optional credentials; only api_key is read (see create_client).
    access_config: t.Optional[QdrantAccessConfig] = None
@dataclass
class QdrantWriteConfig(WriteConfig):
    """Tuning knobs for uploading points to Qdrant."""

    batch_size: int = 50  # points per upsert request
    num_processes: int = 1  # parallel upload processes (1 = serial upload)
@dataclass
class QdrantDestinationConnector(IngestDocSessionHandleMixin, BaseDestinationConnector):
    """Destination connector that upserts partitioned documents into a Qdrant collection."""

    write_config: QdrantWriteConfig
    connector_config: SimpleQdrantConfig
    # Lazily created client; see the qdrant_client property.
    _client: t.Optional["QdrantClient"] = None

    @property
    def qdrant_client(self):
        # Create the client on first use so construction stays cheap and the
        # qdrant-client dependency is only needed when actually writing.
        if self._client is None:
            self._client = self.create_client()
        return self._client

    def initialize(self):
        # No upfront work; the client is created lazily on first access.
        ...

    @requires_dependencies(["qdrant_client"], extras="qdrant")
    def create_client(self) -> "QdrantClient":
        """Build a QdrantClient, forwarding every connection field from the config."""
        from qdrant_client import QdrantClient

        client = QdrantClient(
            location=self.connector_config.location,
            url=self.connector_config.url,
            port=self.connector_config.port,
            grpc_port=self.connector_config.grpc_port,
            prefer_grpc=self.connector_config.prefer_grpc,
            https=self.connector_config.https,
            # api_key is optional; absent access_config means unauthenticated.
            api_key=self.connector_config.access_config.api_key
            if self.connector_config.access_config
            else None,
            prefix=self.connector_config.prefix,
            timeout=self.connector_config.timeout,
            host=self.connector_config.host,
            path=self.connector_config.path,
            force_disable_check_same_thread=self.connector_config.force_disable_check_same_thread,
        )
        return client

    @DestinationConnectionError.wrap
    def check_connection(self):
        # Any successful API call proves connectivity; listing collections is cheap.
        self.qdrant_client.get_collections()

    @DestinationConnectionError.wrap
    @requires_dependencies(["qdrant_client"], extras="qdrant")
    def upsert_batch(self, batch: t.List[t.Dict[str, t.Any]]):
        """Upsert one batch of point dicts (id/vector/payload) into the collection.

        Raises:
            WriteError: wrapping any exception raised by the client during upsert.
        """
        from qdrant_client import models

        client = self.qdrant_client
        try:
            points: list[models.PointStruct] = [models.PointStruct(**item) for item in batch]
            # wait=True asks the server to apply the points before responding
            # (per qdrant-client docs) so failures surface here, not later.
            response = client.upsert(
                self.connector_config.collection_name, points=points, wait=True
            )
        except Exception as api_error:
            raise WriteError(f"Qdrant error: {api_error}") from api_error
        logger.debug(f"results: {response}")

    def write_dict(self, *args, dict_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None:
        """Split ``dict_list`` into batches and upsert them, optionally in parallel."""
        logger.info(
            f"Upserting {len(dict_list)} elements to " f"{self.connector_config.collection_name}",
        )
        qdrant_batch_size = self.write_config.batch_size
        logger.info(f"using {self.write_config.num_processes} processes to upload")
        if self.write_config.num_processes == 1:
            for chunk in chunk_generator(dict_list, qdrant_batch_size):
                self.upsert_batch(chunk)
        else:
            # NOTE(review): pool workers receive a copy of self; each lazily
            # builds its own client via the qdrant_client property — confirm
            # the connector pickles cleanly before _client is populated.
            with mp.Pool(
                processes=self.write_config.num_processes,
            ) as pool:
                pool.map(self.upsert_batch, list(chunk_generator(dict_list, qdrant_batch_size)))

    def write(self, docs: t.List[BaseIngestDoc]) -> None:
        """Load each doc's structured-output JSON and upsert its elements as points.

        Each element becomes a point with a random UUID id, its popped
        "embeddings" value as the vector, and a payload holding the raw text,
        the serialized remaining element, and a flattened copy of its fields.
        """
        dict_list: t.List[t.Dict[str, t.Any]] = []
        for doc in docs:
            local_path = doc._output_filename
            with open(local_path) as json_file:
                dict_content = json.load(json_file)
                dict_content = [
                    {
                        "id": str(uuid.uuid4()),
                        "vector": element.pop("embeddings", {}),
                        "payload": {
                            "text": element.pop("text", None),
                            "element_serialized": json.dumps(element),
                            **flatten_dict(
                                element,
                                separator="-",
                                flatten_lists=True,
                            ),
                        },
                    }
                    for element in dict_content
                ]
                logger.info(
                    f"appending {len(dict_content)} json elements from content in {local_path}",
                )
                dict_list.extend(dict_content)
        self.write_dict(dict_list=dict_list)

View File

@ -12,6 +12,7 @@ from .fsspec.gcs import GcsWriter
from .fsspec.s3 import S3Writer from .fsspec.s3 import S3Writer
from .mongodb import MongodbWriter from .mongodb import MongodbWriter
from .pinecone import PineconeWriter from .pinecone import PineconeWriter
from .qdrant import QdrantWriter
from .weaviate import WeaviateWriter from .weaviate import WeaviateWriter
writer_map: t.Dict[str, t.Type[Writer]] = { writer_map: t.Dict[str, t.Type[Writer]] = {
@ -25,6 +26,7 @@ writer_map: t.Dict[str, t.Type[Writer]] = {
"gcs": GcsWriter, "gcs": GcsWriter,
"mongodb": MongodbWriter, "mongodb": MongodbWriter,
"pinecone": PineconeWriter, "pinecone": PineconeWriter,
"qdrant": QdrantWriter,
"s3": S3Writer, "s3": S3Writer,
"weaviate": WeaviateWriter, "weaviate": WeaviateWriter,
} }

View File

@ -0,0 +1,19 @@
import typing as t
from dataclasses import dataclass
from unstructured.ingest.interfaces import BaseDestinationConnector
from unstructured.ingest.runner.writers.base_writer import Writer
if t.TYPE_CHECKING:
from unstructured.ingest.connector.qdrant import QdrantWriteConfig, SimpleQdrantConfig
@dataclass
class QdrantWriter(Writer):
    """Writer that routes ingest output to the Qdrant destination connector."""

    write_config: "QdrantWriteConfig"
    connector_config: "SimpleQdrantConfig"

    def get_connector_cls(self) -> t.Type[BaseDestinationConnector]:
        """Import lazily so qdrant-client is only required when actually writing."""
        from unstructured.ingest.connector.qdrant import (
            QdrantDestinationConnector,
        )

        return QdrantDestinationConnector