feat: adds postgresql/sqlite destination connector (#2005)

- Adds a destination connector to upload processed output into a
PostgreSQL/Sqlite database instance.
- Users are responsible for providing their own database instances. This PR includes a
couple of configuration examples.
- Defines the scripts required to set up a PostgreSQL instance with the
unstructured elements schema (see the setup sketch after this list).
- Validates postgres/pgvector embedding storage and retrieval
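
For reviewers trying the connector locally, the schema scripts added in this PR can be applied to a running PostgreSQL instance with psql before ingesting. A minimal sketch, assuming a local server and the example credentials used in the docs (host, user, database, and file path are illustrative):

# Hypothetical setup step: load the elements schema into an existing database
# (the path assumes the schema file shipped with the docs in this PR)
psql -h localhost -p 5432 -U postgres -d elements \
  -f docs/source/ingest/destination_connectors/data/postgres-schema.sql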

---------

Co-authored-by: potter-potter <david.potter@gmail.com>
rvztz 2024-01-04 13:33:16 -06:00 committed by GitHub
parent 5b0ae3fd8b
commit 950e5d68f9
31 changed files with 1099 additions and 97 deletions

View File

@ -1,4 +1,4 @@
## 0.11.9-dev0
## 0.11.9-dev1
### Enhancements
@ -6,6 +6,8 @@
### Features
* **Add PostgreSQL/SQLite destination connector** PostgreSQL and SQLite connectors added to the ingest CLI. Users may now use `unstructured-ingest` to write partitioned data to a PostgreSQL or SQLite database, and to write embeddings to a PostgreSQL pgvector database.
### Fixes
## 0.11.8
@ -16,10 +18,8 @@
### Features
### Fixes
## 0.11.7
### Enhancements

View File

@ -231,6 +231,10 @@ install-ingest-qdrant:
install-ingest-chroma:
python3 -m pip install -r requirements/ingest/chroma.txt
.PHONY: install-ingest-postgres
install-ingest-postgres:
python3 -m pip install -r requirements/ingest/postgres.txt
.PHONY: install-embed-huggingface
install-embed-huggingface:
python3 -m pip install -r requirements/ingest/embed-huggingface.txt

View File

@ -19,5 +19,6 @@ in our community `Slack. <https://short.unstructured.io/pzw05l7>`_
destination_connectors/pinecone
destination_connectors/qdrant
destination_connectors/s3
destination_connectors/sql
destination_connectors/weaviate

View File

@ -0,0 +1,17 @@
#!/usr/bin/env bash
unstructured-ingest \
local \
--input-path example-docs/fake-memo.pdf \
--anonymous \
--output-dir local-output-to-sql \
--num-processes 2 \
--verbose \
--strategy fast \
sql \
--db-type postgresql \
--username postgres \
--password test \
--host localhost \
--port 5432 \
--database elements

View File

@ -0,0 +1,27 @@
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import LocalRunner
if __name__ == "__main__":
runner = LocalRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="local-output-to-postgres",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
writer_type="sql",
writer_kwargs={
"db_type": os.getenv("DB_TYPE"),
"username": os.getenv("USERNAME"),
"password": os.getenv("DB_PASSWORD"),
"host": os.getenv("DB_HOST"),
"port": os.getenv("DB_PORT"),
"database": os.getenv("DB_DATABASE"),
},
)
runner.run(
input_path="example-docs/fake-memo.pdf",
)
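
The example reads its connection settings from environment variables that are not defined anywhere in this PR; a minimal sketch of setting them before running the script, with placeholder values mirroring the shell example above (the script path is assumed from the docs layout):

# Illustrative only: export the variables the example reads via os.getenv()
export DB_TYPE=postgresql
export USERNAME=postgres
export DB_PASSWORD=test
export DB_HOST=localhost
export DB_PORT=5432
export DB_DATABASE=elements
python docs/source/ingest/destination_connectors/code/python/sql.py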

View File

@ -0,0 +1,44 @@
CREATE EXTENSION vector;
CREATE TABLE elements (
id UUID PRIMARY KEY,
element_id VARCHAR,
text TEXT,
embeddings vector(384),
type VARCHAR,
system VARCHAR,
layout_width DECIMAL,
layout_height DECIMAL,
points TEXT,
url TEXT,
version VARCHAR,
date_created TIMESTAMPTZ,
date_modified TIMESTAMPTZ,
date_processed TIMESTAMPTZ,
permissions_data TEXT,
record_locator TEXT,
category_depth INTEGER,
parent_id VARCHAR,
attached_filename VARCHAR,
filetype VARCHAR,
last_modified TIMESTAMPTZ,
file_directory VARCHAR,
filename VARCHAR,
languages VARCHAR [],
page_number VARCHAR,
links TEXT,
page_name VARCHAR,
link_urls VARCHAR [],
link_texts VARCHAR [],
sent_from VARCHAR [],
sent_to VARCHAR [],
subject VARCHAR,
section VARCHAR,
header_footer_type VARCHAR,
emphasized_text_contents VARCHAR [],
emphasized_text_tags VARCHAR [],
text_as_html TEXT,
regex_metadata TEXT,
detection_class_prob DECIMAL
);
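
With embeddings stored in the vector(384) column, nearest-neighbour lookups can use pgvector's <-> distance operator, which is what the integration test later in this diff validates. A hedged psql sketch against the test instance (port, credentials, and the self-lookup query are illustrative):

# Illustrative similarity search: find the row whose embedding is closest to a sample embedding
PGPASSWORD=test psql -h localhost -p 5433 -U unstructured -d elements \
  -c "SELECT text FROM elements
      ORDER BY embeddings <-> (SELECT embeddings FROM elements ORDER BY text LIMIT 1)
      LIMIT 1;"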

View File

@ -0,0 +1,42 @@
CREATE TABLE elements (
id UUID PRIMARY KEY,
element_id VARCHAR,
text TEXT,
embeddings DECIMAL [],
type VARCHAR,
system VARCHAR,
layout_width DECIMAL,
layout_height DECIMAL,
points TEXT,
url TEXT,
version VARCHAR,
date_created TIMESTAMPTZ,
date_modified TIMESTAMPTZ,
date_processed TIMESTAMPTZ,
permissions_data TEXT,
record_locator TEXT,
category_depth INTEGER,
parent_id VARCHAR,
attached_filename VARCHAR,
filetype VARCHAR,
last_modified TIMESTAMPTZ,
file_directory VARCHAR,
filename VARCHAR,
languages VARCHAR [],
page_number VARCHAR,
links TEXT,
page_name VARCHAR,
link_urls VARCHAR [],
link_texts VARCHAR [],
sent_from VARCHAR [],
sent_to VARCHAR [],
subject VARCHAR,
section VARCHAR,
header_footer_type VARCHAR,
emphasized_text_contents VARCHAR [],
emphasized_text_tags VARCHAR [],
text_as_html TEXT,
regex_metadata TEXT,
detection_class_prob DECIMAL
);

View File

@ -0,0 +1,41 @@
CREATE TABLE elements (
id TEXT PRIMARY KEY,
element_id TEXT,
text TEXT,
embeddings TEXT,
type TEXT,
system TEXT,
layout_width REAL,
layout_height REAL,
points TEXT,
url TEXT,
version TEXT,
date_created TEXT,
date_modified TEXT,
date_processed TEXT,
permissions_data TEXT,
record_locator TEXT,
category_depth INTEGER,
parent_id TEXT,
attached_filename TEXT,
filetype TEXT,
last_modified TEXT,
file_directory TEXT,
filename TEXT,
languages TEXT,
page_number TEXT,
links TEXT,
page_name TEXT,
link_urls TEXT,
link_texts TEXT,
sent_from TEXT,
sent_to TEXT,
subject TEXT,
section TEXT,
header_footer_type TEXT,
emphasized_text_contents TEXT,
emphasized_text_tags TEXT,
text_as_html TEXT,
regex_metadata TEXT,
detection_class_prob DECIMAL
);
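
In the SQLite schema the embeddings column (and the other list-valued columns) are plain TEXT, because the connector JSON-serializes lists before inserting for non-PostgreSQL backends (see conform_dict/write_dict further down). A quick, illustrative way to inspect what was written using the sqlite3 CLI (the database file path is a placeholder):

# Illustrative checks against a SQLite destination database
sqlite3 ./elements.db "SELECT count(*) FROM elements;"
sqlite3 ./elements.db "SELECT filename, type, substr(embeddings, 1, 40) FROM elements LIMIT 3;"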

View File

@ -0,0 +1,62 @@
SQL
===========

NOTE: At the moment, the connector only supports PostgreSQL and SQLite. Vectors can be stored and searched in PostgreSQL with pgvector.

Batch process all your records using ``unstructured-ingest`` to store structured outputs locally on your filesystem and upload those local files to a PostgreSQL or SQLite schema.

The insert query is currently limited to appending rows.

First, install the SQL dependencies as shown here if you are using PostgreSQL. SQLite support uses Python's built-in ``sqlite3`` module, so no extra install is required for SQLite.

.. code:: shell

  pip install "unstructured[postgres]"

Run Locally
-----------

The upstream connector can be any of the ones supported; for convenience, the sample command below uses the upstream local connector.

.. tabs::

   .. tab:: Shell

      .. literalinclude:: ./code/bash/sql.sh
         :language: bash

   .. tab:: Python

      .. literalinclude:: ./code/python/sql.py
         :language: python

For a full list of the options the CLI accepts, check ``unstructured-ingest <upstream connector> sql --help``.

NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide <https://unstructured-io.github.io/unstructured/installing.html>`_.

Sample Table Schema
-------------------

To make sure the schema of the destination table matches the data being written to it, the sample schemas below can be used.

.. tabs::

   .. tab:: PostgreSQL

      .. literalinclude:: ./data/postgres-schema.sql
         :language: sql
         :linenos:
         :caption: Object description

   .. tab:: PostgreSQL with pgvector

      .. literalinclude:: ./data/pgvector-schema.sql
         :language: sql
         :linenos:
         :caption: Object description

   .. tab:: SQLite

      .. literalinclude:: ./data/sqlite-schema.sql
         :language: sql
         :linenos:
         :caption: Object description

examples/ingest/sql/ingest.sh (executable file)
View File

@ -0,0 +1,26 @@
#!/usr/bin/env bash
# Processes the example document with the local connector.
# Structured outputs are stored in a PostgreSQL instance.
SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
cd "$SCRIPT_DIR"/../../.. || exit 1
PYTHONPATH=. ./unstructured/ingest/main.py \
local \
--input-path example-docs/book-war-and-peace-1225p.txt \
--output-dir local-to-sql \
--strategy fast \
--chunk-elements \
--embedding-provider "<an unstructured embedding provider, e.g. langchain-huggingface>" \
--num-processes 2 \
--verbose \
--work-dir "<directory for intermediate outputs to be saved>" \
sql \
--db-type postgresql \
--username postgres \
--password test \
--host localhost \
--port 5432 \
--database elements

View File

@ -0,0 +1,3 @@
-c ../constraints.in
-c ../base.txt
psycopg2-binary

View File

@ -0,0 +1,8 @@
#
# This file is autogenerated by pip-compile with Python 3.8
# by the following command:
#
# pip-compile --constraint=requirements/constraints.in requirements/ingest/postgres.in
#
psycopg2-binary==2.9.9
# via -r requirements/ingest/postgres.in

View File

@ -0,0 +1,44 @@
CREATE EXTENSION vector;
CREATE TABLE elements (
id UUID PRIMARY KEY,
element_id VARCHAR,
text TEXT,
embeddings vector(384),
type VARCHAR,
system VARCHAR,
layout_width DECIMAL,
layout_height DECIMAL,
points TEXT,
url TEXT,
version VARCHAR,
date_created TIMESTAMPTZ,
date_modified TIMESTAMPTZ,
date_processed TIMESTAMPTZ,
permissions_data TEXT,
record_locator TEXT,
category_depth INTEGER,
parent_id VARCHAR,
attached_filename VARCHAR,
filetype VARCHAR,
last_modified TIMESTAMPTZ,
file_directory VARCHAR,
filename VARCHAR,
languages VARCHAR [],
page_number VARCHAR,
links TEXT,
page_name VARCHAR,
link_urls VARCHAR [],
link_texts VARCHAR [],
sent_from VARCHAR [],
sent_to VARCHAR [],
subject VARCHAR,
section VARCHAR,
header_footer_type VARCHAR,
emphasized_text_contents VARCHAR [],
emphasized_text_tags VARCHAR [],
text_as_html TEXT,
regex_metadata TEXT,
detection_class_prob DECIMAL
);

View File

@ -0,0 +1,20 @@
#!/usr/bin/env bash
set -e
SCRIPT_DIR=$(dirname "$(realpath "$0")")
DATABASE_NAME=$1
DATABASE_FILE_PATH=$2
# Create the SQL instance
if [[ "$DATABASE_NAME" != "sqlite" ]]; then
docker compose version
docker compose -f "$SCRIPT_DIR"/docker-compose-"$DATABASE_NAME".yaml up --wait
docker compose -f "$SCRIPT_DIR"/docker-compose-"$DATABASE_NAME".yaml ps
else
touch "$DATABASE_FILE_PATH"
python "$SCRIPT_DIR"/create-sqlite-schema.py "$DATABASE_FILE_PATH"
fi
echo "$DATABASE_NAME instance is live."

View File

@ -0,0 +1,14 @@
import sqlite3
import sys
from pathlib import Path
if __name__ == "__main__":
connection = sqlite3.connect(database=sys.argv[1])
query = None
script_path = (Path(__file__).parent / Path("create-sqlite-schema.sql")).resolve()
with open(script_path) as f:
query = f.read()
cursor = connection.cursor()
cursor.executescript(query)
connection.close()

View File

@ -0,0 +1,41 @@
CREATE TABLE elements (
id TEXT PRIMARY KEY,
element_id TEXT,
text TEXT,
embeddings TEXT,
type TEXT,
system TEXT,
layout_width REAL,
layout_height REAL,
points TEXT,
url TEXT,
version TEXT,
date_created TEXT,
date_modified TEXT,
date_processed TEXT,
permissions_data TEXT,
record_locator TEXT,
category_depth INTEGER,
parent_id TEXT,
attached_filename TEXT,
filetype TEXT,
last_modified TEXT,
file_directory TEXT,
filename TEXT,
languages TEXT,
page_number TEXT,
links TEXT,
page_name TEXT,
link_urls TEXT,
link_texts TEXT,
sent_from TEXT,
sent_to TEXT,
subject TEXT,
section TEXT,
header_footer_type TEXT,
emphasized_text_contents TEXT,
emphasized_text_tags TEXT,
text_as_html TEXT,
regex_metadata TEXT,
detection_class_prob DECIMAL
);

View File

@ -0,0 +1,13 @@
services:
pgvector:
image: ankane/pgvector
restart: always
container_name: pgvector_dest
ports:
- 5433:5432
environment:
POSTGRES_DB: elements
POSTGRES_USER: unstructured
POSTGRES_PASSWORD: test
volumes:
- ./create-pgvector-schema.sql:/docker-entrypoint-initdb.d/init.sql
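
The create-sql-instance.sh helper above brings this compose file up; run standalone it would look roughly like the following (host port 5433 maps to the container's 5432, and the schema is applied automatically via the initdb volume mount):

# Illustrative standalone startup of the pgvector test database
docker compose -f scripts/sql-test-helpers/docker-compose-pgvector.yaml up --wait
docker compose -f scripts/sql-test-helpers/docker-compose-pgvector.yaml ps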

View File

@ -0,0 +1,54 @@
#!/usr/bin/env bash
set -e
SRC_PATH=$(dirname "$(realpath "$0")")
SCRIPT_DIR=$(dirname "$SRC_PATH")
cd "$SCRIPT_DIR"/.. || exit 1
OUTPUT_FOLDER_NAME=sql-dest
OUTPUT_ROOT=${OUTPUT_ROOT:-$SCRIPT_DIR}
OUTPUT_DIR=$OUTPUT_ROOT/structured-output/$OUTPUT_FOLDER_NAME
WORK_DIR=$OUTPUT_ROOT/workdir/$OUTPUT_FOLDER_NAME
max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")}
CI=${CI:-"false"}
DATABASE_TYPE="pgvector"
# shellcheck disable=SC1091
source "$SCRIPT_DIR"/cleanup.sh
function cleanup {
echo "Stopping SQL DB Docker container"
docker-compose -f scripts/sql-test-helpers/docker-compose-"$DATABASE_TYPE".yaml down --remove-orphans -v
# Local file cleanup
cleanup_dir "$WORK_DIR"
cleanup_dir "$OUTPUT_DIR"
if [ "$CI" == "true" ]; then
cleanup_dir "$DOWNLOAD_DIR"
fi
}
trap cleanup EXIT
# Create sql instance and create `elements` class
echo "Creating SQL DB instance"
# shellcheck source=/dev/null
scripts/sql-test-helpers/create-sql-instance.sh "$DATABASE_TYPE"
wait
PYTHONPATH=. ./unstructured/ingest/main.py \
local \
--num-processes "$max_processes" \
--output-dir "$OUTPUT_DIR" \
--strategy fast \
--verbose \
--input-path example-docs/fake-memo.pdf \
--work-dir "$WORK_DIR" \
--embedding-provider "langchain-huggingface" \
sql \
--db-type "postgresql" \
--username unstructured \
--password test \
--host localhost \
--port 5433 \
--database elements
"$SCRIPT_DIR"/python/test-ingest-sql-output.py "$DATABASE_TYPE" "5433"

View File

@ -0,0 +1,52 @@
#!/usr/bin/env bash
set -e
SRC_PATH=$(dirname "$(realpath "$0")")
SCRIPT_DIR=$(dirname "$SRC_PATH")
cd "$SCRIPT_DIR"/.. || exit 1
OUTPUT_FOLDER_NAME=sql-dest
OUTPUT_ROOT=${OUTPUT_ROOT:-$SCRIPT_DIR}
OUTPUT_DIR=$OUTPUT_ROOT/structured-output/$OUTPUT_FOLDER_NAME
WORK_DIR=$OUTPUT_ROOT/workdir/$OUTPUT_FOLDER_NAME
max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")}
CI=${CI:-"false"}
DATABASE_TYPE="sqlite"
DB_PATH=$SCRIPT_DIR/elements.db
# shellcheck disable=SC1091
source "$SCRIPT_DIR"/cleanup.sh
function cleanup {
# Local file cleanup
cleanup_dir "$WORK_DIR"
cleanup_dir "$OUTPUT_DIR"
rm -rf "$DB_PATH"
if [ "$CI" == "true" ]; then
cleanup_dir "$DOWNLOAD_DIR"
fi
}
trap cleanup EXIT
# Create sql instance and create `elements` class
echo "Creating SQL DB instance"
# shellcheck source=/dev/null
scripts/sql-test-helpers/create-sql-instance.sh "$DATABASE_TYPE" "$DB_PATH"
wait
PYTHONPATH=. ./unstructured/ingest/main.py \
local \
--num-processes "$max_processes" \
--output-dir "$OUTPUT_DIR" \
--strategy fast \
--verbose \
--reprocess \
--input-path example-docs/fake-memo.pdf \
--work-dir "$WORK_DIR" \
sql \
--db-type "$DATABASE_TYPE" \
--username unstructured \
--database "$DB_PATH"
"$SCRIPT_DIR"/python/test-ingest-sql-output.py "$DATABASE_TYPE" "$DB_PATH"

View File

@ -0,0 +1,65 @@
#!/usr/bin/env python3
import sys
N_ELEMENTS = 5
def create_connection(db_type, database=None, port=None):
if db_type == "pgvector":
from psycopg2 import connect
return connect(
user="unstructured",
password="test",
dbname="elements",
host="localhost",
port=port,
)
elif db_type == "sqlite":
from sqlite3 import connect
return connect(database=database)
raise ValueError(f"Unsupported database {db_type} connection.")
if __name__ == "__main__":
database_name = sys.argv[1]
db_url = None
port = None
if database_name == "sqlite":
db_url = sys.argv[2]
else:
port = sys.argv[2]
print(f"Running SQL output test for: {database_name}")
conn = create_connection(database_name, db_url, port)
query = "select count(*) from elements;"
cursor = conn.cursor()
cursor.execute(query)
count = cursor.fetchone()[0]
if database_name == "pgvector":
"""Get embedding from database and then use it to
search for the closest vector (which should be itself)"""
cursor = conn.cursor()
cursor.execute("SELECT embeddings FROM elements order by text limit 1")
test_embedding = cursor.fetchone()[0]
similarity_query = (
f"SELECT text FROM elements ORDER BY embeddings <-> '{test_embedding}' LIMIT 1;"
)
cursor.execute(similarity_query)
res = cursor.fetchone()
assert res[0] == "Best Regards,"
print("Result of vector search against pgvector with embeddings successful")
try:
assert count == N_ELEMENTS
except AssertionError:
print(f"{database_name} dest check failed: got {count}, expected {N_ELEMENTS}")
raise
finally:
cursor.close()
conn.close()
print(f"SUCCESS: {database_name} dest check")

View File

@ -24,11 +24,13 @@ all_tests=(
'elasticsearch.sh'
'gcs.sh'
'mongodb.sh'
'pgvector.sh'
'pinecone.sh'
'qdrant.sh'
's3.sh'
'weaviate.sh'
'sharepoint-embed-cog-index.sh'
'sqlite.sh'
'weaviate.sh'
)
full_python_matrix_tests=(

View File

@ -6,11 +6,9 @@ from typing import Any, Dict
import pytest
from unstructured.documents.elements import DataSourceMetadata
from unstructured.ingest.connector.fsspec.sftp import SftpAccessConfig, SimpleSftpConfig
from unstructured.ingest.interfaces import (
BaseConnectorConfig,
BaseSingleIngestDoc,
FsspecConfig,
PartitionConfig,
ProcessorConfig,
ReadConfig,
@ -257,91 +255,3 @@ def test_process_file_flatten_metadata(mocker, partition_test_results):
expected_keys = {"element_id", "text", "type", "filename", "file_directory", "filetype"}
for elem in isd_elems:
assert expected_keys == set(elem.keys())
def test_post_init_invalid_protocol():
"""Validate that an invalid protocol raises a ValueError"""
with pytest.raises(ValueError):
FsspecConfig(remote_url="ftp://example.com/path/to/file.txt")
def test_fsspec_path_extraction_dropbox_root():
"""Validate that the path extraction works for dropbox root"""
config = FsspecConfig(remote_url="dropbox:// /")
assert config.protocol == "dropbox"
assert config.path_without_protocol == " /"
assert config.dir_path == " "
assert config.file_path == ""
def test_fsspec_path_extraction_dropbox_subfolder():
"""Validate that the path extraction works for dropbox subfolder"""
config = FsspecConfig(remote_url="dropbox://path")
assert config.protocol == "dropbox"
assert config.path_without_protocol == "path"
assert config.dir_path == "path"
assert config.file_path == ""
def test_fsspec_path_extraction_s3_bucket_only():
"""Validate that the path extraction works for s3 bucket without filename"""
config = FsspecConfig(remote_url="s3://bucket-name")
assert config.protocol == "s3"
assert config.path_without_protocol == "bucket-name"
assert config.dir_path == "bucket-name"
assert config.file_path == ""
def test_fsspec_path_extraction_s3_valid_path():
"""Validate that the path extraction works for s3 bucket with filename"""
config = FsspecConfig(remote_url="s3://bucket-name/path/to/file.txt")
assert config.protocol == "s3"
assert config.path_without_protocol == "bucket-name/path/to/file.txt"
assert config.dir_path == "bucket-name"
assert config.file_path == "path/to/file.txt"
def test_fsspec_path_extraction_s3_invalid_path():
"""Validate that an invalid s3 path (that mimics triple slash for dropbox)
raises a ValueError"""
with pytest.raises(ValueError):
FsspecConfig(remote_url="s3:///bucket-name/path/to")
def test_sftp_path_extraction_post_init_with_extension():
"""Validate that the path extraction works for sftp with file extension"""
config = SimpleSftpConfig(
remote_url="sftp://example.com/path/to/file.txt",
access_config=SftpAccessConfig(username="username", password="password", host="", port=22),
)
assert config.file_path == "file.txt"
assert config.dir_path == "path/to"
assert config.path_without_protocol == "path/to"
assert config.access_config.host == "example.com"
assert config.access_config.port == 22
def test_sftp_path_extraction_without_extension():
"""Validate that the path extraction works for sftp without extension"""
config = SimpleSftpConfig(
remote_url="sftp://example.com/path/to/directory",
access_config=SftpAccessConfig(username="username", password="password", host="", port=22),
)
assert config.file_path == ""
assert config.dir_path == "path/to/directory"
assert config.path_without_protocol == "path/to/directory"
assert config.access_config.host == "example.com"
assert config.access_config.port == 22
def test_sftp_path_extraction_with_port():
"""Validate that the path extraction works for sftp with a non-default port"""
config = SimpleSftpConfig(
remote_url="sftp://example.com:47474/path/to/file.txt",
access_config=SftpAccessConfig(username="username", password="password", host="", port=22),
)
assert config.file_path == "file.txt"
assert config.dir_path == "path/to"
assert config.path_without_protocol == "path/to"
assert config.access_config.host == "example.com"
assert config.access_config.port == 47474

View File

@ -1,12 +1,18 @@
from dataclasses import dataclass
from pathlib import Path
import pytest
from unstructured.ingest.connector.fsspec.dropbox import (
DropboxIngestDoc,
)
from unstructured.ingest.connector.fsspec.fsspec import (
FsspecIngestDoc,
)
from unstructured.ingest.connector.fsspec.sftp import SftpAccessConfig, SimpleSftpConfig
from unstructured.ingest.interfaces import (
FsspecConfig,
)
@dataclass
@ -127,3 +133,91 @@ def test_fsspec_folder_fails():
assert output_filename == Path("/fake_file2.txt.json")
assert download_filename == Path("/fake_file2.txt")
def test_post_init_invalid_protocol():
"""Validate that an invalid protocol raises a ValueError"""
with pytest.raises(ValueError):
FsspecConfig(remote_url="ftp://example.com/path/to/file.txt")
def test_fsspec_path_extraction_dropbox_root():
"""Validate that the path extraction works for dropbox root"""
config = FsspecConfig(remote_url="dropbox:// /")
assert config.protocol == "dropbox"
assert config.path_without_protocol == " /"
assert config.dir_path == " "
assert config.file_path == ""
def test_fsspec_path_extraction_dropbox_subfolder():
"""Validate that the path extraction works for dropbox subfolder"""
config = FsspecConfig(remote_url="dropbox://path")
assert config.protocol == "dropbox"
assert config.path_without_protocol == "path"
assert config.dir_path == "path"
assert config.file_path == ""
def test_fsspec_path_extraction_s3_bucket_only():
"""Validate that the path extraction works for s3 bucket without filename"""
config = FsspecConfig(remote_url="s3://bucket-name")
assert config.protocol == "s3"
assert config.path_without_protocol == "bucket-name"
assert config.dir_path == "bucket-name"
assert config.file_path == ""
def test_fsspec_path_extraction_s3_valid_path():
"""Validate that the path extraction works for s3 bucket with filename"""
config = FsspecConfig(remote_url="s3://bucket-name/path/to/file.txt")
assert config.protocol == "s3"
assert config.path_without_protocol == "bucket-name/path/to/file.txt"
assert config.dir_path == "bucket-name"
assert config.file_path == "path/to/file.txt"
def test_fsspec_path_extraction_s3_invalid_path():
"""Validate that an invalid s3 path (that mimics triple slash for dropbox)
raises a ValueError"""
with pytest.raises(ValueError):
FsspecConfig(remote_url="s3:///bucket-name/path/to")
def test_sftp_path_extraction_post_init_with_extension():
"""Validate that the path extraction works for sftp with file extension"""
config = SimpleSftpConfig(
remote_url="sftp://example.com/path/to/file.txt",
access_config=SftpAccessConfig(username="username", password="password", host="", port=22),
)
assert config.file_path == "file.txt"
assert config.dir_path == "path/to"
assert config.path_without_protocol == "path/to"
assert config.access_config.host == "example.com"
assert config.access_config.port == 22
def test_sftp_path_extraction_without_extension():
"""Validate that the path extraction works for sftp without extension"""
config = SimpleSftpConfig(
remote_url="sftp://example.com/path/to/directory",
access_config=SftpAccessConfig(username="username", password="password", host="", port=22),
)
assert config.file_path == ""
assert config.dir_path == "path/to/directory"
assert config.path_without_protocol == "path/to/directory"
assert config.access_config.host == "example.com"
assert config.access_config.port == 22
def test_sftp_path_extraction_with_port():
"""Validate that the path extraction works for sftp with a non-default port"""
config = SimpleSftpConfig(
remote_url="sftp://example.com:47474/path/to/file.txt",
access_config=SftpAccessConfig(username="username", password="password", host="", port=22),
)
assert config.file_path == "file.txt"
assert config.dir_path == "path/to"
assert config.path_without_protocol == "path/to"
assert config.access_config.host == "example.com"
assert config.access_config.port == 47474

View File

@ -0,0 +1,125 @@
import datetime
from unittest.mock import Mock, patch
from unstructured.ingest.connector.sql import SqlDestinationConnector
TEST_DATA_1 = {
"element_id": "80803034fe04181c163306740700cc54",
"metadata": {
"coordinates": {
"layout_height": 792,
"layout_width": 612,
"points": [
[72.0, 72.69200000000001],
[72.0, 83.69200000000001],
[135.8, 83.69200000000001],
[135.8, 72.69200000000001],
],
"system": "PixelSpace",
},
"data_source": {
"date_created": "2023-10-25 10:05:44.976775",
"date_modified": "2023-10-25 10:05:44.976775",
"date_processed": "2023-12-14T17:06:33.074057",
"permissions_data": [{"mode": 33188}],
"url": "example-docs/fake-memo.pdf",
},
"file_directory": "example-docs",
"filename": "fake-memo.pdf",
"filetype": "application/pdf",
"languages": ["eng"],
"last_modified": "2023-10-25T10:05:44",
"page_number": 1,
},
"text": "May 5, 2023",
"type": "UncategorizedText",
"embeddings": [
-0.05623878538608551,
0.008579030632972717,
0.03698136284947395,
-0.01745658740401268,
-0.030465232208371162,
0.00996527448296547,
],
}
TEST_DATA_2 = {
"metadata": {
"coordinates": {"points": [1, 2, 3]},
"links": {"link1": "https://example.com", "link2": "https://example.org"},
"data_source": {
"date_created": "2021-01-01T00:00:00",
"date_modified": "2021-01-02T00:00:00",
"date_processed": "2022-12-13T15:44:08",
"version": 1.1,
},
"last_modified": "2021-01-03T00:00:00",
"page_number": 10,
"regex_metadata": {"pattern": "abc"},
},
"embeddings": [0.1, 0.2, 0.3],
}
def test_conform_dict_1():
"""Validate that the conform_dict method returns the expected output for a real example"""
# Create a mock instance of the connector class
connector = SqlDestinationConnector(write_config=Mock(), connector_config=Mock())
# Mock the uuid.uuid4 function to return a fixed value
with patch("uuid.uuid4", return_value="mocked_uuid"):
# Call the conform_dict method
data_out = connector.conform_dict(TEST_DATA_1)
# Assert that the result matches the expected output
assert data_out == {
"element_id": "80803034fe04181c163306740700cc54",
"text": "May 5, 2023",
"type": "UncategorizedText",
"id": "mocked_uuid",
"file_directory": "example-docs",
"filename": "fake-memo.pdf",
"filetype": "application/pdf",
"languages": ["eng"],
"last_modified": datetime.datetime(2023, 10, 25, 10, 5, 44),
"page_number": "1",
"date_created": datetime.datetime(2023, 10, 25, 10, 5, 44, 976775),
"date_modified": datetime.datetime(2023, 10, 25, 10, 5, 44, 976775),
"date_processed": datetime.datetime(2023, 12, 14, 17, 6, 33, 74057),
"permissions_data": '[{"mode": 33188}]',
"url": "example-docs/fake-memo.pdf",
"layout_height": 792,
"layout_width": 612,
"points": "[[72.0, 72.69200000000001], [72.0, 83.69200000000001],"
" [135.8, 83.69200000000001], [135.8, 72.69200000000001]]",
"system": "PixelSpace",
"embeddings": "[-0.05623878538608551, 0.008579030632972717, "
"0.03698136284947395, -0.01745658740401268, "
"-0.030465232208371162, 0.00996527448296547]",
}
def test_conform_dict_2():
"""Validate that the conform_dict method returns the expected output for a simplified example"""
# Create a mock instance of the connector class
connector = SqlDestinationConnector(write_config=Mock(), connector_config=Mock())
# Mock the uuid.uuid4 function to return a fixed value
with patch("uuid.uuid4", return_value="mocked_uuid"):
# Call the conform_dict method
data_out = connector.conform_dict(TEST_DATA_2)
# Assert that the result matches the expected output
assert data_out == {
"embeddings": "[0.1, 0.2, 0.3]",
"id": "mocked_uuid",
"links": '{"link1": "https://example.com", "link2": "https://example.org"}',
"last_modified": datetime.datetime(2021, 1, 3, 0, 0),
"page_number": "10",
"regex_metadata": '{"pattern": "abc"}',
"date_created": datetime.datetime(2021, 1, 1, 0, 0),
"date_modified": datetime.datetime(2021, 1, 2, 0, 0),
"date_processed": datetime.datetime(2022, 12, 13, 15, 44, 8),
"version": "1.1",
"points": "[1, 2, 3]",
}

View File

@ -1 +1 @@
__version__ = "0.11.9-dev0" # pragma: no cover
__version__ = "0.11.9-dev1" # pragma: no cover

View File

@ -44,6 +44,7 @@ from .reddit import get_base_src_cmd as reddit_base_src_cmd
from .salesforce import get_base_src_cmd as salesforce_base_src_cmd
from .sharepoint import get_base_src_cmd as sharepoint_base_src_cmd
from .slack import get_base_src_cmd as slack_base_src_cmd
from .sql import get_base_dest_cmd as sql_base_dest_cmd
from .weaviate import get_base_dest_cmd as weaviate_dest_cmd
from .wikipedia import get_base_src_cmd as wikipedia_base_src_cmd
@ -101,6 +102,7 @@ base_dest_cmd_fns: t.List[t.Callable[[], "BaseDestCmd"]] = [
s3_base_dest_cmd,
azure_cognitive_search_base_dest_cmd,
delta_table_dest_cmd,
sql_base_dest_cmd,
weaviate_dest_cmd,
mongo_base_dest_cmd,
pinecone_base_dest_cmd,

View File

@ -0,0 +1,66 @@
import typing as t
from dataclasses import dataclass
import click
from unstructured.ingest.cli.interfaces import CliConfig
from unstructured.ingest.connector.sql import SimpleSqlConfig
from unstructured.ingest.interfaces import WriteConfig
SQL_DRIVERS = {"postgresql", "sqlite"}
@dataclass
class SqlCliConfig(SimpleSqlConfig, CliConfig):
@staticmethod
def get_cli_options() -> t.List[click.Option]:
options = [
click.Option(
["--db-type"],
required=True,
type=click.Choice(SQL_DRIVERS),
help="Type of the database backend",
),
click.Option(
["--username"],
default=None,
type=str,
help="DB username",
),
click.Option(
["--password"],
default=None,
type=str,
help="DB password",
),
click.Option(
["--host"],
default=None,
type=str,
help="DB host",
),
click.Option(
["--port"],
default=None,
type=int,
help="DB host connection port",
),
click.Option(
["--database"],
default=None,
type=str,
help="Database name. For sqlite databases, this is the path to the .db file.",
),
]
return options
def get_base_dest_cmd():
from unstructured.ingest.cli.base.dest import BaseDestCmd
cmd_cls = BaseDestCmd(
cmd_name="sql",
cli_config=SqlCliConfig,
write_config=WriteConfig,
)
return cmd_cls
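
Assembled on the command line, these options become a sql destination subcommand after any supported source; a hedged example mirroring the SQLite test script earlier in this diff (input path and output directory are illustrative):

# Illustrative CLI invocation built from the options defined above
unstructured-ingest \
  local \
  --input-path example-docs/fake-memo.pdf \
  --output-dir sql-output \
  --num-processes 2 \
  sql \
  --db-type sqlite \
  --database ./elements.db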

View File

@ -0,0 +1,201 @@
import json
import typing as t
import uuid
from dataclasses import dataclass, field
from unstructured.ingest.enhanced_dataclass import enhanced_field
from unstructured.ingest.error import DestinationConnectionError
from unstructured.ingest.interfaces import (
AccessConfig,
BaseConnectorConfig,
BaseDestinationConnector,
BaseIngestDoc,
)
from unstructured.ingest.logger import logger
from unstructured.utils import requires_dependencies
ELEMENTS_TABLE_NAME = "elements"
@dataclass
class SqlAccessConfig(AccessConfig):
username: t.Optional[str]
password: t.Optional[str] = enhanced_field(sensitive=True)
@dataclass
class SimpleSqlConfig(BaseConnectorConfig):
db_type: t.Optional[str]
host: t.Optional[str]
database: t.Optional[str]
port: t.Optional[int]
access_config: SqlAccessConfig
def __post_init__(self):
if (self.db_type == "sqlite") and (self.database is None):
raise ValueError(
"A sqlite connection requires a path to a *.db file "
"through the `database` argument"
)
@property
def connection(self):
if self.db_type == "postgresql":
return self._make_psycopg_connection
elif self.db_type == "sqlite":
return self._make_sqlite_connection
raise ValueError(f"Unsupported database {self.db_type} connection.")
def _make_sqlite_connection(self):
from sqlite3 import connect
return connect(database=self.database)
@requires_dependencies(["psycopg2"], extras="postgresql")
def _make_psycopg_connection(self):
from psycopg2 import connect
return connect(
user=self.access_config.username,
password=self.access_config.password,
dbname=self.database,
host=self.host,
port=self.port,
)
@dataclass
class SqlDestinationConnector(BaseDestinationConnector):
connector_config: SimpleSqlConfig
_client: t.Optional[t.Any] = field(init=False, default=None)
@property
def client(self):
if self._client is None:
self._client = self.connector_config.connection()
return self._client
@DestinationConnectionError.wrap
def initialize(self):
_ = self.client
def check_connection(self):
cursor = self.client.cursor()
cursor.execute("SELECT 1;")
cursor.close()
def conform_dict(self, data: dict) -> dict:
"""
Updates the element dictionary to conform to the sql schema
"""
from datetime import datetime
data["id"] = str(uuid.uuid4())
# Dict as string formatting
if record_locator := data.get("metadata", {}).get("data_source", {}).get("record_locator"):
# Explicit casting otherwise fails schema type checking
data["metadata"]["data_source"]["record_locator"] = str(json.dumps(record_locator))
# Array of items as string formatting
if (embeddings := data.get("embeddings")) and (
self.connector_config.db_type != "postgresql"
):
data["embeddings"] = str(json.dumps(embeddings))
if points := data.get("metadata", {}).get("coordinates", {}).get("points"):
data["metadata"]["coordinates"]["points"] = str(json.dumps(points))
if links := data.get("metadata", {}).get("links", {}):
data["metadata"]["links"] = str(json.dumps(links))
if permissions_data := (
data.get("metadata", {}).get("data_source", {}).get("permissions_data")
):
data["metadata"]["data_source"]["permissions_data"] = json.dumps(permissions_data)
if link_texts := data.get("metadata", {}).get("link_texts", {}):
data["metadata"]["link_texts"] = str(json.dumps(link_texts))
if sent_from := data.get("metadata", {}).get("sent_from", {}):
data["metadata"]["sent_from"] = str(json.dumps(sent_from))
if sent_to := data.get("metadata", {}).get("sent_to", {}):
data["metadata"]["sent_to"] = str(json.dumps(sent_to))
# Datetime formatting
if date_created := data.get("metadata", {}).get("data_source", {}).get("date_created"):
data["metadata"]["data_source"]["date_created"] = datetime.fromisoformat(date_created)
if date_modified := data.get("metadata", {}).get("data_source", {}).get("date_modified"):
data["metadata"]["data_source"]["date_modified"] = datetime.fromisoformat(date_modified)
if date_processed := data.get("metadata", {}).get("data_source", {}).get("date_processed"):
data["metadata"]["data_source"]["date_processed"] = datetime.fromisoformat(
date_processed
)
if last_modified := data.get("metadata", {}).get("last_modified", {}):
data["metadata"]["last_modified"] = datetime.fromisoformat(last_modified)
# String casting
if version := data.get("metadata", {}).get("data_source", {}).get("version"):
data["metadata"]["data_source"]["version"] = str(version)
if page_number := data.get("metadata", {}).get("page_number"):
data["metadata"]["page_number"] = str(page_number)
if regex_metadata := data.get("metadata", {}).get("regex_metadata"):
data["metadata"]["regex_metadata"] = str(json.dumps(regex_metadata))
if data.get("metadata", {}).get("data_source", None):
data.update(data.get("metadata", {}).pop("data_source", None))
if data.get("metadata", {}).get("coordinates", None):
data.update(data.get("metadata", {}).pop("coordinates", None))
if data.get("metadata", {}):
data.update(data.pop("metadata", None))
return data
@DestinationConnectionError.wrap
def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None:
logger.info(
f"writing {len(json_list)} objects to database {self.connector_config.database} "
f"at {self.connector_config.host}"
)
with self.client as conn:
cursor = conn.cursor()
# Since we have no guarantee that each element will have the same keys
# we insert each element individually
for e in json_list:
elem = self.conform_dict(e)
query = f"INSERT INTO {ELEMENTS_TABLE_NAME} ({','.join(elem.keys())}) \
VALUES({','.join(['?' if self.connector_config.db_type=='sqlite' else '%s' for x in elem])})" # noqa E501
values = []
for v in elem.values():
if self.connector_config.db_type == "sqlite" and isinstance(v, list):
values.append(json.dumps(v))
else:
values.append(v)
cursor.execute(query, values)
conn.commit()
cursor.close()
# Leaving contexts doesn't close the connection, so doing it here
conn.close()
def write(self, docs: t.List[BaseIngestDoc]) -> None:
json_list: t.List[t.Dict[str, t.Any]] = []
for doc in docs:
local_path = doc._output_filename
with open(local_path) as json_file:
json_content = json.load(json_file)
logger.info(
f"appending {len(json_content)} json elements from content in {local_path}",
)
json_list.extend(json_content)
self.write_dict(json_list=json_list)

View File

@ -52,8 +52,8 @@ class BaseConfig(EnhancedDataClassJsonMixin, ABC):
@dataclass
class AccessConfig(BaseConfig):
# Meant to designate holding any sensitive information associated with other configs
pass
"""Meant to designate holding any sensitive information associated with other configs
and also for access specific configs."""
@dataclass

View File

@ -13,6 +13,7 @@ from .fsspec.s3 import S3Writer
from .mongodb import MongodbWriter
from .pinecone import PineconeWriter
from .qdrant import QdrantWriter
from .sql import SqlWriter
from .weaviate import WeaviateWriter
writer_map: t.Dict[str, t.Type[Writer]] = {
@ -28,6 +29,7 @@ writer_map: t.Dict[str, t.Type[Writer]] = {
"pinecone": PineconeWriter,
"qdrant": QdrantWriter,
"s3": S3Writer,
"sql": SqlWriter,
"weaviate": WeaviateWriter,
}

View File

@ -0,0 +1,22 @@
import typing as t
from dataclasses import dataclass
from unstructured.ingest.interfaces import BaseDestinationConnector
from unstructured.ingest.runner.writers.base_writer import Writer
if t.TYPE_CHECKING:
from unstructured.ingest.connector.sql import SimpleSqlConfig
from unstructured.ingest.interfaces import WriteConfig
@dataclass
class SqlWriter(Writer):
write_config: "WriteConfig"
connector_config: "SimpleSqlConfig"
def get_connector_cls(self) -> t.Type[BaseDestinationConnector]:
from unstructured.ingest.connector.sql import (
SqlDestinationConnector,
)
return SqlDestinationConnector