From 950e5d68f90c8b38c4bcd9dc6a51489f4f35ba65 Mon Sep 17 00:00:00 2001 From: rvztz Date: Thu, 4 Jan 2024 13:33:16 -0600 Subject: [PATCH] feat: adds postgresql/sqlite destination connector (#2005) - Adds a destination connector to upload processed output into a PostgreSQL/SQLite database instance. - Users are responsible for providing their own instances. This PR includes a couple of configuration examples. - Defines the scripts required to set up a PostgreSQL instance with the unstructured elements schema. - Validates postgres/pgvector embedding storage and retrieval. --------- Co-authored-by: potter-potter --- CHANGELOG.md | 6 +- Makefile | 4 + docs/source/ingest/destination_connectors.rst | 1 + .../destination_connectors/code/bash/sql.sh | 17 ++ .../destination_connectors/code/python/sql.py | 27 +++ .../data/pgvector-schema.sql | 44 ++++ .../data/postgres-schema.sql | 42 ++++ .../data/sqlite-schema.sql | 41 ++++ .../ingest/destination_connectors/sql.rst | 62 ++++++ examples/ingest/sql/ingest.sh | 26 +++ requirements/ingest/postgres.in | 3 + requirements/ingest/postgres.txt | 8 + .../create-pgvector-schema.sql | 44 ++++ .../sql-test-helpers/create-sql-instance.sh | 20 ++ .../sql-test-helpers/create-sqlite-schema.py | 14 ++ .../sql-test-helpers/create-sqlite-schema.sql | 41 ++++ .../docker-compose-pgvector.yaml | 13 ++ test_unstructured_ingest/dest/pgvector.sh | 54 +++++ test_unstructured_ingest/dest/sqlite.sh | 52 +++++ .../python/test-ingest-sql-output.py | 65 ++++++ test_unstructured_ingest/test-ingest-dest.sh | 4 +- .../unit/test_interfaces.py | 90 -------- test_unstructured_ingest/unit/test_paths.py | 94 ++++++++ .../unit/test_sql_conform_dict.py | 125 +++++++++++ unstructured/__version__.py | 2 +- unstructured/ingest/cli/cmds/__init__.py | 2 + unstructured/ingest/cli/cmds/sql.py | 66 ++++++ unstructured/ingest/connector/sql.py | 201 ++++++++++++++++++ unstructured/ingest/interfaces.py | 4 +- .../ingest/runner/writers/__init__.py | 2 + unstructured/ingest/runner/writers/sql.py | 22 ++ 31 files changed, 1099 insertions(+), 97 deletions(-) create mode 100644 docs/source/ingest/destination_connectors/code/bash/sql.sh create mode 100644 docs/source/ingest/destination_connectors/code/python/sql.py create mode 100644 docs/source/ingest/destination_connectors/data/pgvector-schema.sql create mode 100644 docs/source/ingest/destination_connectors/data/postgres-schema.sql create mode 100644 docs/source/ingest/destination_connectors/data/sqlite-schema.sql create mode 100644 docs/source/ingest/destination_connectors/sql.rst create mode 100755 examples/ingest/sql/ingest.sh create mode 100644 requirements/ingest/postgres.in create mode 100644 requirements/ingest/postgres.txt create mode 100644 scripts/sql-test-helpers/create-pgvector-schema.sql create mode 100755 scripts/sql-test-helpers/create-sql-instance.sh create mode 100644 scripts/sql-test-helpers/create-sqlite-schema.py create mode 100644 scripts/sql-test-helpers/create-sqlite-schema.sql create mode 100644 scripts/sql-test-helpers/docker-compose-pgvector.yaml create mode 100755 test_unstructured_ingest/dest/pgvector.sh create mode 100755 test_unstructured_ingest/dest/sqlite.sh create mode 100755 test_unstructured_ingest/python/test-ingest-sql-output.py create mode 100644 test_unstructured_ingest/unit/test_sql_conform_dict.py create mode 100644 unstructured/ingest/cli/cmds/sql.py create mode 100644 unstructured/ingest/connector/sql.py create mode 100644 unstructured/ingest/runner/writers/sql.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 
32113b493..13dfd363c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.11.9-dev0 +## 0.11.9-dev1 ### Enhancements @@ -6,6 +6,8 @@ ### Features +* **Add PostgreSQL/SQLite destination connector** Adds a PostgreSQL and SQLite connector to the ingest CLI. Users may now use `unstructured-ingest` to write partitioned data to a PostgreSQL or SQLite database, and to store embeddings in a PostgreSQL database with pgvector. + ### Fixes ## 0.11.8 @@ -16,10 +18,8 @@ ### Features - ### Fixes - ## 0.11.7 ### Enhancements diff --git a/Makefile b/Makefile index 9847f0f1e..61ecab801 100644 --- a/Makefile +++ b/Makefile @@ -231,6 +231,10 @@ install-ingest-qdrant: install-ingest-chroma: python3 -m pip install -r requirements/ingest/chroma.txt +.PHONY: install-ingest-postgres +install-ingest-postgres: + python3 -m pip install -r requirements/ingest/postgres.txt + .PHONY: install-embed-huggingface install-embed-huggingface: python3 -m pip install -r requirements/ingest/embed-huggingface.txt diff --git a/docs/source/ingest/destination_connectors.rst b/docs/source/ingest/destination_connectors.rst index 98cfce29c..5ef4f6373 100644 --- a/docs/source/ingest/destination_connectors.rst +++ b/docs/source/ingest/destination_connectors.rst @@ -19,5 +19,6 @@ in our community `Slack. `_ destination_connectors/pinecone destination_connectors/qdrant destination_connectors/s3 + destination_connectors/sql destination_connectors/weaviate diff --git a/docs/source/ingest/destination_connectors/code/bash/sql.sh b/docs/source/ingest/destination_connectors/code/bash/sql.sh new file mode 100644 index 000000000..7970febbc --- /dev/null +++ b/docs/source/ingest/destination_connectors/code/bash/sql.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash + +unstructured-ingest \ + local \ + --input-path example-docs/fake-memo.pdf \ + --anonymous \ + --output-dir local-output-to-sql \ + --num-processes 2 \ + --verbose \ + --strategy fast \ + sql \ + --db-type postgresql \ + --username postgres \ + --password test \ + --host localhost \ + --port 5432 \ + --database elements diff --git a/docs/source/ingest/destination_connectors/code/python/sql.py b/docs/source/ingest/destination_connectors/code/python/sql.py new file mode 100644 index 000000000..5065d9fc9 --- /dev/null +++ b/docs/source/ingest/destination_connectors/code/python/sql.py @@ -0,0 +1,27 @@ +import os + +from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig +from unstructured.ingest.runner import LocalRunner + +if __name__ == "__main__": + runner = LocalRunner( + processor_config=ProcessorConfig( + verbose=True, + output_dir="local-output-to-postgres", + num_processes=2, + ), + read_config=ReadConfig(), + partition_config=PartitionConfig(), + writer_type="sql", + writer_kwargs={ + "db_type": os.getenv("DB_TYPE"), + "username": os.getenv("DB_USERNAME"), + "password": os.getenv("DB_PASSWORD"), + "host": os.getenv("DB_HOST"), + "port": os.getenv("DB_PORT"), + "database": os.getenv("DB_DATABASE"), + }, + ) + runner.run( + input_path="example-docs/fake-memo.pdf", + ) diff --git a/docs/source/ingest/destination_connectors/data/pgvector-schema.sql b/docs/source/ingest/destination_connectors/data/pgvector-schema.sql new file mode 100644 index 000000000..f21c687d9 --- /dev/null +++ b/docs/source/ingest/destination_connectors/data/pgvector-schema.sql @@ -0,0 +1,44 @@ +CREATE EXTENSION vector; + +CREATE TABLE elements ( + id UUID PRIMARY KEY, + element_id VARCHAR, + text TEXT, + embeddings vector(384), + type VARCHAR, + system VARCHAR, + layout_width DECIMAL, + 
layout_height DECIMAL, + points TEXT, + url TEXT, + version VARCHAR, + date_created TIMESTAMPTZ, + date_modified TIMESTAMPTZ, + date_processed TIMESTAMPTZ, + permissions_data TEXT, + record_locator TEXT, + category_depth INTEGER, + parent_id VARCHAR, + attached_filename VARCHAR, + filetype VARCHAR, + last_modified TIMESTAMPTZ, + file_directory VARCHAR, + filename VARCHAR, + languages VARCHAR [], + page_number VARCHAR, + links TEXT, + page_name VARCHAR, + link_urls VARCHAR [], + link_texts VARCHAR [], + sent_from VARCHAR [], + sent_to VARCHAR [], + subject VARCHAR, + section VARCHAR, + header_footer_type VARCHAR, + emphasized_text_contents VARCHAR [], + emphasized_text_tags VARCHAR [], + text_as_html TEXT, + regex_metadata TEXT, + detection_class_prob DECIMAL +); + diff --git a/docs/source/ingest/destination_connectors/data/postgres-schema.sql b/docs/source/ingest/destination_connectors/data/postgres-schema.sql new file mode 100644 index 000000000..4fb40a96a --- /dev/null +++ b/docs/source/ingest/destination_connectors/data/postgres-schema.sql @@ -0,0 +1,42 @@ +CREATE TABLE elements ( + id UUID PRIMARY KEY, + element_id VARCHAR, + text TEXT, + embeddings DECIMAL [], + type VARCHAR, + system VARCHAR, + layout_width DECIMAL, + layout_height DECIMAL, + points TEXT, + url TEXT, + version VARCHAR, + date_created TIMESTAMPTZ, + date_modified TIMESTAMPTZ, + date_processed TIMESTAMPTZ, + permissions_data TEXT, + record_locator TEXT, + category_depth INTEGER, + parent_id VARCHAR, + attached_filename VARCHAR, + filetype VARCHAR, + last_modified TIMESTAMPTZ, + file_directory VARCHAR, + filename VARCHAR, + languages VARCHAR [], + page_number VARCHAR, + links TEXT, + page_name VARCHAR, + link_urls VARCHAR [], + link_texts VARCHAR [], + sent_from VARCHAR [], + sent_to VARCHAR [], + subject VARCHAR, + section VARCHAR, + header_footer_type VARCHAR, + emphasized_text_contents VARCHAR [], + emphasized_text_tags VARCHAR [], + text_as_html TEXT, + regex_metadata TEXT, + detection_class_prob DECIMAL +); + diff --git a/docs/source/ingest/destination_connectors/data/sqlite-schema.sql b/docs/source/ingest/destination_connectors/data/sqlite-schema.sql new file mode 100644 index 000000000..89be17b3f --- /dev/null +++ b/docs/source/ingest/destination_connectors/data/sqlite-schema.sql @@ -0,0 +1,41 @@ +CREATE TABLE elements ( + id TEXT PRIMARY KEY, + element_id TEXT, + text TEXT, + embeddings TEXT, + type TEXT, + system TEXT, + layout_width REAL, + layout_height REAL, + points TEXT, + url TEXT, + version TEXT, + date_created TEXT, + date_modified TEXT, + date_processed TEXT, + permissions_data TEXT, + record_locator TEXT, + category_depth INTEGER, + parent_id TEXT, + attached_filename TEXT, + filetype TEXT, + last_modified TEXT, + file_directory TEXT, + filename TEXT, + languages TEXT, + page_number TEXT, + links TEXT, + page_name TEXT, + link_urls TEXT, + link_texts TEXT, + sent_from TEXT, + sent_to TEXT, + subject TEXT, + section TEXT, + header_footer_type TEXT, + emphasized_text_contents TEXT, + emphasized_text_tags TEXT, + text_as_html TEXT, + regex_metadata TEXT, + detection_class_prob DECIMAL +); diff --git a/docs/source/ingest/destination_connectors/sql.rst b/docs/source/ingest/destination_connectors/sql.rst new file mode 100644 index 000000000..0420c6f88 --- /dev/null +++ b/docs/source/ingest/destination_connectors/sql.rst @@ -0,0 +1,62 @@ +SQL +=========== +NOTE: At the moment, the connector only supports PostgreSQL and SQLite. Vectors can be stored and searched in PostgreSQL with pgvector. 
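+
+For example, once embeddings have been written, pgvector can answer nearest-neighbor queries directly in SQL. The query below is an illustrative sketch: it assumes the ``elements`` table from the sample schemas shown later on this page, and the vector literal (shortened here) must supply all of the components of the declared embedding dimension (384 in the sample schema).
+
+.. code:: sql
+
+   -- Return the five stored elements closest to a query vector, using
+   -- pgvector's Euclidean distance operator (<->).
+   SELECT text FROM elements ORDER BY embeddings <-> '[0.1, 0.2, ...]' LIMIT 5;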
+ +Batch process all your records using ``unstructured-ingest`` to store structured outputs locally on your filesystem and upload those local files to a PostgreSQL or SQLite database. + +The insert query is currently limited to appending rows; existing rows are not updated. + +First you'll need to install the SQL dependencies as shown here if you are using PostgreSQL. SQLite uses Python's built-in ``sqlite3`` module and requires no extras. + +.. code:: shell + + pip install "unstructured[postgres]" + +Run Locally +----------- +The upstream connector can be any of the ones supported; for convenience, the sample commands here use the +upstream local connector. + +.. tabs:: + + .. tab:: Shell + + .. literalinclude:: ./code/bash/sql.sh + :language: bash + + .. tab:: Python + + .. literalinclude:: ./code/python/sql.py + :language: python + +For a full list of the options the CLI accepts, check ``unstructured-ingest sql --help``. + +NOTE: Keep in mind that if you're running this locally, you will need all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform. You can find more information about this in the `installation guide `_. + +Sample Table Schema +------------------- + +To make sure the schema of the destination table matches the data being written to it, the following sample schemas can be used. + +.. tabs:: + + .. tab:: PostgreSQL + + .. literalinclude:: ./data/postgres-schema.sql + :language: sql + :linenos: + :caption: Object description + + .. tab:: PostgreSQL with pgvector + + .. literalinclude:: ./data/pgvector-schema.sql + :language: sql + :linenos: + :caption: Object description + + .. tab:: SQLite + + .. literalinclude:: ./data/sqlite-schema.sql + :language: sql + :linenos: + :caption: Object description \ No newline at end of file diff --git a/examples/ingest/sql/ingest.sh b/examples/ingest/sql/ingest.sh new file mode 100755 index 000000000..f5cfa3fa7 --- /dev/null +++ b/examples/ingest/sql/ingest.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash + +# Uploads the structured output of the given local files. + +# Structured outputs are stored in a PostgreSQL instance. + +SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd) +cd "$SCRIPT_DIR"/../../.. || exit 1 + +PYTHONPATH=. 
./unstructured/ingest/main.py \ + local \ + --input-path example-docs/book-war-and-peace-1225p.txt \ + --output-dir local-to-sql \ + --strategy fast \ + --chunk-elements \ + --embedding-provider "" \ + --num-processes 2 \ + --verbose \ + --work-dir "" \ + sql \ + --db-type postgresql \ + --username postgres \ + --password test \ + --host localhost \ + --port 5432 \ + --database elements diff --git a/requirements/ingest/postgres.in b/requirements/ingest/postgres.in new file mode 100644 index 000000000..e74682ac1 --- /dev/null +++ b/requirements/ingest/postgres.in @@ -0,0 +1,3 @@ +-c ../constraints.in +-c ../base.txt +psycopg2-binary \ No newline at end of file diff --git a/requirements/ingest/postgres.txt b/requirements/ingest/postgres.txt new file mode 100644 index 000000000..f80c5ab91 --- /dev/null +++ b/requirements/ingest/postgres.txt @@ -0,0 +1,8 @@ +# +# This file is autogenerated by pip-compile with Python 3.8 +# by the following command: +# +# pip-compile --constraint=requirements/constraints.in requirements/ingest/postgres.in +# +psycopg2-binary==2.9.9 + # via -r requirements/ingest/postgres.in diff --git a/scripts/sql-test-helpers/create-pgvector-schema.sql b/scripts/sql-test-helpers/create-pgvector-schema.sql new file mode 100644 index 000000000..f21c687d9 --- /dev/null +++ b/scripts/sql-test-helpers/create-pgvector-schema.sql @@ -0,0 +1,44 @@ +CREATE EXTENSION vector; + +CREATE TABLE elements ( + id UUID PRIMARY KEY, + element_id VARCHAR, + text TEXT, + embeddings vector(384), + type VARCHAR, + system VARCHAR, + layout_width DECIMAL, + layout_height DECIMAL, + points TEXT, + url TEXT, + version VARCHAR, + date_created TIMESTAMPTZ, + date_modified TIMESTAMPTZ, + date_processed TIMESTAMPTZ, + permissions_data TEXT, + record_locator TEXT, + category_depth INTEGER, + parent_id VARCHAR, + attached_filename VARCHAR, + filetype VARCHAR, + last_modified TIMESTAMPTZ, + file_directory VARCHAR, + filename VARCHAR, + languages VARCHAR [], + page_number VARCHAR, + links TEXT, + page_name VARCHAR, + link_urls VARCHAR [], + link_texts VARCHAR [], + sent_from VARCHAR [], + sent_to VARCHAR [], + subject VARCHAR, + section VARCHAR, + header_footer_type VARCHAR, + emphasized_text_contents VARCHAR [], + emphasized_text_tags VARCHAR [], + text_as_html TEXT, + regex_metadata TEXT, + detection_class_prob DECIMAL +); + diff --git a/scripts/sql-test-helpers/create-sql-instance.sh b/scripts/sql-test-helpers/create-sql-instance.sh new file mode 100755 index 000000000..c5fbd06c7 --- /dev/null +++ b/scripts/sql-test-helpers/create-sql-instance.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +set -e + +SCRIPT_DIR=$(dirname "$(realpath "$0")") +DATABASE_NAME=$1 +DATABASE_FILE_PATH=$2 + +# Create the SQL instance +if [[ "$DATABASE_NAME" != "sqlite" ]]; then + docker compose version + docker compose -f "$SCRIPT_DIR"/docker-compose-"$DATABASE_NAME".yaml up --wait + docker compose -f "$SCRIPT_DIR"/docker-compose-"$DATABASE_NAME".yaml ps +else + touch "$DATABASE_FILE_PATH" + + python "$SCRIPT_DIR"/create-sqlite-schema.py "$DATABASE_FILE_PATH" +fi + +echo "$DATABASE_NAME instance is live." 
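+
+# Illustrative usage (the .db path is a placeholder): create a local SQLite
+# database with the elements schema, the way the destination tests do:
+#   scripts/sql-test-helpers/create-sql-instance.sh sqlite /tmp/elements.db
+# or stand up the pgvector Docker instance defined in this directory:
+#   scripts/sql-test-helpers/create-sql-instance.sh pgvector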
diff --git a/scripts/sql-test-helpers/create-sqlite-schema.py b/scripts/sql-test-helpers/create-sqlite-schema.py new file mode 100644 index 000000000..d3e16f082 --- /dev/null +++ b/scripts/sql-test-helpers/create-sqlite-schema.py @@ -0,0 +1,14 @@ +import sqlite3 +import sys +from pathlib import Path + +if __name__ == "__main__": + connection = sqlite3.connect(database=sys.argv[1]) + + # Read the elements schema that lives next to this script and apply it + script_path = (Path(__file__).parent / "create-sqlite-schema.sql").resolve() + with open(script_path) as f: + query = f.read() + cursor = connection.cursor() + cursor.executescript(query) + connection.close() diff --git a/scripts/sql-test-helpers/create-sqlite-schema.sql b/scripts/sql-test-helpers/create-sqlite-schema.sql new file mode 100644 index 000000000..89be17b3f --- /dev/null +++ b/scripts/sql-test-helpers/create-sqlite-schema.sql @@ -0,0 +1,41 @@ +CREATE TABLE elements ( + id TEXT PRIMARY KEY, + element_id TEXT, + text TEXT, + embeddings TEXT, + type TEXT, + system TEXT, + layout_width REAL, + layout_height REAL, + points TEXT, + url TEXT, + version TEXT, + date_created TEXT, + date_modified TEXT, + date_processed TEXT, + permissions_data TEXT, + record_locator TEXT, + category_depth INTEGER, + parent_id TEXT, + attached_filename TEXT, + filetype TEXT, + last_modified TEXT, + file_directory TEXT, + filename TEXT, + languages TEXT, + page_number TEXT, + links TEXT, + page_name TEXT, + link_urls TEXT, + link_texts TEXT, + sent_from TEXT, + sent_to TEXT, + subject TEXT, + section TEXT, + header_footer_type TEXT, + emphasized_text_contents TEXT, + emphasized_text_tags TEXT, + text_as_html TEXT, + regex_metadata TEXT, + detection_class_prob DECIMAL +); diff --git a/scripts/sql-test-helpers/docker-compose-pgvector.yaml b/scripts/sql-test-helpers/docker-compose-pgvector.yaml new file mode 100644 index 000000000..37f7f7a78 --- /dev/null +++ b/scripts/sql-test-helpers/docker-compose-pgvector.yaml @@ -0,0 +1,13 @@ +services: + pgvector: + image: ankane/pgvector + restart: always + container_name: pgvector_dest + ports: + - 5433:5432 + environment: + POSTGRES_DB: elements + POSTGRES_USER: unstructured + POSTGRES_PASSWORD: test + volumes: + - ./create-pgvector-schema.sql:/docker-entrypoint-initdb.d/init.sql diff --git a/test_unstructured_ingest/dest/pgvector.sh b/test_unstructured_ingest/dest/pgvector.sh new file mode 100755 index 000000000..66f6aa5bd --- /dev/null +++ b/test_unstructured_ingest/dest/pgvector.sh @@ -0,0 +1,54 @@ +#!/usr/bin/env bash + +set -e + +SRC_PATH=$(dirname "$(realpath "$0")") +SCRIPT_DIR=$(dirname "$SRC_PATH") +cd "$SCRIPT_DIR"/.. || exit 1 +OUTPUT_FOLDER_NAME=sql-dest +OUTPUT_ROOT=${OUTPUT_ROOT:-$SCRIPT_DIR} +OUTPUT_DIR=$OUTPUT_ROOT/structured-output/$OUTPUT_FOLDER_NAME +WORK_DIR=$OUTPUT_ROOT/workdir/$OUTPUT_FOLDER_NAME +max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")} +CI=${CI:-"false"} +DATABASE_TYPE="pgvector" + +# shellcheck disable=SC1091 +source "$SCRIPT_DIR"/cleanup.sh +function cleanup { + echo "Stopping SQL DB Docker container" + docker compose -f scripts/sql-test-helpers/docker-compose-"$DATABASE_TYPE".yaml down --remove-orphans -v + # Local file cleanup + cleanup_dir "$WORK_DIR" + cleanup_dir "$OUTPUT_DIR" + if [ "$CI" == "true" ]; then + cleanup_dir "$DOWNLOAD_DIR" + fi +} + +trap cleanup EXIT + +# Create the SQL instance and the `elements` table +echo "Creating SQL DB instance" +# shellcheck source=/dev/null +scripts/sql-test-helpers/create-sql-instance.sh "$DATABASE_TYPE" +wait + +PYTHONPATH=. 
./unstructured/ingest/main.py \ + local \ + --num-processes "$max_processes" \ + --output-dir "$OUTPUT_DIR" \ + --strategy fast \ + --verbose \ + --input-path example-docs/fake-memo.pdf \ + --work-dir "$WORK_DIR" \ + --embedding-provider "langchain-huggingface" \ + sql \ + --db-type "postgresql" \ + --username unstructured \ + --password test \ + --host localhost \ + --port 5433 \ + --database elements + +"$SCRIPT_DIR"/python/test-ingest-sql-output.py "$DATABASE_TYPE" "5433" diff --git a/test_unstructured_ingest/dest/sqlite.sh b/test_unstructured_ingest/dest/sqlite.sh new file mode 100755 index 000000000..c289bf421 --- /dev/null +++ b/test_unstructured_ingest/dest/sqlite.sh @@ -0,0 +1,52 @@ +#!/usr/bin/env bash + +set -e + +SRC_PATH=$(dirname "$(realpath "$0")") +SCRIPT_DIR=$(dirname "$SRC_PATH") +cd "$SCRIPT_DIR"/.. || exit 1 +OUTPUT_FOLDER_NAME=sql-dest +OUTPUT_ROOT=${OUTPUT_ROOT:-$SCRIPT_DIR} +OUTPUT_DIR=$OUTPUT_ROOT/structured-output/$OUTPUT_FOLDER_NAME +WORK_DIR=$OUTPUT_ROOT/workdir/$OUTPUT_FOLDER_NAME +max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")} +CI=${CI:-"false"} +DATABASE_TYPE="sqlite" +DB_PATH=$SCRIPT_DIR/elements.db + +# shellcheck disable=SC1091 +source "$SCRIPT_DIR"/cleanup.sh +function cleanup { + # Local file cleanup + cleanup_dir "$WORK_DIR" + cleanup_dir "$OUTPUT_DIR" + rm -rf "$DB_PATH" + if [ "$CI" == "true" ]; then + cleanup_dir "$DOWNLOAD_DIR" + fi +} + +trap cleanup EXIT + +# Create the SQL instance and the `elements` table +echo "Creating SQL DB instance" +# shellcheck source=/dev/null +scripts/sql-test-helpers/create-sql-instance.sh "$DATABASE_TYPE" "$DB_PATH" +wait + +PYTHONPATH=. ./unstructured/ingest/main.py \ + local \ + --num-processes "$max_processes" \ + --output-dir "$OUTPUT_DIR" \ + --strategy fast \ + --verbose \ + --reprocess \ + --input-path example-docs/fake-memo.pdf \ + --work-dir "$WORK_DIR" \ + sql \ + --db-type "$DATABASE_TYPE" \ + --username unstructured \ + --database "$DB_PATH" + +"$SCRIPT_DIR"/python/test-ingest-sql-output.py "$DATABASE_TYPE" "$DB_PATH" diff --git a/test_unstructured_ingest/python/test-ingest-sql-output.py b/test_unstructured_ingest/python/test-ingest-sql-output.py new file mode 100755 index 000000000..fbf511725 --- /dev/null +++ b/test_unstructured_ingest/python/test-ingest-sql-output.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 + +import sys + +N_ELEMENTS = 5 + + +def create_connection(db_type, database=None, port=None): + if db_type == "pgvector": + from psycopg2 import connect + + return connect( + user="unstructured", + password="test", + dbname="elements", + host="localhost", + port=port, + ) + elif db_type == "sqlite": + from sqlite3 import connect + + return connect(database=database) + raise ValueError(f"Unsupported database {db_type} connection.") + + +if __name__ == "__main__": + database_name = sys.argv[1] + db_url = None + port = None + if database_name == "sqlite": + db_url = sys.argv[2] + else: + port = sys.argv[2] + + print(f"Running SQL output test for: {database_name}") + conn = create_connection(database_name, db_url, port) + query = "select count(*) from elements;" + cursor = conn.cursor() + cursor.execute(query) + count = cursor.fetchone()[0] + + if database_name == "pgvector": + # Get an embedding from the database, then use it to search for the + # closest vector (which should be the element itself) + cursor = conn.cursor() + cursor.execute("SELECT embeddings FROM elements order by text limit 1") + test_embedding = cursor.fetchone()[0] + similarity_query = ( + f"SELECT text FROM 
elements ORDER BY embeddings <-> '{test_embedding}' LIMIT 1;" + ) + cursor.execute(similarity_query) + res = cursor.fetchone() + assert res[0] == "Best Regards," + print("Result of vector search against pgvector with embeddings successful") + + try: + assert count == N_ELEMENTS + except AssertionError: + print(f"{database_name} dest check failed: got {count}, expected {N_ELEMENTS}") + raise + finally: + cursor.close() + conn.close() + + print(f"SUCCESS: {database_name} dest check") diff --git a/test_unstructured_ingest/test-ingest-dest.sh b/test_unstructured_ingest/test-ingest-dest.sh index 748d44dd8..ad8d3c9ee 100755 --- a/test_unstructured_ingest/test-ingest-dest.sh +++ b/test_unstructured_ingest/test-ingest-dest.sh @@ -24,11 +24,13 @@ all_tests=( 'elasticsearch.sh' 'gcs.sh' 'mongodb.sh' + 'pgvector.sh' 'pinecone.sh' 'qdrant.sh' 's3.sh' - 'weaviate.sh' 'sharepoint-embed-cog-index.sh' + 'sqlite.sh' + 'weaviate.sh' ) full_python_matrix_tests=( diff --git a/test_unstructured_ingest/unit/test_interfaces.py b/test_unstructured_ingest/unit/test_interfaces.py index 2286ccbbf..6d8a51303 100644 --- a/test_unstructured_ingest/unit/test_interfaces.py +++ b/test_unstructured_ingest/unit/test_interfaces.py @@ -6,11 +6,9 @@ from typing import Any, Dict import pytest from unstructured.documents.elements import DataSourceMetadata -from unstructured.ingest.connector.fsspec.sftp import SftpAccessConfig, SimpleSftpConfig from unstructured.ingest.interfaces import ( BaseConnectorConfig, BaseSingleIngestDoc, - FsspecConfig, PartitionConfig, ProcessorConfig, ReadConfig, @@ -257,91 +255,3 @@ def test_process_file_flatten_metadata(mocker, partition_test_results): expected_keys = {"element_id", "text", "type", "filename", "file_directory", "filetype"} for elem in isd_elems: assert expected_keys == set(elem.keys()) - - -def test_post_init_invalid_protocol(): - """Validate that an invalid protocol raises a ValueError""" - with pytest.raises(ValueError): - FsspecConfig(remote_url="ftp://example.com/path/to/file.txt") - - -def test_fsspec_path_extraction_dropbox_root(): - """Validate that the path extraction works for dropbox root""" - config = FsspecConfig(remote_url="dropbox:// /") - assert config.protocol == "dropbox" - assert config.path_without_protocol == " /" - assert config.dir_path == " " - assert config.file_path == "" - - -def test_fsspec_path_extraction_dropbox_subfolder(): - """Validate that the path extraction works for dropbox subfolder""" - config = FsspecConfig(remote_url="dropbox://path") - assert config.protocol == "dropbox" - assert config.path_without_protocol == "path" - assert config.dir_path == "path" - assert config.file_path == "" - - -def test_fsspec_path_extraction_s3_bucket_only(): - """Validate that the path extraction works for s3 bucket without filename""" - config = FsspecConfig(remote_url="s3://bucket-name") - assert config.protocol == "s3" - assert config.path_without_protocol == "bucket-name" - assert config.dir_path == "bucket-name" - assert config.file_path == "" - - -def test_fsspec_path_extraction_s3_valid_path(): - """Validate that the path extraction works for s3 bucket with filename""" - config = FsspecConfig(remote_url="s3://bucket-name/path/to/file.txt") - assert config.protocol == "s3" - assert config.path_without_protocol == "bucket-name/path/to/file.txt" - assert config.dir_path == "bucket-name" - assert config.file_path == "path/to/file.txt" - - -def test_fsspec_path_extraction_s3_invalid_path(): - """Validate that an invalid s3 path (that mimics triple slash for 
dropbox) - raises a ValueError""" - with pytest.raises(ValueError): - FsspecConfig(remote_url="s3:///bucket-name/path/to") - - -def test_sftp_path_extraction_post_init_with_extension(): - """Validate that the path extraction works for sftp with file extension""" - config = SimpleSftpConfig( - remote_url="sftp://example.com/path/to/file.txt", - access_config=SftpAccessConfig(username="username", password="password", host="", port=22), - ) - assert config.file_path == "file.txt" - assert config.dir_path == "path/to" - assert config.path_without_protocol == "path/to" - assert config.access_config.host == "example.com" - assert config.access_config.port == 22 - - -def test_sftp_path_extraction_without_extension(): - """Validate that the path extraction works for sftp without extension""" - config = SimpleSftpConfig( - remote_url="sftp://example.com/path/to/directory", - access_config=SftpAccessConfig(username="username", password="password", host="", port=22), - ) - assert config.file_path == "" - assert config.dir_path == "path/to/directory" - assert config.path_without_protocol == "path/to/directory" - assert config.access_config.host == "example.com" - assert config.access_config.port == 22 - - -def test_sftp_path_extraction_with_port(): - """Validate that the path extraction works for sftp with a non-default port""" - config = SimpleSftpConfig( - remote_url="sftp://example.com:47474/path/to/file.txt", - access_config=SftpAccessConfig(username="username", password="password", host="", port=22), - ) - assert config.file_path == "file.txt" - assert config.dir_path == "path/to" - assert config.path_without_protocol == "path/to" - assert config.access_config.host == "example.com" - assert config.access_config.port == 47474 diff --git a/test_unstructured_ingest/unit/test_paths.py b/test_unstructured_ingest/unit/test_paths.py index 73915b30c..de3648914 100644 --- a/test_unstructured_ingest/unit/test_paths.py +++ b/test_unstructured_ingest/unit/test_paths.py @@ -1,12 +1,18 @@ from dataclasses import dataclass from pathlib import Path +import pytest + from unstructured.ingest.connector.fsspec.dropbox import ( DropboxIngestDoc, ) from unstructured.ingest.connector.fsspec.fsspec import ( FsspecIngestDoc, ) +from unstructured.ingest.connector.fsspec.sftp import SftpAccessConfig, SimpleSftpConfig +from unstructured.ingest.interfaces import ( + FsspecConfig, +) @dataclass @@ -127,3 +133,91 @@ def test_fsspec_folder_fails(): assert output_filename == Path("/fake_file2.txt.json") assert download_filename == Path("/fake_file2.txt") + + +def test_post_init_invalid_protocol(): + """Validate that an invalid protocol raises a ValueError""" + with pytest.raises(ValueError): + FsspecConfig(remote_url="ftp://example.com/path/to/file.txt") + + +def test_fsspec_path_extraction_dropbox_root(): + """Validate that the path extraction works for dropbox root""" + config = FsspecConfig(remote_url="dropbox:// /") + assert config.protocol == "dropbox" + assert config.path_without_protocol == " /" + assert config.dir_path == " " + assert config.file_path == "" + + +def test_fsspec_path_extraction_dropbox_subfolder(): + """Validate that the path extraction works for dropbox subfolder""" + config = FsspecConfig(remote_url="dropbox://path") + assert config.protocol == "dropbox" + assert config.path_without_protocol == "path" + assert config.dir_path == "path" + assert config.file_path == "" + + +def test_fsspec_path_extraction_s3_bucket_only(): + """Validate that the path extraction works for s3 bucket without filename""" + 
config = FsspecConfig(remote_url="s3://bucket-name") + assert config.protocol == "s3" + assert config.path_without_protocol == "bucket-name" + assert config.dir_path == "bucket-name" + assert config.file_path == "" + + +def test_fsspec_path_extraction_s3_valid_path(): + """Validate that the path extraction works for s3 bucket with filename""" + config = FsspecConfig(remote_url="s3://bucket-name/path/to/file.txt") + assert config.protocol == "s3" + assert config.path_without_protocol == "bucket-name/path/to/file.txt" + assert config.dir_path == "bucket-name" + assert config.file_path == "path/to/file.txt" + + +def test_fsspec_path_extraction_s3_invalid_path(): + """Validate that an invalid s3 path (that mimics triple slash for dropbox) + raises a ValueError""" + with pytest.raises(ValueError): + FsspecConfig(remote_url="s3:///bucket-name/path/to") + + +def test_sftp_path_extraction_post_init_with_extension(): + """Validate that the path extraction works for sftp with file extension""" + config = SimpleSftpConfig( + remote_url="sftp://example.com/path/to/file.txt", + access_config=SftpAccessConfig(username="username", password="password", host="", port=22), + ) + assert config.file_path == "file.txt" + assert config.dir_path == "path/to" + assert config.path_without_protocol == "path/to" + assert config.access_config.host == "example.com" + assert config.access_config.port == 22 + + +def test_sftp_path_extraction_without_extension(): + """Validate that the path extraction works for sftp without extension""" + config = SimpleSftpConfig( + remote_url="sftp://example.com/path/to/directory", + access_config=SftpAccessConfig(username="username", password="password", host="", port=22), + ) + assert config.file_path == "" + assert config.dir_path == "path/to/directory" + assert config.path_without_protocol == "path/to/directory" + assert config.access_config.host == "example.com" + assert config.access_config.port == 22 + + +def test_sftp_path_extraction_with_port(): + """Validate that the path extraction works for sftp with a non-default port""" + config = SimpleSftpConfig( + remote_url="sftp://example.com:47474/path/to/file.txt", + access_config=SftpAccessConfig(username="username", password="password", host="", port=22), + ) + assert config.file_path == "file.txt" + assert config.dir_path == "path/to" + assert config.path_without_protocol == "path/to" + assert config.access_config.host == "example.com" + assert config.access_config.port == 47474 diff --git a/test_unstructured_ingest/unit/test_sql_conform_dict.py b/test_unstructured_ingest/unit/test_sql_conform_dict.py new file mode 100644 index 000000000..0aad7718f --- /dev/null +++ b/test_unstructured_ingest/unit/test_sql_conform_dict.py @@ -0,0 +1,125 @@ +import datetime +from unittest.mock import Mock, patch + +from unstructured.ingest.connector.sql import SqlDestinationConnector + +TEST_DATA_1 = { + "element_id": "80803034fe04181c163306740700cc54", + "metadata": { + "coordinates": { + "layout_height": 792, + "layout_width": 612, + "points": [ + [72.0, 72.69200000000001], + [72.0, 83.69200000000001], + [135.8, 83.69200000000001], + [135.8, 72.69200000000001], + ], + "system": "PixelSpace", + }, + "data_source": { + "date_created": "2023-10-25 10:05:44.976775", + "date_modified": "2023-10-25 10:05:44.976775", + "date_processed": "2023-12-14T17:06:33.074057", + "permissions_data": [{"mode": 33188}], + "url": "example-docs/fake-memo.pdf", + }, + "file_directory": "example-docs", + "filename": "fake-memo.pdf", + "filetype": "application/pdf", + 
"languages": ["eng"], + "last_modified": "2023-10-25T10:05:44", + "page_number": 1, + }, + "text": "May 5, 2023", + "type": "UncategorizedText", + "embeddings": [ + -0.05623878538608551, + 0.008579030632972717, + 0.03698136284947395, + -0.01745658740401268, + -0.030465232208371162, + 0.00996527448296547, + ], +} + +TEST_DATA_2 = { + "metadata": { + "coordinates": {"points": [1, 2, 3]}, + "links": {"link1": "https://example.com", "link2": "https://example.org"}, + "data_source": { + "date_created": "2021-01-01T00:00:00", + "date_modified": "2021-01-02T00:00:00", + "date_processed": "2022-12-13T15:44:08", + "version": 1.1, + }, + "last_modified": "2021-01-03T00:00:00", + "page_number": 10, + "regex_metadata": {"pattern": "abc"}, + }, + "embeddings": [0.1, 0.2, 0.3], +} + + +def test_conform_dict_1(): + """Validate that the conform_dict method returns the expected output for a real example""" + # Create a mock instance of the connector class + connector = SqlDestinationConnector(write_config=Mock(), connector_config=Mock()) + + # Mock the uuid.uuid4 function to return a fixed value + with patch("uuid.uuid4", return_value="mocked_uuid"): + # Call the conform_dict method + data_out = connector.conform_dict(TEST_DATA_1) + + # Assert that the result matches the expected output + assert data_out == { + "element_id": "80803034fe04181c163306740700cc54", + "text": "May 5, 2023", + "type": "UncategorizedText", + "id": "mocked_uuid", + "file_directory": "example-docs", + "filename": "fake-memo.pdf", + "filetype": "application/pdf", + "languages": ["eng"], + "last_modified": datetime.datetime(2023, 10, 25, 10, 5, 44), + "page_number": "1", + "date_created": datetime.datetime(2023, 10, 25, 10, 5, 44, 976775), + "date_modified": datetime.datetime(2023, 10, 25, 10, 5, 44, 976775), + "date_processed": datetime.datetime(2023, 12, 14, 17, 6, 33, 74057), + "permissions_data": '[{"mode": 33188}]', + "url": "example-docs/fake-memo.pdf", + "layout_height": 792, + "layout_width": 612, + "points": "[[72.0, 72.69200000000001], [72.0, 83.69200000000001]," + " [135.8, 83.69200000000001], [135.8, 72.69200000000001]]", + "system": "PixelSpace", + "embeddings": "[-0.05623878538608551, 0.008579030632972717, " + "0.03698136284947395, -0.01745658740401268, " + "-0.030465232208371162, 0.00996527448296547]", + } + + +def test_conform_dict_2(): + """Validate that the conform_dict method returns the expected output for a simplified example""" + # Create a mock instance of the connector class + connector = SqlDestinationConnector(write_config=Mock(), connector_config=Mock()) + + # Mock the uuid.uuid4 function to return a fixed value + with patch("uuid.uuid4", return_value="mocked_uuid"): + # Call the conform_dict method + data_out = connector.conform_dict(TEST_DATA_2) + + # Assert that the result matches the expected output + assert data_out == { + "embeddings": "[0.1, 0.2, 0.3]", + "id": "mocked_uuid", + "links": '{"link1": "https://example.com", "link2": "https://example.org"}', + "last_modified": datetime.datetime(2021, 1, 3, 0, 0), + "page_number": "10", + "regex_metadata": '{"pattern": "abc"}', + "date_created": datetime.datetime(2021, 1, 1, 0, 0), + "date_modified": datetime.datetime(2021, 1, 2, 0, 0), + "date_processed": datetime.datetime(2022, 12, 13, 15, 44, 8), + "version": "1.1", + "points": "[1, 2, 3]", + } diff --git a/unstructured/__version__.py b/unstructured/__version__.py index 0d1529cc6..cb2abecf2 100644 --- a/unstructured/__version__.py +++ b/unstructured/__version__.py @@ -1 +1 @@ -__version__ = "0.11.9-dev0" 
# pragma: no cover +__version__ = "0.11.9-dev1" # pragma: no cover diff --git a/unstructured/ingest/cli/cmds/__init__.py b/unstructured/ingest/cli/cmds/__init__.py index 3b076d56d..bd14f32f3 100644 --- a/unstructured/ingest/cli/cmds/__init__.py +++ b/unstructured/ingest/cli/cmds/__init__.py @@ -44,6 +44,7 @@ from .reddit import get_base_src_cmd as reddit_base_src_cmd from .salesforce import get_base_src_cmd as salesforce_base_src_cmd from .sharepoint import get_base_src_cmd as sharepoint_base_src_cmd from .slack import get_base_src_cmd as slack_base_src_cmd +from .sql import get_base_dest_cmd as sql_base_dest_cmd from .weaviate import get_base_dest_cmd as weaviate_dest_cmd from .wikipedia import get_base_src_cmd as wikipedia_base_src_cmd @@ -101,6 +102,7 @@ base_dest_cmd_fns: t.List[t.Callable[[], "BaseDestCmd"]] = [ s3_base_dest_cmd, azure_cognitive_search_base_dest_cmd, delta_table_dest_cmd, + sql_base_dest_cmd, weaviate_dest_cmd, mongo_base_dest_cmd, pinecone_base_dest_cmd, diff --git a/unstructured/ingest/cli/cmds/sql.py b/unstructured/ingest/cli/cmds/sql.py new file mode 100644 index 000000000..7b4800e55 --- /dev/null +++ b/unstructured/ingest/cli/cmds/sql.py @@ -0,0 +1,66 @@ +import typing as t +from dataclasses import dataclass + +import click + +from unstructured.ingest.cli.interfaces import CliConfig +from unstructured.ingest.connector.sql import SimpleSqlConfig +from unstructured.ingest.interfaces import WriteConfig + +SQL_DRIVERS = {"postgresql", "sqlite"} + + +@dataclass +class SqlCliConfig(SimpleSqlConfig, CliConfig): + @staticmethod + def get_cli_options() -> t.List[click.Option]: + options = [ + click.Option( + ["--db-type"], + required=True, + type=click.Choice(SQL_DRIVERS), + help="Type of the database backend", + ), + click.Option( + ["--username"], + default=None, + type=str, + help="DB username", + ), + click.Option( + ["--password"], + default=None, + type=str, + help="DB password", + ), + click.Option( + ["--host"], + default=None, + type=str, + help="DB host", + ), + click.Option( + ["--port"], + default=None, + type=int, + help="DB host connection port", + ), + click.Option( + ["--database"], + default=None, + type=str, + help="Database name. 
For sqlite databases, this is the path to the .db file.", + ), + ] + return options + + +def get_base_dest_cmd(): + from unstructured.ingest.cli.base.dest import BaseDestCmd + + cmd_cls = BaseDestCmd( + cmd_name="sql", + cli_config=SqlCliConfig, + write_config=WriteConfig, + ) + return cmd_cls diff --git a/unstructured/ingest/connector/sql.py b/unstructured/ingest/connector/sql.py new file mode 100644 index 000000000..053ed531a --- /dev/null +++ b/unstructured/ingest/connector/sql.py @@ -0,0 +1,201 @@ +import json +import typing as t +import uuid +from dataclasses import dataclass, field +from datetime import datetime + +from unstructured.ingest.enhanced_dataclass import enhanced_field +from unstructured.ingest.error import DestinationConnectionError +from unstructured.ingest.interfaces import ( + AccessConfig, + BaseConnectorConfig, + BaseDestinationConnector, + BaseIngestDoc, +) +from unstructured.ingest.logger import logger +from unstructured.utils import requires_dependencies + +ELEMENTS_TABLE_NAME = "elements" + + +@dataclass +class SqlAccessConfig(AccessConfig): + username: t.Optional[str] + password: t.Optional[str] = enhanced_field(sensitive=True) + + +@dataclass +class SimpleSqlConfig(BaseConnectorConfig): + db_type: t.Optional[str] + host: t.Optional[str] + database: t.Optional[str] + port: t.Optional[int] + access_config: SqlAccessConfig + + def __post_init__(self): + if (self.db_type == "sqlite") and (self.database is None): + raise ValueError( + "A sqlite connection requires a path to a *.db file " + "through the `database` argument" + ) + + @property + def connection(self): + if self.db_type == "postgresql": + return self._make_psycopg_connection + elif self.db_type == "sqlite": + return self._make_sqlite_connection + raise ValueError(f"Unsupported database {self.db_type} connection.") + + def _make_sqlite_connection(self): + from sqlite3 import connect + + return connect(database=self.database) + + @requires_dependencies(["psycopg2"], extras="postgres") + def _make_psycopg_connection(self): + from psycopg2 import connect + + return connect( + user=self.access_config.username, + password=self.access_config.password, + dbname=self.database, + host=self.host, + port=self.port, + ) + + +@dataclass +class SqlDestinationConnector(BaseDestinationConnector): + connector_config: SimpleSqlConfig + _client: t.Optional[t.Any] = field(init=False, default=None) + + @property + def client(self): + if self._client is None: + self._client = self.connector_config.connection() + return self._client + + @DestinationConnectionError.wrap + def initialize(self): + _ = self.client + + def check_connection(self): + cursor = self.client.cursor() + cursor.execute("SELECT 1;") + cursor.close() + + def conform_dict(self, data: dict) -> dict: + """ + Updates the element dictionary to conform to the sql schema + """ + data["id"] = str(uuid.uuid4()) + + # Dict as string formatting + if record_locator := data.get("metadata", {}).get("data_source", {}).get("record_locator"): + # Explicit casting otherwise fails schema type checking + data["metadata"]["data_source"]["record_locator"] = str(json.dumps(record_locator)) + + # Array of items as string formatting + if (embeddings := data.get("embeddings")) and ( + self.connector_config.db_type != "postgresql" + ): + data["embeddings"] = str(json.dumps(embeddings)) + + if points := data.get("metadata", {}).get("coordinates", {}).get("points"): + data["metadata"]["coordinates"]["points"] = str(json.dumps(points)) + + if links := data.get("metadata", 
{}).get("links", {}): + data["metadata"]["links"] = str(json.dumps(links)) + + if permissions_data := ( + data.get("metadata", {}).get("data_source", {}).get("permissions_data") + ): + data["metadata"]["data_source"]["permissions_data"] = json.dumps(permissions_data) + + if link_texts := data.get("metadata", {}).get("link_texts", {}): + data["metadata"]["link_texts"] = str(json.dumps(link_texts)) + + if sent_from := data.get("metadata", {}).get("sent_from", {}): + data["metadata"]["sent_from"] = str(json.dumps(sent_from)) + + if sent_to := data.get("metadata", {}).get("sent_to", {}): + data["metadata"]["sent_to"] = str(json.dumps(sent_to)) + + # Datetime formatting + if date_created := data.get("metadata", {}).get("data_source", {}).get("date_created"): + data["metadata"]["data_source"]["date_created"] = datetime.fromisoformat(date_created) + + if date_modified := data.get("metadata", {}).get("data_source", {}).get("date_modified"): + data["metadata"]["data_source"]["date_modified"] = datetime.fromisoformat(date_modified) + + if date_processed := data.get("metadata", {}).get("data_source", {}).get("date_processed"): + data["metadata"]["data_source"]["date_processed"] = datetime.fromisoformat( + date_processed + ) + + if last_modified := data.get("metadata", {}).get("last_modified", {}): + data["metadata"]["last_modified"] = datetime.fromisoformat(last_modified) + + # String casting + if version := data.get("metadata", {}).get("data_source", {}).get("version"): + data["metadata"]["data_source"]["version"] = str(version) + + if page_number := data.get("metadata", {}).get("page_number"): + data["metadata"]["page_number"] = str(page_number) + + if regex_metadata := data.get("metadata", {}).get("regex_metadata"): + data["metadata"]["regex_metadata"] = str(json.dumps(regex_metadata)) + + if data.get("metadata", {}).get("data_source", None): + data.update(data.get("metadata", {}).pop("data_source", None)) + if data.get("metadata", {}).get("coordinates", None): + data.update(data.get("metadata", {}).pop("coordinates", None)) + if data.get("metadata", {}): + data.update(data.pop("metadata", None)) + + return data + + @DestinationConnectionError.wrap + def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None: + logger.info( + f"writing {len(json_list)} objects to database {self.connector_config.database} " + f"at {self.connector_config.host}" + ) + + with self.client as conn: + cursor = conn.cursor() + + # Since we have no guarantee that each element will have the same keys + # we insert each element individually + for e in json_list: + elem = self.conform_dict(e) + + query = f"INSERT INTO {ELEMENTS_TABLE_NAME} ({','.join(elem.keys())}) \ + VALUES({','.join(['?' 
if self.connector_config.db_type == 'sqlite' else '%s' for _ in elem])})" # noqa E501 + values = [] + for v in elem.values(): + if self.connector_config.db_type == "sqlite" and isinstance(v, list): + values.append(json.dumps(v)) + else: + values.append(v) + cursor.execute(query, values) + + conn.commit() + cursor.close() + + # Exiting the context manager doesn't close the connection, so close it explicitly + conn.close() + + def write(self, docs: t.List[BaseIngestDoc]) -> None: + json_list: t.List[t.Dict[str, t.Any]] = [] + for doc in docs: + local_path = doc._output_filename + with open(local_path) as json_file: + json_content = json.load(json_file) + logger.info( + f"appending {len(json_content)} json elements from content in {local_path}", + ) + json_list.extend(json_content) + self.write_dict(json_list=json_list) diff --git a/unstructured/ingest/interfaces.py b/unstructured/ingest/interfaces.py index 8d97594f6..105a85d93 100644 --- a/unstructured/ingest/interfaces.py +++ b/unstructured/ingest/interfaces.py @@ -52,8 +52,8 @@ class BaseConfig(EnhancedDataClassJsonMixin, ABC): @dataclass class AccessConfig(BaseConfig): - # Meant to designate holding any sensitive information associated with other configs - pass + """Designates a config that holds sensitive information associated with other configs, + as well as access-specific settings.""" @dataclass diff --git a/unstructured/ingest/runner/writers/__init__.py b/unstructured/ingest/runner/writers/__init__.py index 9c3b3391f..be39a9076 100644 --- a/unstructured/ingest/runner/writers/__init__.py +++ b/unstructured/ingest/runner/writers/__init__.py @@ -13,6 +13,7 @@ from .fsspec.s3 import S3Writer from .mongodb import MongodbWriter from .pinecone import PineconeWriter from .qdrant import QdrantWriter +from .sql import SqlWriter from .weaviate import WeaviateWriter writer_map: t.Dict[str, t.Type[Writer]] = { @@ -28,6 +29,7 @@ writer_map: t.Dict[str, t.Type[Writer]] = { "pinecone": PineconeWriter, "qdrant": QdrantWriter, "s3": S3Writer, + "sql": SqlWriter, "weaviate": WeaviateWriter, } diff --git a/unstructured/ingest/runner/writers/sql.py b/unstructured/ingest/runner/writers/sql.py new file mode 100644 index 000000000..70c710a1f --- /dev/null +++ b/unstructured/ingest/runner/writers/sql.py @@ -0,0 +1,22 @@ +import typing as t +from dataclasses import dataclass + +from unstructured.ingest.interfaces import BaseDestinationConnector +from unstructured.ingest.runner.writers.base_writer import Writer + +if t.TYPE_CHECKING: + from unstructured.ingest.connector.sql import SimpleSqlConfig + from unstructured.ingest.interfaces import WriteConfig + + +@dataclass +class SqlWriter(Writer): + write_config: "WriteConfig" + connector_config: "SimpleSqlConfig" + + def get_connector_cls(self) -> t.Type[BaseDestinationConnector]: + from unstructured.ingest.connector.sql import ( + SqlDestinationConnector, + ) + + return SqlDestinationConnector
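+
+
+# Illustrative sketch, not part of the public API: how this writer could be
+# constructed programmatically. The .db path below is a placeholder.
+#
+#   from unstructured.ingest.connector.sql import SimpleSqlConfig, SqlAccessConfig
+#   from unstructured.ingest.interfaces import WriteConfig
+#
+#   writer = SqlWriter(
+#       write_config=WriteConfig(),
+#       connector_config=SimpleSqlConfig(
+#           db_type="sqlite",
+#           host=None,
+#           database="/tmp/elements.db",
+#           port=None,
+#           access_config=SqlAccessConfig(username=None, password=None),
+#       ),
+#   )
+#   connector_cls = writer.get_connector_cls()  # resolves to SqlDestinationConnector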