diff --git a/CHANGELOG.md b/CHANGELOG.md
index fef9aff0a..816cb2568 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,4 @@
-## 0.14.10-dev6
+## 0.14.10-dev7
 
 ### Enhancements
 
 * **Update unstructured-client dependency** Change unstructured-client dependency pin back to
diff --git a/MANIFEST.in b/MANIFEST.in
index 1ca97c963..356b719f3 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -53,5 +53,6 @@ include requirements/ingest/salesforce.in
 include requirements/ingest/sftp.in
 include requirements/ingest/sharepoint.in
 include requirements/ingest/slack.in
+include requirements/ingest/singlestore.in
 include requirements/ingest/weaviate.in
 include requirements/ingest/wikipedia.in
diff --git a/Makefile b/Makefile
index 2dccecfd3..f4176b5bd 100644
--- a/Makefile
+++ b/Makefile
@@ -197,6 +197,10 @@ install-ingest-airtable:
 install-ingest-sharepoint:
 	python3 -m pip install -r requirements/ingest/sharepoint.txt
 
+.PHONY: install-ingest-singlestore
+install-ingest-singlestore:
+	python3 -m pip install -r requirements/ingest/singlestore.txt
+
 .PHONY: install-ingest-weaviate
 install-ingest-weaviate:
 	python3 -m pip install -r requirements/ingest/weaviate.txt
diff --git a/requirements/ingest/singlestore.in b/requirements/ingest/singlestore.in
new file mode 100644
index 000000000..5a7e51c28
--- /dev/null
+++ b/requirements/ingest/singlestore.in
@@ -0,0 +1,3 @@
+-c ../deps/constraints.txt
+-c ../base.txt
+singlestoredb
diff --git a/requirements/ingest/singlestore.txt b/requirements/ingest/singlestore.txt
new file mode 100644
index 000000000..c0390166f
--- /dev/null
+++ b/requirements/ingest/singlestore.txt
@@ -0,0 +1,66 @@
+#
+# This file is autogenerated by pip-compile with Python 3.9
+# by the following command:
+#
+#    pip-compile singlestore.in
+#
+build==1.2.1
+    # via singlestoredb
+certifi==2024.6.2
+    # via
+    #   -c ../base.txt
+    #   -c ../deps/constraints.txt
+    #   requests
+charset-normalizer==3.3.2
+    # via
+    #   -c ../base.txt
+    #   requests
+idna==3.7
+    # via
+    #   -c ../base.txt
+    #   requests
+importlib-metadata==7.1.0
+    # via
+    #   -c ../deps/constraints.txt
+    #   build
+packaging==23.2
+    # via
+    #   -c ../base.txt
+    #   -c ../deps/constraints.txt
+    #   build
+parsimonious==0.10.0
+    # via singlestoredb
+pyjwt==2.8.0
+    # via singlestoredb
+pyproject-hooks==1.1.0
+    # via build
+regex==2024.5.15
+    # via
+    #   -c ../base.txt
+    #   parsimonious
+requests==2.32.3
+    # via
+    #   -c ../base.txt
+    #   singlestoredb
+singlestoredb==1.4.0
+    # via -r singlestore.in
+sqlparams==6.0.1
+    # via singlestoredb
+tomli==2.0.1
+    # via
+    #   build
+    #   singlestoredb
+urllib3==1.26.19
+    # via
+    #   -c ../base.txt
+    #   -c ../deps/constraints.txt
+    #   requests
+wheel==0.43.0
+    # via
+    #   -c ../deps/constraints.txt
+    #   singlestoredb
+zipp==3.19.2
+    # via importlib-metadata
+
+# The following packages are considered to be unsafe in a requirements file:
+# setuptools
diff --git a/scripts/singlestore-test-helpers/docker-compose.yml b/scripts/singlestore-test-helpers/docker-compose.yml
new file mode 100644
index 000000000..cc84a7240
--- /dev/null
+++ b/scripts/singlestore-test-helpers/docker-compose.yml
@@ -0,0 +1,21 @@
+services:
+  singlestore:
+    container_name: "singlestore"
+    image: ghcr.io/singlestore-labs/singlestoredb-dev:latest
+    platform: linux/amd64
+    ports:
+      - 3306:3306
+      - 8080:8080
+      - 9000:9000
+    environment:
+      - ROOT_PASSWORD=password
+    volumes:
+      - ./schema.sql:/init.sql
+
+  # Allow docker compose up --wait to exit only when singlestore is healthy
+  wait:
+    image: hello-world:latest
+    container_name: singlestore-waiter
+    depends_on:
+      singlestore:
+        condition: service_healthy
diff --git a/scripts/singlestore-test-helpers/schema.sql b/scripts/singlestore-test-helpers/schema.sql
new file mode 100644
index 000000000..eb4383192
--- /dev/null
+++ b/scripts/singlestore-test-helpers/schema.sql
@@ -0,0 +1,49 @@
+CREATE DATABASE ingest_test;
+USE ingest_test;
+
+CREATE TABLE elements (
+    id INT PRIMARY KEY NOT NULL AUTO_INCREMENT,
+    element_id TEXT,
+    text TEXT,
+    embeddings Vector(384),
+    type TEXT,
+    url TEXT,
+    version TEXT,
+    data_source_date_created TIMESTAMP,
+    data_source_date_modified TIMESTAMP,
+    data_source_date_processed TIMESTAMP,
+    data_source_permissions_data TEXT,
+    data_source_url TEXT,
+    data_source_version TEXT,
+    data_source_record_locator JSON,
+    category_depth INTEGER,
+    parent_id TEXT,
+    attached_filename TEXT,
+    filetype TEXT,
+    last_modified TIMESTAMP,
+    file_directory TEXT,
+    filename TEXT,
+    languages TEXT,
+    page_number TEXT,
+    links TEXT,
+    page_name TEXT,
+    link_urls TEXT,
+    link_texts TEXT,
+    sent_from TEXT,
+    sent_to TEXT,
+    subject TEXT,
+    section TEXT,
+    header_footer_type TEXT,
+    emphasized_text_contents TEXT,
+    emphasized_text_tags TEXT,
+    text_as_html TEXT,
+    regex_metadata TEXT,
+    detection_class_prob DECIMAL,
+    is_continuation BOOLEAN,
+    orig_elements TEXT,
+    coordinates_points TEXT,
+    coordinates_system TEXT,
+    coordinates_layout_width DECIMAL,
+    coordinates_layout_height DECIMAL
+);
+
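For reference, a minimal connectivity check against the dev container defined above — a sketch assuming the compose file's credentials and that the mounted `schema.sql` has already run as `/init.sql`:

```python
import singlestoredb as s2  # the dependency pinned in singlestore.txt

# Credentials and ports mirror docker-compose.yml; the ingest_test database
# is created by schema.sql.
conn = s2.connect(
    host="localhost",
    port=3306,
    user="root",
    password="password",
    database="ingest_test",
)
with conn.cursor() as cur:
    cur.execute("SHOW TABLES")
    print(cur.fetchall())  # expect the `elements` table once init.sql has run
conn.close()
```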
diff --git a/scripts/singlestore-test-helpers/test_outputs.py b/scripts/singlestore-test-helpers/test_outputs.py
new file mode 100755
index 000000000..7c4ddaa83
--- /dev/null
+++ b/scripts/singlestore-test-helpers/test_outputs.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python3
+
+from typing import Optional
+
+import click
+import singlestoredb as s2
+from singlestoredb.connection import Connection
+
+
+def get_connection(
+    host: Optional[str] = None,
+    port: Optional[int] = None,
+    database: Optional[str] = None,
+    user: Optional[str] = None,
+    password: Optional[str] = None,
+) -> Connection:
+    conn = s2.connect(
+        host=host,
+        port=port,
+        database=database,
+        user=user,
+        password=password,
+    )
+    return conn
+
+
+def validate(table_name: str, conn: Connection, num_elements: int):
+    with conn.cursor() as cur:
+        stmt = f"select * from {table_name}"
+        count = cur.execute(stmt)
+        assert (
+            count == num_elements
+        ), f"found count ({count}) doesn't match expected value: {num_elements}"
+    print("validation successful")
+
+
+@click.command()
+@click.option("--host", type=str, default="localhost", show_default=True)
+@click.option("--port", type=int, default=3306, show_default=True)
+@click.option("--user", type=str, default="root", show_default=True)
+@click.option("--password", type=str, default="password")
+@click.option("--database", type=str, required=True)
+@click.option("--table-name", type=str, required=True)
+@click.option(
+    "--num-elements", type=int, required=True, help="The expected number of elements to exist"
+)
+def run_validation(
+    host: str,
+    port: int,
+    user: str,
+    database: str,
+    password: str,
+    table_name: str,
+    num_elements: int,
+):
+    print(f"Validating that table {table_name} in database {database} has {num_elements} entries")
+    conn = get_connection(host=host, port=port, database=database, user=user, password=password)
+    validate(table_name=table_name, conn=conn, num_elements=num_elements)
+
+
+if __name__ == "__main__":
+    run_validation()
diff --git a/setup.py b/setup.py
index 5c71fbad4..717f477be 100644
--- a/setup.py
+++ b/setup.py
@@ -177,6 +177,7 @@ setup(
         "openai": load_requirements("requirements/ingest/embed-openai.in"),
         "bedrock": load_requirements("requirements/ingest/embed-aws-bedrock.in"),
         "databricks-volumes": load_requirements("requirements/ingest/databricks-volumes.in"),
+        "singlestore": load_requirements("requirements/ingest/singlestore.in"),
     },
     package_dir={"unstructured": "unstructured"},
     package_data={"unstructured": ["nlp/*.txt", "py.typed"]},
diff --git a/test_unstructured_ingest/dest/singlestore.sh b/test_unstructured_ingest/dest/singlestore.sh
new file mode 100755
index 000000000..1816a0e0e
--- /dev/null
+++ b/test_unstructured_ingest/dest/singlestore.sh
@@ -0,0 +1,65 @@
+#!/usr/bin/env bash
+
+set -e
+
+DEST_PATH=$(dirname "$(realpath "$0")")
+SCRIPT_DIR=$(dirname "$DEST_PATH")
+cd "$SCRIPT_DIR"/.. || exit 1
+OUTPUT_FOLDER_NAME=singlestore-dest
+OUTPUT_ROOT=${OUTPUT_ROOT:-$SCRIPT_DIR}
+OUTPUT_DIR=$OUTPUT_ROOT/structured-output/$OUTPUT_FOLDER_NAME
+WORK_DIR=$OUTPUT_ROOT/workdir/$OUTPUT_FOLDER_NAME
+CI=${CI:-"false"}
+max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")}
+
+# shellcheck disable=SC1091
+source "$SCRIPT_DIR"/cleanup.sh
+
+function cleanup {
+  # Container cleanup
+  echo "Stopping SingleStore Docker container"
+  docker compose -f scripts/singlestore-test-helpers/docker-compose.yml down --remove-orphans -v
+
+  # Local file cleanup
+  cleanup_dir "$WORK_DIR"
+  cleanup_dir "$OUTPUT_DIR"
+}
+
+trap cleanup EXIT
+
+# Create the SingleStore instance and the `elements` table
+echo "Creating SingleStore instance"
+# shellcheck source=/dev/null
+docker compose -f scripts/singlestore-test-helpers/docker-compose.yml up -d --wait --wait-timeout 60
+
+DATABASE=ingest_test
+USER=root
+HOST=localhost
+PASSWORD=password
+PORT=3306
+TABLE=elements
+
+PYTHONPATH=. ./unstructured/ingest/main.py \
+  local \
+  --num-processes "$max_processes" \
+  --output-dir "$OUTPUT_DIR" \
+  --strategy fast \
+  --verbose \
+  --reprocess \
+  --input-path example-docs/fake-memo.pdf \
+  --work-dir "$WORK_DIR" \
+  --embedding-provider "langchain-huggingface" \
+  singlestore \
+  --host $HOST \
+  --user $USER \
+  --password $PASSWORD \
+  --database $DATABASE \
+  --port $PORT \
+  --table-name $TABLE \
+  --drop-empty-cols
+
+expected_num_elements=$(cat "$WORK_DIR"/embed/* | jq 'length')
+./scripts/singlestore-test-helpers/test_outputs.py \
+  --table-name $TABLE \
+  --database $DATABASE \
+  --num-elements "$expected_num_elements"
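The test script derives its expected count by piping the embed step's output through `jq 'length'`. A rough Python equivalent of that line, assuming each file under the (illustrative) work dir holds one JSON array of elements:

```python
import json
from pathlib import Path

# Hypothetical WORK_DIR from the script above; adjust to your layout.
embed_dir = Path("workdir/singlestore-dest/embed")

# Sum the array lengths across all embed outputs (the script's jq call
# prints one length per input array; with a single input they coincide).
expected_num_elements = sum(
    len(json.loads(path.read_text())) for path in embed_dir.iterdir()
)
print(expected_num_elements)
```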
diff --git a/test_unstructured_ingest/test-ingest-dest.sh b/test_unstructured_ingest/test-ingest-dest.sh
index a54b0ea0d..2e46d6090 100755
--- a/test_unstructured_ingest/test-ingest-dest.sh
+++ b/test_unstructured_ingest/test-ingest-dest.sh
@@ -35,6 +35,7 @@ all_tests=(
   'sharepoint-embed-cog-index.sh'
   'sqlite.sh'
   'vectara.sh'
+  'singlestore.sh'
   'weaviate.sh'
 )
 
diff --git a/unstructured/__version__.py b/unstructured/__version__.py
index 2e85380be..6a6e09835 100644
--- a/unstructured/__version__.py
+++ b/unstructured/__version__.py
@@ -1 +1 @@
-__version__ = "0.14.10-dev6"  # pragma: no cover
+__version__ = "0.14.10-dev7"  # pragma: no cover
diff --git a/unstructured/ingest/connector/astra.py b/unstructured/ingest/connector/astra.py
index 554af0e4a..1475d481b 100644
--- a/unstructured/ingest/connector/astra.py
+++ b/unstructured/ingest/connector/astra.py
@@ -14,7 +14,7 @@ from unstructured.ingest.interfaces import (
     WriteConfig,
 )
 from unstructured.ingest.logger import logger
-from unstructured.ingest.utils.data_prep import chunk_generator
+from unstructured.ingest.utils.data_prep import batch_generator
 from unstructured.utils import requires_dependencies
 
 if t.TYPE_CHECKING:
@@ -114,7 +114,7 @@ class AstraDestinationConnector(BaseDestinationConnector):
         astra_batch_size = self.write_config.batch_size
 
-        for chunk in chunk_generator(elements_dict, astra_batch_size):
+        for chunk in batch_generator(elements_dict, astra_batch_size):
             self._astra_db_collection.insert_many(chunk)
 
     def normalize_dict(self, element_dict: dict) -> dict:
diff --git a/unstructured/ingest/connector/chroma.py b/unstructured/ingest/connector/chroma.py
index d35d6117f..547b988a2 100644
--- a/unstructured/ingest/connector/chroma.py
+++ b/unstructured/ingest/connector/chroma.py
@@ -12,7 +12,7 @@ from unstructured.ingest.interfaces import (
     WriteConfig,
 )
 from unstructured.ingest.logger import logger
-from unstructured.ingest.utils.data_prep import chunk_generator
+from unstructured.ingest.utils.data_prep import batch_generator
 from unstructured.staging.base import flatten_dict
 from unstructured.utils import requires_dependencies
 
@@ -144,7 +144,7 @@ class ChromaDestinationConnector(BaseDestinationConnector):
         chroma_batch_size = self.write_config.batch_size
 
-        for chunk in chunk_generator(elements_dict, chroma_batch_size):
+        for chunk in batch_generator(elements_dict, chroma_batch_size):
             self.upsert_batch(self.prepare_chroma_list(chunk))
 
     def normalize_dict(self, element_dict: dict) -> dict:
diff --git a/unstructured/ingest/connector/kafka.py b/unstructured/ingest/connector/kafka.py
index bddc7d2f8..4510cf3d7 100644
--- a/unstructured/ingest/connector/kafka.py
+++ b/unstructured/ingest/connector/kafka.py
@@ -21,7 +21,7 @@ from unstructured.ingest.interfaces import (
     WriteConfig,
 )
 from unstructured.ingest.logger import logger
-from unstructured.ingest.utils.data_prep import chunk_generator
+from unstructured.ingest.utils.data_prep import batch_generator
 from unstructured.utils import requires_dependencies
 
 if t.TYPE_CHECKING:
@@ -270,7 +270,7 @@ class KafkaDestinationConnector(IngestDocSessionHandleMixin, BaseDestinationConnector):
         logger.info(f"Writing {len(dict_list)} documents to Kafka")
         num_uploaded = 0
 
-        for chunk in chunk_generator(dict_list, self.write_config.batch_size):
+        for chunk in batch_generator(dict_list, self.write_config.batch_size):
             num_uploaded += self.upload_msg(chunk)  # noqa: E203
 
         producer = self.kafka_producer
diff --git a/unstructured/ingest/connector/pinecone.py b/unstructured/ingest/connector/pinecone.py
index 9ede3cb0a..3d6de3cc6 100644
--- a/unstructured/ingest/connector/pinecone.py
+++ b/unstructured/ingest/connector/pinecone.py
@@ -17,7 +17,7 @@ from unstructured.ingest.interfaces import (
     WriteConfig,
 )
 from unstructured.ingest.logger import logger
-from unstructured.ingest.utils.data_prep import chunk_generator
+from unstructured.ingest.utils.data_prep import batch_generator
 from unstructured.staging.base import flatten_dict
 from unstructured.utils import requires_dependencies
 
@@ -111,7 +111,7 @@ class PineconeDestinationConnector(IngestDocSessionHandleMixin, BaseDestinationConnector):
         logger.info(f"using {self.write_config.num_processes} processes to upload")
         if self.write_config.num_processes == 1:
-            for chunk in chunk_generator(elements_dict, pinecone_batch_size):
+            for chunk in batch_generator(elements_dict, pinecone_batch_size):
                 self.upsert_batch(chunk)  # noqa: E203
 
         else:
@@ -119,7 +119,7 @@ class PineconeDestinationConnector(IngestDocSessionHandleMixin, BaseDestinationConnector):
                 processes=self.write_config.num_processes,
             ) as pool:
                 pool.map(
-                    self.upsert_batch, list(chunk_generator(elements_dict, pinecone_batch_size))
+                    self.upsert_batch, list(batch_generator(elements_dict, pinecone_batch_size))
                 )
 
     def normalize_dict(self, element_dict: dict) -> dict:
diff --git a/unstructured/ingest/connector/qdrant.py b/unstructured/ingest/connector/qdrant.py
index f2c24cb5b..da19c2dae 100644
--- a/unstructured/ingest/connector/qdrant.py
+++ b/unstructured/ingest/connector/qdrant.py
@@ -15,7 +15,7 @@ from unstructured.ingest.interfaces import (
     WriteConfig,
 )
 from unstructured.ingest.logger import logger
-from unstructured.ingest.utils.data_prep import chunk_generator
+from unstructured.ingest.utils.data_prep import batch_generator
 from unstructured.staging.base import flatten_dict
 from unstructured.utils import requires_dependencies
 
@@ -120,14 +120,14 @@ class QdrantDestinationConnector(IngestDocSessionHandleMixin, BaseDestinationConnector):
         logger.info(f"using {self.write_config.num_processes} processes to upload")
         if self.write_config.num_processes == 1:
-            for chunk in chunk_generator(elements_dict, qdrant_batch_size):
+            for chunk in batch_generator(elements_dict, qdrant_batch_size):
                 self.upsert_batch(chunk)
 
         else:
             with mp.Pool(
                 processes=self.write_config.num_processes,
             ) as pool:
-                pool.map(self.upsert_batch, list(chunk_generator(elements_dict, qdrant_batch_size)))
+                pool.map(self.upsert_batch, list(batch_generator(elements_dict, qdrant_batch_size)))
 
     def normalize_dict(self, element_dict: dict) -> dict:
         return {
diff --git a/unstructured/ingest/utils/data_prep.py b/unstructured/ingest/utils/data_prep.py
index 587c7e115..722de16e4 100644
--- a/unstructured/ingest/utils/data_prep.py
+++ b/unstructured/ingest/utils/data_prep.py
@@ -2,8 +2,8 @@ import itertools
 import json
 
 
-def chunk_generator(iterable, batch_size=100):
-    """A helper function to break an iterable into chunks of size batch_size."""
+def batch_generator(iterable, batch_size=100):
+    """A helper function to break an iterable into batches of size batch_size."""
     it = iter(iterable)
     chunk = tuple(itertools.islice(it, batch_size))
     while chunk:
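The `chunk_generator` → `batch_generator` rename above is purely mechanical; every call site keeps the same semantics. For clarity, the helper's full behavior (the tail of the loop is cut off in the hunk above; the `yield`/re-slice below is the standard completion of this pattern):

```python
import itertools

def batch_generator(iterable, batch_size=100):
    """A helper function to break an iterable into batches of size batch_size."""
    it = iter(iterable)
    chunk = tuple(itertools.islice(it, batch_size))
    while chunk:
        yield chunk
        chunk = tuple(itertools.islice(it, batch_size))

print(list(batch_generator(range(7), batch_size=3)))
# [(0, 1, 2), (3, 4, 5), (6,)]
```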
diff --git a/unstructured/ingest/v2/cli/cmds/__init__.py b/unstructured/ingest/v2/cli/cmds/__init__.py
index 2d69dd16b..78cbca207 100644
--- a/unstructured/ingest/v2/cli/cmds/__init__.py
+++ b/unstructured/ingest/v2/cli/cmds/__init__.py
@@ -17,6 +17,7 @@ from .mongodb import mongodb_dest_cmd
 from .onedrive import onedrive_drive_src_cmd
 from .opensearch import opensearch_dest_cmd, opensearch_src_cmd
 from .pinecone import pinecone_dest_cmd
+from .singlestore import singlestore_dest_cmd
 from .weaviate import weaviate_dest_cmd
 
 src_cmds = [
@@ -55,6 +56,7 @@ dest_cmds = [
     pinecone_dest_cmd,
     s3_dest_cmd,
     sftp_dest_cmd,
+    singlestore_dest_cmd,
     weaviate_dest_cmd,
     mongodb_dest_cmd,
 ]
diff --git a/unstructured/ingest/v2/cli/cmds/singlestore.py b/unstructured/ingest/v2/cli/cmds/singlestore.py
new file mode 100644
index 000000000..1b7809d09
--- /dev/null
+++ b/unstructured/ingest/v2/cli/cmds/singlestore.py
@@ -0,0 +1,96 @@
+from dataclasses import dataclass
+
+import click
+
+from unstructured.ingest.v2.cli.base import DestCmd
+from unstructured.ingest.v2.cli.interfaces import CliConfig
+from unstructured.ingest.v2.processes.connectors.singlestore import CONNECTOR_TYPE
+
+
+@dataclass
+class SingleStoreCliConnectionConfig(CliConfig):
+    @staticmethod
+    def get_cli_options() -> list[click.Option]:
+        options = [
+            click.Option(
+                ["--host"],
+                required=False,
+                type=str,
+                default=None,
+                help="SingleStore host",
+            ),
+            click.Option(
+                ["--port"],
+                required=False,
+                type=int,
+                default=None,
+                help="SingleStore port",
+            ),
+            click.Option(
+                ["--user"],
+                required=False,
+                type=str,
+                default=None,
+                help="SingleStore user",
+            ),
+            click.Option(
+                ["--password"],
+                required=False,
+                type=str,
+                default=None,
+                help="SingleStore password",
+            ),
+            click.Option(
+                ["--database"],
+                required=False,
+                type=str,
+                default=None,
+                help="SingleStore database",
+            ),
+        ]
+        return options
+
+
+@dataclass
+class SingleStoreCliUploaderConfig(CliConfig):
+    @staticmethod
+    def get_cli_options() -> list[click.Option]:
+        options = [
+            click.Option(
+                ["--drop-empty-cols"],
+                required=False,
+                type=bool,
+                is_flag=True,
+                default=False,
+                help="Drop any columns that have no data",
+            ),
+        ]
+        return options
+
+
+@dataclass
+class SingleStoreCliUploadStagerConfig(CliConfig):
+    @staticmethod
+    def get_cli_options() -> list[click.Option]:
+        return [
+            click.Option(
+                ["--table-name"],
+                required=False,
+                type=str,
+                help="SingleStore table to write contents to",
+            ),
+            click.Option(
+                ["--batch-size"],
+                required=False,
+                type=click.IntRange(min=1),
+                help="Batch size when writing to SingleStore",
+            ),
+        ]
+
+
+singlestore_dest_cmd = DestCmd(
+    cmd_name=CONNECTOR_TYPE,
+    connection_config=SingleStoreCliConnectionConfig,
+    uploader_config=SingleStoreCliUploaderConfig,
+    upload_stager_config=SingleStoreCliUploadStagerConfig,
+)
diff --git a/unstructured/ingest/v2/examples/example_singlestore.py b/unstructured/ingest/v2/examples/example_singlestore.py
new file mode 100644
index 000000000..47d4494a9
--- /dev/null
+++ b/unstructured/ingest/v2/examples/example_singlestore.py
@@ -0,0 +1,48 @@
+from pathlib import Path
+
+from unstructured.ingest.v2.interfaces import ProcessorConfig
+from unstructured.ingest.v2.logger import logger
+from unstructured.ingest.v2.pipeline.pipeline import Pipeline
+from unstructured.ingest.v2.processes.chunker import ChunkerConfig
+from unstructured.ingest.v2.processes.connectors.local import (
+    LocalConnectionConfig,
+    LocalDownloaderConfig,
+    LocalIndexerConfig,
+)
+from unstructured.ingest.v2.processes.connectors.singlestore import (
+    SingleStoreAccessConfig,
+    SingleStoreConnectionConfig,
+    SingleStoreUploaderConfig,
+    SingleStoreUploadStagerConfig,
+)
+from unstructured.ingest.v2.processes.embedder import EmbedderConfig
+from unstructured.ingest.v2.processes.partitioner import PartitionerConfig
+
+base_path = Path(__file__).parent.parent.parent.parent.parent
+docs_path = base_path / "example-docs"
+work_dir = base_path / "tmp_ingest"
+output_path = work_dir / "output"
+download_path = work_dir / "download"
+
+if __name__ == "__main__":
+    logger.info(f"Writing all content in: {work_dir.resolve()}")
+    Pipeline.from_configs(
+        context=ProcessorConfig(work_dir=str(work_dir.resolve()), tqdm=True, verbose=True),
+        indexer_config=LocalIndexerConfig(
+            input_path=str(docs_path.resolve()) + "/book-war-and-peace-1p.txt"
+        ),
+        downloader_config=LocalDownloaderConfig(download_dir=download_path),
+        source_connection_config=LocalConnectionConfig(),
+        partitioner_config=PartitionerConfig(strategy="fast"),
+        chunker_config=ChunkerConfig(chunking_strategy="by_title"),
+        embedder_config=EmbedderConfig(embedding_provider="langchain-huggingface"),
+        destination_connection_config=SingleStoreConnectionConfig(
+            access_config=SingleStoreAccessConfig(password="password"),
+            host="localhost",
+            port=3306,
+            database="ingest_test",
+            user="root",
+        ),
+        stager_config=SingleStoreUploadStagerConfig(),
+        uploader_config=SingleStoreUploaderConfig(table_name="elements"),
+    ).run()
diff --git a/unstructured/ingest/v2/processes/connectors/astra.py b/unstructured/ingest/v2/processes/connectors/astra.py
index e6ae5c577..dbd620472 100644
--- a/unstructured/ingest/v2/processes/connectors/astra.py
+++ b/unstructured/ingest/v2/processes/connectors/astra.py
@@ -6,7 +6,7 @@ from typing import TYPE_CHECKING, Any, Optional
 from unstructured import __name__ as integration_name
 from unstructured.__version__ import __version__ as integration_version
 from unstructured.ingest.enhanced_dataclass import enhanced_field
-from unstructured.ingest.utils.data_prep import chunk_generator
+from unstructured.ingest.utils.data_prep import batch_generator
 from unstructured.ingest.v2.interfaces import (
     AccessConfig,
     ConnectionConfig,
@@ -138,7 +138,7 @@ class AstraUploader(Uploader):
         astra_batch_size = self.upload_config.batch_size
 
         collection = self.get_collection()
-        for chunk in chunk_generator(elements_dict, astra_batch_size):
+        for chunk in batch_generator(elements_dict, astra_batch_size):
             collection.insert_many(chunk)
 
diff --git a/unstructured/ingest/v2/processes/connectors/chroma.py b/unstructured/ingest/v2/processes/connectors/chroma.py
index d4891b023..fb1f95518 100644
--- a/unstructured/ingest/v2/processes/connectors/chroma.py
+++ b/unstructured/ingest/v2/processes/connectors/chroma.py
@@ -9,7 +9,7 @@ from dateutil import parser
 
 from unstructured.ingest.enhanced_dataclass import enhanced_field
 from unstructured.ingest.error import DestinationConnectionError
-from unstructured.ingest.utils.data_prep import chunk_generator
+from unstructured.ingest.utils.data_prep import batch_generator
 from unstructured.ingest.v2.interfaces import (
     AccessConfig,
     ConnectionConfig,
@@ -192,7 +192,7 @@ class ChromaUploader(Uploader):
         collection = self.client.get_or_create_collection(
             name=self.connection_config.collection_name
         )
-        for chunk in chunk_generator(elements_dict, self.upload_config.batch_size):
+        for chunk in batch_generator(elements_dict, self.upload_config.batch_size):
             self.upsert_batch(collection, self.prepare_chroma_list(chunk))
 
diff --git a/unstructured/ingest/v2/processes/connectors/mongodb.py b/unstructured/ingest/v2/processes/connectors/mongodb.py
index cbf7ddfe1..eb51a7bcc 100644
--- a/unstructured/ingest/v2/processes/connectors/mongodb.py
+++ b/unstructured/ingest/v2/processes/connectors/mongodb.py
@@ -5,7 +5,7 @@ from typing import TYPE_CHECKING, Any, Optional
 
 from unstructured.__version__ import __version__ as unstructured_version
 from unstructured.ingest.enhanced_dataclass import enhanced_field
-from unstructured.ingest.utils.data_prep import chunk_generator
+from unstructured.ingest.utils.data_prep import batch_generator
 from unstructured.ingest.v2.interfaces import (
     AccessConfig,
     ConnectionConfig,
@@ -125,7 +125,7 @@ class MongoDBUploader(Uploader):
         )
         db = self.client[self.connection_config.database]
         collection = db[self.connection_config.collection]
-        for chunk in chunk_generator(elements_dict, self.upload_config.batch_size):
+        for chunk in batch_generator(elements_dict, self.upload_config.batch_size):
             collection.insert_many(chunk)
 
diff --git a/unstructured/ingest/v2/processes/connectors/pinecone.py b/unstructured/ingest/v2/processes/connectors/pinecone.py
index 44acd6514..13c752786 100644
--- a/unstructured/ingest/v2/processes/connectors/pinecone.py
+++ b/unstructured/ingest/v2/processes/connectors/pinecone.py
@@ -7,7 +7,7 @@ from typing import TYPE_CHECKING, Any, Optional
 
 from unstructured.ingest.enhanced_dataclass import enhanced_field
 from unstructured.ingest.error import DestinationConnectionError
-from unstructured.ingest.utils.data_prep import chunk_generator
+from unstructured.ingest.utils.data_prep import batch_generator
 from unstructured.ingest.v2.interfaces import (
     AccessConfig,
     ConnectionConfig,
@@ -158,15 +158,15 @@ class PineconeUploader(Uploader):
         pinecone_batch_size = self.upload_config.batch_size
 
         if self.upload_config.num_of_processes == 1:
-            for chunk in chunk_generator(elements_dict, pinecone_batch_size):
-                self.upsert_batch(chunk)  # noqa: E203
+            for batch in batch_generator(elements_dict, pinecone_batch_size):
+                self.upsert_batch(batch)  # noqa: E203
 
         else:
             with mp.Pool(
                 processes=self.upload_config.num_of_processes,
             ) as pool:
                 pool.map(
-                    self.upsert_batch, list(chunk_generator(elements_dict, pinecone_batch_size))
+                    self.upsert_batch, list(batch_generator(elements_dict, pinecone_batch_size))
                 )
 
diff --git a/unstructured/ingest/v2/processes/connectors/singlestore.py b/unstructured/ingest/v2/processes/connectors/singlestore.py
new file mode 100644
index 000000000..3cdc5719b
--- /dev/null
+++ b/unstructured/ingest/v2/processes/connectors/singlestore.py
@@ -0,0 +1,164 @@
+import json
+from dataclasses import dataclass
+from datetime import datetime
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, Optional
+
+import numpy as np
+import pandas as pd
+from dateutil import parser
+
+from unstructured.ingest.enhanced_dataclass import enhanced_field
+from unstructured.ingest.utils.data_prep import batch_generator
+from unstructured.ingest.utils.table import convert_to_pandas_dataframe
+from unstructured.ingest.v2.interfaces import (
+    AccessConfig,
+    ConnectionConfig,
+    FileData,
+    UploadContent,
+    Uploader,
+    UploaderConfig,
+    UploadStager,
+    UploadStagerConfig,
+)
+from unstructured.ingest.v2.logger import logger
+from unstructured.ingest.v2.processes.connector_registry import (
+    DestinationRegistryEntry,
+    add_destination_entry,
+)
+from unstructured.utils import requires_dependencies
+
+if TYPE_CHECKING:
+    from singlestoredb.connection import Connection
+
+CONNECTOR_TYPE = "singlestore"
+
+
+@dataclass
+class SingleStoreAccessConfig(AccessConfig):
+    password: Optional[str] = None
+
+
+@dataclass
+class SingleStoreConnectionConfig(ConnectionConfig):
+    host: Optional[str] = None
+    port: Optional[int] = None
+    user: Optional[str] = None
+    database: Optional[str] = None
+    access_config: SingleStoreAccessConfig = enhanced_field(sensitive=True)
+
+    @requires_dependencies(["singlestoredb"], extras="singlestore")
+    def get_connection(self) -> "Connection":
+        import singlestoredb as s2
+
+        conn = s2.connect(
+            host=self.host,
+            port=self.port,
+            database=self.database,
+            user=self.user,
+            password=self.access_config.password,
+        )
+        return conn
+
+
+@dataclass
+class SingleStoreUploadStagerConfig(UploadStagerConfig):
+    drop_empty_cols: bool = False
+
+
+@dataclass
+class SingleStoreUploadStager(UploadStager):
+    upload_stager_config: SingleStoreUploadStagerConfig
+
+    @staticmethod
+    def parse_date_string(date_string: str) -> datetime:
+        try:
+            timestamp = float(date_string)
+            return datetime.fromtimestamp(timestamp)
+        except Exception as e:
+            logger.debug(f"date {date_string} string not a timestamp: {e}")
+        return parser.parse(date_string)
+
+    def run(
+        self,
+        elements_filepath: Path,
+        file_data: FileData,
+        output_dir: Path,
+        output_filename: str,
+        **kwargs: Any,
+    ) -> Path:
+        with open(elements_filepath) as elements_file:
+            elements_contents = json.load(elements_file)
+        output_path = Path(output_dir) / Path(f"{output_filename}.csv")
+        output_path.parent.mkdir(parents=True, exist_ok=True)
+
+        df = convert_to_pandas_dataframe(
+            elements_dict=elements_contents,
+            drop_empty_cols=self.upload_stager_config.drop_empty_cols,
+        )
+        datetime_columns = [
+            "data_source_date_created",
+            "data_source_date_modified",
+            "data_source_date_processed",
+        ]
+        for column in filter(lambda x: x in df.columns, datetime_columns):
+            df[column] = df[column].apply(self.parse_date_string)
+        if "data_source_record_locator" in df.columns:
+            df["data_source_record_locator"] = df["data_source_record_locator"].apply(
+                lambda x: json.dumps(x) if x else None
+            )
+
+        with output_path.open("w") as output_file:
+            df.to_csv(output_file, index=False)
+        return output_path
+
+
+@dataclass
+class SingleStoreUploaderConfig(UploaderConfig):
+    table_name: str
+    batch_size: int = 100
+
+
+@dataclass
+class SingleStoreUploader(Uploader):
+    connection_config: SingleStoreConnectionConfig
+    upload_config: SingleStoreUploaderConfig
+    connector_type: str = CONNECTOR_TYPE
+
+    def upload_csv(self, content: UploadContent) -> None:
+        df = pd.read_csv(content.path)
+        logger.debug(
+            f"uploading {len(df)} entries to {self.connection_config.database} "
+            f"db in table {self.upload_config.table_name}"
+        )
+        stmt = "INSERT INTO {} ({}) VALUES ({})".format(
+            self.upload_config.table_name,
+            ", ".join(df.columns),
+            ", ".join(["%s"] * len(df.columns)),
+        )
+        logger.debug(f"sql statement: {stmt}")
+        df.replace({np.nan: None}, inplace=True)
+        data_as_tuples = list(df.itertuples(index=False, name=None))
+        with self.connection_config.get_connection() as conn:
+            with conn.cursor() as cur:
+                for chunk in batch_generator(
+                    data_as_tuples, batch_size=self.upload_config.batch_size
+                ):
+                    cur.executemany(stmt, chunk)
+                conn.commit()
+
+    def run(self, contents: list[UploadContent], **kwargs: Any) -> None:
+        for content in contents:
+            self.upload_csv(content=content)
+
+
+add_destination_entry(
+    destination_type=CONNECTOR_TYPE,
+    entry=DestinationRegistryEntry(
+        connection_config=SingleStoreConnectionConfig,
+        uploader=SingleStoreUploader,
+        uploader_config=SingleStoreUploaderConfig,
+        upload_stager=SingleStoreUploadStager,
+        upload_stager_config=SingleStoreUploadStagerConfig,
+    ),
+)
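To make the uploader's statement construction concrete, a small sketch with a hypothetical three-column subset of the `elements` schema; each batch produced by `batch_generator` is then passed to `cur.executemany(stmt, batch)`:

```python
# Illustrative columns only; the real statement uses df.columns from the
# staged CSV.
columns = ["element_id", "text", "type"]
table_name = "elements"

stmt = "INSERT INTO {} ({}) VALUES ({})".format(
    table_name,
    ", ".join(columns),
    ", ".join(["%s"] * len(columns)),
)
print(stmt)
# INSERT INTO elements (element_id, text, type) VALUES (%s, %s, %s)
```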
diff --git a/unstructured/ingest/v2/processes/connectors/weaviate.py b/unstructured/ingest/v2/processes/connectors/weaviate.py
index f89cedb3a..d3b528fdc 100644
--- a/unstructured/ingest/v2/processes/connectors/weaviate.py
+++ b/unstructured/ingest/v2/processes/connectors/weaviate.py
@@ -21,6 +21,7 @@ from unstructured.ingest.v2.logger import logger
 from unstructured.ingest.v2.processes.connector_registry import (
     DestinationRegistryEntry,
 )
+from unstructured.utils import requires_dependencies
 
 if TYPE_CHECKING:
     from weaviate import Client
@@ -153,17 +154,19 @@ class WeaviateUploaderConfig(UploaderConfig):
 
 @dataclass
 class WeaviateUploader(Uploader):
-    connector_type: str = CONNECTOR_TYPE
     upload_config: WeaviateUploaderConfig
     connection_config: WeaviateConnectionConfig
     client: Optional["Client"] = field(init=False)
+    connector_type: str = CONNECTOR_TYPE
 
+    @requires_dependencies(["weaviate"], extras="weaviate")
     def __post_init__(self):
         from weaviate import Client
 
         auth = self._resolve_auth_method()
         self.client = Client(url=self.connection_config.host_url, auth_client_secret=auth)
 
+    @requires_dependencies(["weaviate"], extras="weaviate")
     def _resolve_auth_method(self):
         access_configs = self.connection_config.access_config
         connection_config = self.connection_config
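A note on the `WeaviateUploader` field reorder above: a dataclass field without a default may not follow one with a default, so the defaulted `connector_type` has to move below the required config fields — the likely motivation for this change. A minimal illustration:

```python
from dataclasses import dataclass

try:
    @dataclass
    class Broken:
        connector_type: str = "weaviate"  # defaulted field first...
        upload_config: dict               # ...then a required field: TypeError
except TypeError as err:
    print(err)  # non-default argument 'upload_config' follows default argument

@dataclass
class Works:
    upload_config: dict
    connector_type: str = "weaviate"  # defaulted field last
```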