diff --git a/CHANGELOG.md b/CHANGELOG.md index eba2afc82..6a7f2539d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.11.7-dev3 +## 0.11.7-dev4 ### Enhancements @@ -6,7 +6,8 @@ * **Update encoders to leverage dataclasses** All encoders now follow a class approach which get annotated with the dataclass decorator. Similar to the connectors, it uses a nested dataclass for the configs required to configure a client as well as a field/property approach to cache the client. This makes sure any variable associated with the class exists as a dataclass field. ### Features - + +* **Add Qdrant destination connector.** Adds support for writing documents and embeddings into a Qdrant collection. * **Store base64 encoded image data in metadata fields.** Rather than saving to file, stores base64 encoded data of the image bytes and the mimetype for the image in metadata fields: `image_base64` and `image_mime_type` (if that is what the user specifies by some other param like `pdf_extract_to_payload`). This would allow the API to have parity with the library. ### Fixes diff --git a/Makefile b/Makefile index a0b8b297b..9847f0f1e 100644 --- a/Makefile +++ b/Makefile @@ -223,6 +223,10 @@ install-ingest-sftp: install-ingest-pinecone: python3 -m pip install -r requirements/ingest/pinecone.txt +.PHONY: install-ingest-qdrant +install-ingest-qdrant: + python3 -m pip install -r requirements/ingest/qdrant.txt + .PHONY: install-ingest-chroma install-ingest-chroma: python3 -m pip install -r requirements/ingest/chroma.txt diff --git a/docs/source/ingest/destination_connectors.rst b/docs/source/ingest/destination_connectors.rst index 8582d8e21..98cfce29c 100644 --- a/docs/source/ingest/destination_connectors.rst +++ b/docs/source/ingest/destination_connectors.rst @@ -17,6 +17,7 @@ in our community `Slack. 
`_ destination_connectors/gcs destination_connectors/mongodb destination_connectors/pinecone + destination_connectors/qdrant destination_connectors/s3 destination_connectors/weaviate diff --git a/docs/source/ingest/destination_connectors/code/bash/qdrant.sh b/docs/source/ingest/destination_connectors/code/bash/qdrant.sh new file mode 100644 index 000000000..f2af6477d --- /dev/null +++ b/docs/source/ingest/destination_connectors/code/bash/qdrant.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash + +EMBEDDING_PROVIDER=${EMBEDDING_PROVIDER:-"langchain-huggingface"} + +unstructured-ingest \ + local \ + --input-path example-docs/book-war-and-peace-1225p.txt \ + --output-dir local-output-to-qdrant \ + --strategy fast \ + --chunk-elements \ + --embedding-provider "$EMBEDDING_PROVIDER" \ + --num-processes 2 \ + --verbose \ + qdrant \ + --collection-name "test" \ + --location "http://localhost:6333" \ + --batch-size 80 diff --git a/docs/source/ingest/destination_connectors/code/python/qdrant.py b/docs/source/ingest/destination_connectors/code/python/qdrant.py new file mode 100644 index 000000000..35826e991 --- /dev/null +++ b/docs/source/ingest/destination_connectors/code/python/qdrant.py @@ -0,0 +1,46 @@ +from unstructured.ingest.connector.local import SimpleLocalConfig +from unstructured.ingest.connector.qdrant import ( + QdrantWriteConfig, + SimpleQdrantConfig, +) +from unstructured.ingest.interfaces import ( + ChunkingConfig, + EmbeddingConfig, + PartitionConfig, + ProcessorConfig, + ReadConfig, +) +from unstructured.ingest.runner import LocalRunner +from unstructured.ingest.runner.writers.base_writer import Writer +from unstructured.ingest.runner.writers.qdrant import QdrantWriter + + +def get_writer() -> Writer: + return QdrantWriter( + connector_config=SimpleQdrantConfig( + location="http://localhost:6333", + collection_name="test", + ), + write_config=QdrantWriteConfig(batch_size=80), + ) + + +if __name__ == "__main__": + writer = get_writer() + runner = LocalRunner( + 
processor_config=ProcessorConfig( + verbose=True, + output_dir="local-output-to-qdrant", + num_processes=2, + ), + connector_config=SimpleLocalConfig( + input_path="example-docs/book-war-and-peace-1225p.txt", + ), + read_config=ReadConfig(), + partition_config=PartitionConfig(), + chunking_config=ChunkingConfig(chunk_elements=True), + embedding_config=EmbeddingConfig(provider="langchain-huggingface"), + writer=writer, + writer_kwargs={}, + ) + runner.run() diff --git a/docs/source/ingest/destination_connectors/qdrant.rst b/docs/source/ingest/destination_connectors/qdrant.rst new file mode 100644 index 000000000..4d9f9ff02 --- /dev/null +++ b/docs/source/ingest/destination_connectors/qdrant.rst @@ -0,0 +1,34 @@ +Qdrant +=========== + +Batch process all your records using ``unstructured-ingest`` to store structured outputs and embeddings locally on your filesystem and upload those to a Qdrant collection. + +First you'll need to install the Qdrant dependencies as shown here. + +.. code:: shell + + pip install "unstructured[qdrant]" + +Create a Qdrant collection with the appropriate configurations. Find more information in the `Qdrant collections guide `_. + +Run Locally +----------- +The upstream connector can be any of the ones supported, but for convenience here, showing a sample command using the +upstream local connector. + +.. tabs:: + + .. tab:: Shell + + .. literalinclude:: ./code/bash/qdrant.sh + :language: bash + + .. tab:: Python + + .. literalinclude:: ./code/python/qdrant.py + :language: python + + +For a full list of the options the CLI accepts check ``unstructured-ingest qdrant --help``. + +NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide `_. 
diff --git a/docs/source/introduction/key_concepts.rst b/docs/source/introduction/key_concepts.rst index 18ed9329d..96ebddeca 100644 --- a/docs/source/introduction/key_concepts.rst +++ b/docs/source/introduction/key_concepts.rst @@ -14,7 +14,7 @@ Data Preprocessing Before the core analysis, raw data often requires significant preprocessing: - **Partitioning**: Segregating data into smaller, manageable segments or partitions. - + - **Cleaning**: Removing anomalies, filling missing values, and eliminating any irrelevant or erroneous information. Preprocessing ensures data integrity and can significantly influence the outcomes of subsequent tasks. @@ -49,14 +49,14 @@ LLMs, like GPT, are trained on vast amounts of data and have the capacity to com Retrieval Augmented Generation ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Large Language Models (LLMs) like OpenAI's ChatGPT and Anthropic's Claude have revolutionized the AI landscape with their prowess. However, they inherently suffer from significant drawbacks. One major issue is their static nature, which means they're "frozen in time". -For instance, ChatGPT's knowledge is limited up to September 2021, leaving it blind to any developments or information post that period. Despite this, LLMs might often respond to newer queries with unwarranted confidence, a phenomenon known as "hallucination". +Large Language Models (LLMs) like OpenAI's ChatGPT and Anthropic's Claude have revolutionized the AI landscape with their prowess. However, they inherently suffer from significant drawbacks. One major issue is their static nature, which means they're "frozen in time". +For instance, ChatGPT's knowledge is limited up to September 2021, leaving it blind to any developments or information post that period. Despite this, LLMs might often respond to newer queries with unwarranted confidence, a phenomenon known as "hallucination". Such errors can be highly detrimental, especially when these models serve critical real-world applications. 
-Retrieval Augmented Generation (RAG) is a groundbreaking technique designed to counteract the limitations of foundational LLMs. By pairing an LLM with a RAG pipeline, we can enable users to access the underlying data sources that the model uses. This transparent approach not +Retrieval Augmented Generation (RAG) is a groundbreaking technique designed to counteract the limitations of foundational LLMs. By pairing an LLM with a RAG pipeline, we can enable users to access the underlying data sources that the model uses. This transparent approach not only ensures that an LLM's claims can be verified for accuracy but also builds a trust factor among users. -Moreover, RAG offers a cost-effective solution. Instead of bearing the extensive computational and financial burdens of training custom models or finetuning existing ones, RAG can, in many situations, serve as a sufficient alternative. This reduction in resource consumption +Moreover, RAG offers a cost-effective solution. Instead of bearing the extensive computational and financial burdens of training custom models or finetuning existing ones, RAG can, in many situations, serve as a sufficient alternative. This reduction in resource consumption is particularly beneficial for organizations that lack the means to develop and deploy foundational models from scratch. A RAG workflow can be broken down into the following steps: @@ -69,7 +69,7 @@ A RAG workflow can be broken down into the following steps: 4. **Embedding**: After chunking, you will need to convert the text into a numerical representation (vector embedding) that a LLM can understand. OpenAI, Cohere, and Hugging Face all offer embedding models. -5. **Vector Database**: The next step is to choose a location for storing your chunked embeddings. There are lots of options to choose from for your vector database (Pinecone, Milvus, ChromaDD, Weaviate and more). +5. **Vector Database**: The next step is to choose a location for storing your chunked embeddings. 
There are lots of options to choose from for your vector database (ChromaDB, Milvus, Pinecone, Qdrant, Weaviate and more). 6. **User Prompt**: Take the user prompt and grab the most relevant chunks of information in the vector database via similarity search. diff --git a/examples/ingest/qdrant/ingest.sh b/examples/ingest/qdrant/ingest.sh new file mode 100644 index 000000000..b2595a943 --- /dev/null +++ b/examples/ingest/qdrant/ingest.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash + +# Uploads the structured output of the files within the given path to a Qdrant collection named 'test'. + +SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd) +cd "$SCRIPT_DIR"/../../.. || exit 1 + +EMBEDDING_PROVIDER=${EMBEDDING_PROVIDER:-"langchain-huggingface"} + +unstructured-ingest \ + local \ + --input-path example-docs/book-war-and-peace-1225p.txt \ + --output-dir local-output-to-qdrant \ + --strategy fast \ + --chunk-elements \ + --embedding-provider "$EMBEDDING_PROVIDER" \ + --num-processes 2 \ + --verbose \ + qdrant \ + --collection-name "test" \ + --location "http://localhost:6333" \ + --batch-size 80 diff --git a/requirements/ingest/qdrant.in b/requirements/ingest/qdrant.in new file mode 100644 index 000000000..20e1dd231 --- /dev/null +++ b/requirements/ingest/qdrant.in @@ -0,0 +1,3 @@ +-c ../constraints.in +-c ../base.txt +qdrant-client \ No newline at end of file diff --git a/requirements/ingest/qdrant.txt b/requirements/ingest/qdrant.txt new file mode 100644 index 000000000..ce8be74cf --- /dev/null +++ b/requirements/ingest/qdrant.txt @@ -0,0 +1,74 @@ +# +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: +# +# pip-compile --output-file=ingest/qdrant.txt ingest/qdrant.in +# +anyio==3.7.1 + # via + # -c ingest/../constraints.in + # httpx +certifi==2023.11.17 + # via + # -c ingest/../base.txt + # -c ingest/../constraints.in + # httpcore + # httpx +exceptiongroup==1.2.0 + # via anyio +grpcio==1.60.0 + # via + # 
grpcio-tools + # qdrant-client +grpcio-tools==1.60.0 + # via qdrant-client +h11==0.14.0 + # via httpcore +h2==4.1.0 + # via httpx +hpack==4.0.0 + # via h2 +httpcore==1.0.2 + # via httpx +httpx[http2]==0.26.0 + # via qdrant-client +hyperframe==6.0.1 + # via h2 +idna==3.6 + # via + # -c ingest/../base.txt + # anyio + # httpx +numpy==1.24.4 + # via + # -c ingest/../base.txt + # -c ingest/../constraints.in + # qdrant-client +portalocker==2.8.2 + # via qdrant-client +protobuf==4.23.4 + # via + # -c ingest/../constraints.in + # grpcio-tools +pydantic==1.10.13 + # via + # -c ingest/../constraints.in + # qdrant-client +qdrant-client==1.7.0 + # via -r ingest/qdrant.in +sniffio==1.3.0 + # via + # anyio + # httpx +typing-extensions==4.8.0 + # via + # -c ingest/../base.txt + # pydantic +urllib3==1.26.18 + # via + # -c ingest/../base.txt + # -c ingest/../constraints.in + # qdrant-client + +# The following packages are considered to be unsafe in a requirements file: +# setuptools diff --git a/setup.py b/setup.py index e5c14a675..8b03b27de 100644 --- a/setup.py +++ b/setup.py @@ -150,6 +150,7 @@ setup( "onedrive": load_requirements("requirements/ingest/onedrive.in"), "outlook": load_requirements("requirements/ingest/outlook.in"), "pinecone": load_requirements("requirements/ingest/pinecone.in"), + "qdrant": load_requirements("requirements/ingest/qdrant.in"), "reddit": load_requirements("requirements/ingest/reddit.in"), "s3": load_requirements("requirements/ingest/s3.in"), "sharepoint": load_requirements("requirements/ingest/sharepoint.in"), diff --git a/test_unstructured_ingest/dest/qdrant.sh b/test_unstructured_ingest/dest/qdrant.sh new file mode 100755 index 000000000..2e884d37e --- /dev/null +++ b/test_unstructured_ingest/dest/qdrant.sh @@ -0,0 +1,87 @@ +#!/bin/bash + +set -ex + +DEST_PATH=$(dirname "$(realpath "$0")") +SCRIPT_DIR=$(dirname "$DEST_PATH") +cd "$SCRIPT_DIR"/.. 
|| exit 1 +OUTPUT_FOLDER_NAME=qdrant-dest +OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME +WORK_DIR=$SCRIPT_DIR/workdir/$OUTPUT_FOLDER_NAME +max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")} +writer_processes=$(((max_processes - 1) > 1 ? (max_processes - 1) : 2)) +CONTAINER_NAME="qdrant_test" +QDRANT_PORT=6333 +QDRANT_HOST=localhost:$QDRANT_PORT +COLLECTION_NAME="qdrant-test-$(date +%s)" +EXPECTED_POINTS_COUNT=1404 +RETRIES=5 + +function stop_docker() { + docker stop $CONTAINER_NAME +} + +docker run -d --rm \ + -p 6333:$QDRANT_PORT \ + --name $CONTAINER_NAME qdrant/qdrant:latest + +trap stop_docker SIGINT +trap stop_docker ERR + +until curl --output /dev/null --silent --get --fail http://$QDRANT_HOST/collections; do + RETRIES=$((RETRIES - 1)) + if [ "$RETRIES" -le 0 ]; then + echo "Qdrant server failed to start" + stop_docker + exit 1 + fi + printf 'Waiting for Qdrant server to start...' + sleep 5 +done + +curl -X PUT \ + http://$QDRANT_HOST/collections/"$COLLECTION_NAME" \ + -H 'Content-Type: application/json' \ + -d '{ + "vectors": { + "size": 384, + "distance": "Cosine" + } +}' + +EMBEDDING_PROVIDER=${EMBEDDING_PROVIDER:-"langchain-huggingface"} + +PYTHONPATH=. 
./unstructured/ingest/main.py \ + local \ + --num-processes "$max_processes" \ + --output-dir "$OUTPUT_DIR" \ + --strategy fast \ + --verbose \ + --reprocess \ + --input-path example-docs/book-war-and-peace-1225p.txt \ + --work-dir "$WORK_DIR" \ + --chunk-elements \ + --chunk-combine-text-under-n-chars 200 --chunk-new-after-n-chars 2500 --chunk-max-characters 38000 --chunk-multipage-sections \ + --embedding-provider "$EMBEDDING_PROVIDER" \ + qdrant \ + --collection-name "$COLLECTION_NAME" \ + --location "http://$QDRANT_HOST" \ + --batch-size 80 \ + --num-processes "$writer_processes" + +response=$(curl -s -X POST \ + $QDRANT_HOST/collections/"$COLLECTION_NAME"/points/count \ + -H 'Content-Type: application/json' \ + -d '{ + "exact": true +}') + +count=$(echo "$response" | jq -r '.result.count') + +if [ "$count" -ne $EXPECTED_POINTS_COUNT ]; then + echo "Points count assertion failed. Expected: $EXPECTED_POINTS_COUNT. Got: $count. Test failed." + stop_docker + exit 1 +fi + +stop_docker diff --git a/test_unstructured_ingest/test-ingest-dest.sh b/test_unstructured_ingest/test-ingest-dest.sh index 20bd482ab..748d44dd8 100755 --- a/test_unstructured_ingest/test-ingest-dest.sh +++ b/test_unstructured_ingest/test-ingest-dest.sh @@ -25,6 +25,7 @@ all_tests=( 'gcs.sh' 'mongodb.sh' 'pinecone.sh' + 'qdrant.sh' 's3.sh' 'weaviate.sh' 'sharepoint-embed-cog-index.sh' diff --git a/unstructured/__version__.py b/unstructured/__version__.py index fdd608fe8..b5f316506 100644 --- a/unstructured/__version__.py +++ b/unstructured/__version__.py @@ -1 +1 @@ -__version__ = "0.11.7-dev3" # pragma: no cover +__version__ = "0.11.7-dev4" # pragma: no cover diff --git a/unstructured/ingest/cli/cmds/__init__.py b/unstructured/ingest/cli/cmds/__init__.py index b416ea9da..3b076d56d 100644 --- a/unstructured/ingest/cli/cmds/__init__.py +++ b/unstructured/ingest/cli/cmds/__init__.py @@ -39,6 +39,7 @@ from .notion import get_base_src_cmd as notion_base_src_cmd from .onedrive import get_base_src_cmd as 
onedrive_base_src_cmd from .outlook import get_base_src_cmd as outlook_base_src_cmd from .pinecone import get_base_dest_cmd as pinecone_base_dest_cmd +from .qdrant import get_base_dest_cmd as qdrant_base_dest_cmd from .reddit import get_base_src_cmd as reddit_base_src_cmd from .salesforce import get_base_src_cmd as salesforce_base_src_cmd from .sharepoint import get_base_src_cmd as sharepoint_base_src_cmd @@ -103,6 +104,7 @@ base_dest_cmd_fns: t.List[t.Callable[[], "BaseDestCmd"]] = [ weaviate_dest_cmd, mongo_base_dest_cmd, pinecone_base_dest_cmd, + qdrant_base_dest_cmd, ] # Make sure there are not overlapping names diff --git a/unstructured/ingest/cli/cmds/qdrant.py b/unstructured/ingest/cli/cmds/qdrant.py new file mode 100644 index 000000000..1a0847614 --- /dev/null +++ b/unstructured/ingest/cli/cmds/qdrant.py @@ -0,0 +1,124 @@ +import typing as t +from dataclasses import dataclass + +import click + +from unstructured.ingest.cli.interfaces import ( + CliConfig, +) +from unstructured.ingest.connector.qdrant import QdrantWriteConfig, SimpleQdrantConfig + + +@dataclass +class QdrantCliConfig(SimpleQdrantConfig, CliConfig): + @staticmethod + def get_cli_options() -> t.List[click.Option]: + options = [ + click.Option( + ["--collection-name"], + required=True, + type=str, + help="The name of the Qdrant collection to use.", + ), + click.Option( + ["--location"], + type=str, + help="The location of the Qdrant cluster.", + ), + click.Option( + ["--url"], + type=str, + help="The URL of the Qdrant instance.", + ), + click.Option( + ["--port"], + type=int, + default=6333, + help="Port of the REST API interface. Default: 6333.", + ), + click.Option( + ["--grpc-port"], + type=int, + default=6334, + help="Port of the gRPC interface. Default: 6334.", + ), + click.Option( + ["--prefer-grpc"], + type=bool, + is_flag=True, + help="Whether to use gRPC interface whenever possible in methods. 
Default: False.", + ), + click.Option( + ["--https"], + type=bool, + is_flag=True, + help="Whether to use HTTPS(SSL) protocol. Default: False.", + ), + click.Option( + ["--prefix"], + type=str, + help="Prefix to add to the REST API endpoints.", + ), + click.Option( + ["--timeout"], + type=int, + help="Timeout for operations. Default: 5.0 seconds for REST, unlimited for gRPC.", + ), + click.Option( + ["--host"], + type=str, + help="Host name of the Qdrant service.", + ), + click.Option( + ["--path"], + type=str, + help="Persistence path for QdrantLocal.", + ), + click.Option( + ["--force-disable-check-same-thread"], + type=bool, + is_flag=True, + help="Whether to force disable check same thread for QdrantLocal.", + ), + click.Option( + ["--api-key"], + type=str, + help="API key for authentication in Qdrant Cloud. Default: None.", + envvar="QDRANT_API_KEY", + show_envvar=True, + ), + ] + return options + + +@dataclass +class QdrantCliWriteConfig(QdrantWriteConfig, CliConfig): + @staticmethod + def get_cli_options() -> t.List[click.Option]: + options = [ + click.Option( + ["--batch-size"], + default=50, + type=int, + help="Number of points to upload per batch", + ), + click.Option( + ["--num-processes"], + default=2, + type=int, + help="Number of parallel processes with which to upload", + ), + ] + return options + + +def get_base_dest_cmd(): + from unstructured.ingest.cli.base.dest import BaseDestCmd + + cmd_cls = BaseDestCmd( + cmd_name="qdrant", + cli_config=QdrantCliConfig, + additional_cli_options=[QdrantCliWriteConfig], + write_config=QdrantWriteConfig, + ) + return cmd_cls diff --git a/unstructured/ingest/connector/qdrant.py b/unstructured/ingest/connector/qdrant.py new file mode 100644 index 000000000..2bf21b24e --- /dev/null +++ b/unstructured/ingest/connector/qdrant.py @@ -0,0 +1,156 @@ +import json +import multiprocessing as mp +import typing as t +import uuid +from dataclasses import dataclass + +from unstructured.ingest.enhanced_dataclass import 
enhanced_field +from unstructured.ingest.error import DestinationConnectionError, WriteError +from unstructured.ingest.interfaces import ( + AccessConfig, + BaseConnectorConfig, + BaseDestinationConnector, + BaseIngestDoc, + ConfigSessionHandleMixin, + IngestDocSessionHandleMixin, + WriteConfig, +) +from unstructured.ingest.logger import logger +from unstructured.ingest.utils.data_prep import chunk_generator +from unstructured.staging.base import flatten_dict +from unstructured.utils import requires_dependencies + +if t.TYPE_CHECKING: + from qdrant_client import QdrantClient + + +@dataclass +class QdrantAccessConfig(AccessConfig): + api_key: t.Optional[str] = enhanced_field(sensitive=True) + + +@dataclass +class SimpleQdrantConfig(ConfigSessionHandleMixin, BaseConnectorConfig): + collection_name: str + location: t.Optional[str] = None + url: t.Optional[str] = None + port: t.Optional[int] = 6333 + grpc_port: t.Optional[int] = 6334 + prefer_grpc: t.Optional[bool] = False + https: t.Optional[bool] = None + prefix: t.Optional[str] = None + timeout: t.Optional[float] = None + host: t.Optional[str] = None + path: t.Optional[str] = None + force_disable_check_same_thread: t.Optional[bool] = False + access_config: t.Optional[QdrantAccessConfig] = None + + +@dataclass +class QdrantWriteConfig(WriteConfig): + batch_size: int = 50 + num_processes: int = 1 + + +@dataclass +class QdrantDestinationConnector(IngestDocSessionHandleMixin, BaseDestinationConnector): + write_config: QdrantWriteConfig + connector_config: SimpleQdrantConfig + _client: t.Optional["QdrantClient"] = None + + @property + def qdrant_client(self): + if self._client is None: + self._client = self.create_client() + return self._client + + def initialize(self): + ... 
+ + @requires_dependencies(["qdrant_client"], extras="qdrant") + def create_client(self) -> "QdrantClient": + from qdrant_client import QdrantClient + + client = QdrantClient( + location=self.connector_config.location, + url=self.connector_config.url, + port=self.connector_config.port, + grpc_port=self.connector_config.grpc_port, + prefer_grpc=self.connector_config.prefer_grpc, + https=self.connector_config.https, + api_key=self.connector_config.access_config.api_key + if self.connector_config.access_config + else None, + prefix=self.connector_config.prefix, + timeout=self.connector_config.timeout, + host=self.connector_config.host, + path=self.connector_config.path, + force_disable_check_same_thread=self.connector_config.force_disable_check_same_thread, + ) + + return client + + @DestinationConnectionError.wrap + def check_connection(self): + self.qdrant_client.get_collections() + + @DestinationConnectionError.wrap + @requires_dependencies(["qdrant_client"], extras="qdrant") + def upsert_batch(self, batch: t.List[t.Dict[str, t.Any]]): + from qdrant_client import models + + client = self.qdrant_client + try: + points: list[models.PointStruct] = [models.PointStruct(**item) for item in batch] + response = client.upsert( + self.connector_config.collection_name, points=points, wait=True + ) + except Exception as api_error: + raise WriteError(f"Qdrant error: {api_error}") from api_error + logger.debug(f"results: {response}") + + def write_dict(self, *args, dict_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None: + logger.info( + f"Upserting {len(dict_list)} elements to " f"{self.connector_config.collection_name}", + ) + + qdrant_batch_size = self.write_config.batch_size + + logger.info(f"using {self.write_config.num_processes} processes to upload") + if self.write_config.num_processes == 1: + for chunk in chunk_generator(dict_list, qdrant_batch_size): + self.upsert_batch(chunk) + + else: + with mp.Pool( + processes=self.write_config.num_processes, + ) as pool: + 
pool.map(self.upsert_batch, list(chunk_generator(dict_list, qdrant_batch_size))) + + def write(self, docs: t.List[BaseIngestDoc]) -> None: + dict_list: t.List[t.Dict[str, t.Any]] = [] + for doc in docs: + local_path = doc._output_filename + with open(local_path) as json_file: + dict_content = json.load(json_file) + dict_content = [ + { + "id": str(uuid.uuid4()), + "vector": element.pop("embeddings", {}), + "payload": { + "text": element.pop("text", None), + "element_serialized": json.dumps(element), + **flatten_dict( + element, + separator="-", + flatten_lists=True, + ), + }, + } + for element in dict_content + ] + logger.info( + f"appending {len(dict_content)} json elements from content in {local_path}", + ) + dict_list.extend(dict_content) + self.write_dict(dict_list=dict_list) diff --git a/unstructured/ingest/runner/writers/__init__.py b/unstructured/ingest/runner/writers/__init__.py index a2bec8ecd..9c3b3391f 100644 --- a/unstructured/ingest/runner/writers/__init__.py +++ b/unstructured/ingest/runner/writers/__init__.py @@ -12,6 +12,7 @@ from .fsspec.gcs import GcsWriter from .fsspec.s3 import S3Writer from .mongodb import MongodbWriter from .pinecone import PineconeWriter +from .qdrant import QdrantWriter from .weaviate import WeaviateWriter writer_map: t.Dict[str, t.Type[Writer]] = { @@ -25,6 +26,7 @@ writer_map: t.Dict[str, t.Type[Writer]] = { "gcs": GcsWriter, "mongodb": MongodbWriter, "pinecone": PineconeWriter, + "qdrant": QdrantWriter, "s3": S3Writer, "weaviate": WeaviateWriter, } diff --git a/unstructured/ingest/runner/writers/qdrant.py b/unstructured/ingest/runner/writers/qdrant.py new file mode 100644 index 000000000..e7e632405 --- /dev/null +++ b/unstructured/ingest/runner/writers/qdrant.py @@ -0,0 +1,19 @@ +import typing as t +from dataclasses import dataclass + +from unstructured.ingest.interfaces import BaseDestinationConnector +from unstructured.ingest.runner.writers.base_writer import Writer + +if t.TYPE_CHECKING: + from 
unstructured.ingest.connector.qdrant import QdrantWriteConfig, SimpleQdrantConfig + + +@dataclass +class QdrantWriter(Writer): + write_config: "QdrantWriteConfig" + connector_config: "SimpleQdrantConfig" + + def get_connector_cls(self) -> t.Type[BaseDestinationConnector]: + from unstructured.ingest.connector.qdrant import QdrantDestinationConnector + + return QdrantDestinationConnector