feat: Weaviate destination connector (#1963)

Closes #1781.
- Adds a Weaviate destination connector
- The connector receives a host for the weaviate instance and a weaviate
class name.
- Defines a weaviate schema for json elements.
- Defines the pre-processing to conform unstructured's schema to the
proposed weaviate schema.
This commit is contained in:
rvztz 2023-12-01 16:27:41 -06:00 committed by GitHub
parent 69d0ee1aea
commit ce905dd098
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1409 additions and 5 deletions

View File

@ -1,4 +1,4 @@
## 0.11.4-dev1
## 0.11.4-dev2
### Enhancements
@ -6,6 +6,9 @@
### Features
* **Add Weaviate destination connector** Weaviate connector added to ingest CLI. Users may now use `unstructured-ingest` to write partitioned data from over 20 data sources (so far) to a Weaviate object collection.
### Fixes
## 0.11.3

View File

@ -191,6 +191,10 @@ install-ingest-airtable:
install-ingest-sharepoint:
python3 -m pip install -r requirements/ingest/sharepoint.txt
.PHONY: install-ingest-weaviate
install-ingest-weaviate:
python3 -m pip install -r requirements/ingest/weaviate.txt
.PHONY: install-ingest-local
install-ingest-local:
echo "no unique dependencies for local connector"

View File

@ -11,5 +11,6 @@ in our community `Slack. <https://short.unstructured.io/pzw05l7>`_
destination_connectors/azure_cognitive_search
destination_connectors/delta_table
destination_connectors/mongodb
destination_connectors/weaviate
destination_connectors/pinecone
destination_connectors/s3

View File

@ -0,0 +1,67 @@
Weaviate
===========
Batch process all your records using ``unstructured-ingest`` to store structured outputs locally on your filesystem and upload those local files to a Weaviate collection.
First you'll need to install the weaviate dependencies as shown here.
.. code:: shell
pip install "unstructured[weaviate]"
Run Locally
-----------
The upstream connector can be any of the ones supported, but for convenience, the sample command below uses the
upstream local connector. This will push elements into a collection schema of your choice in a Weaviate instance
running locally.
.. tabs::
.. tab:: Shell
.. code:: shell
unstructured-ingest \
local \
--input-path example-docs/fake-memo.pdf \
--anonymous \
--output-dir local-output-to-weaviate \
--num-processes 2 \
--verbose \
--strategy fast \
weaviate \
--host-url http://localhost:8080 \
--class-name elements
.. tab:: Python
.. code:: python
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import LocalRunner
if __name__ == "__main__":
runner = LocalRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="local-output-to-weaviate",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
writer_type="weaviate",
writer_kwargs={
"host_url": os.getenv("WEAVIATE_HOST_URL"),
"class_name": os.getenv("WEAVIATE_CLASS_NAME")
}
)
runner.run(
input_path="example-docs/fake-memo.pdf",
)
For a full list of the options the CLI accepts check ``unstructured-ingest <upstream connector> weaviate --help``.
NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide <https://unstructured-io.github.io/unstructured/installing.html>`_.

View File

@ -0,0 +1,423 @@
{
"class": "Elements",
"invertedIndexConfig": {
"bm25": {
"b": 0.75,
"k1": 1.2
},
"cleanupIntervalSeconds": 60,
"stopwords": {
"additions": null,
"preset": "en",
"removals": null
}
},
"multiTenancyConfig": {
"enabled": false
},
"properties": [
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "element_id",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "text",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "type",
"tokenization": "word"
},
{
"dataType": [
"object"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "metadata",
"nestedProperties": [
{
"dataType": [
"int"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "category_depth"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "parent_id",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "attached_to_filename",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "filetype",
"tokenization": "word"
},
{
"dataType": [
"date"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "last_modified"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "file_directory",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "filename",
"tokenization": "word"
},
{
"dataType": [
"object"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "data_source",
"nestedProperties": [
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "url",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "version",
"tokenization": "word"
},
{
"dataType": [
"date"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "date_created"
},
{
"dataType": [
"date"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "date_modified"
},
{
"dataType": [
"date"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "date_processed"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "record_locator",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "permissions_data",
"tokenization": "word"
}
]
},
{
"dataType": [
"object"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "coordinates",
"nestedProperties": [
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "system",
"tokenization": "word"
},
{
"dataType": [
"number"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "layout_width"
},
{
"dataType": [
"number"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "layout_height"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "points",
"tokenization": "word"
}
]
},
{
"dataType": [
"text[]"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "languages",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "page_number"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "page_name",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "url",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "links",
"tokenization": "word"
},
{
"dataType": [
"text[]"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "link_urls",
"tokenization": "word"
},
{
"dataType": [
"text[]"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "link_texts",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "sent_from",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "sent_to",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "subject",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "section",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "header_footer_type",
"tokenization": "word"
},
{
"dataType": [
"text[]"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "emphasized_text_contents",
"tokenization": "word"
},
{
"dataType": [
"text[]"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "emphasized_text_tags",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "text_as_html",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "regex_metadata",
"tokenization": "word"
},
{
"dataType": [
"number"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "detection_class_prob"
}
]
}
],
"replicationConfig": {
"factor": 1
},
"shardingConfig": {
"virtualPerPhysical": 128,
"desiredCount": 1,
"actualCount": 1,
"desiredVirtualCount": 128,
"actualVirtualCount": 128,
"key": "_id",
"strategy": "hash",
"function": "murmur3"
},
"vectorIndexConfig": {
"skip": false,
"cleanupIntervalSeconds": 300,
"maxConnections": 64,
"efConstruction": 128,
"ef": -1,
"dynamicEfMin": 100,
"dynamicEfMax": 500,
"dynamicEfFactor": 8,
"vectorCacheMaxObjects": 1000000000000,
"flatSearchCutoff": 40000,
"distance": "cosine",
"pq": {
"enabled": false,
"bitCompression": false,
"segments": 0,
"centroids": 256,
"trainingLimit": 100000,
"encoder": {
"type": "kmeans",
"distribution": "log-normal"
}
}
},
"vectorIndexType": "hnsw",
"vectorizer": "none"
}

View File

@ -0,0 +1,26 @@
#!/usr/bin/env bash
# Processes a local example document and uploads its structured output to a Weaviate class.
# Structured outputs are stored in weaviate-output/
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
cd "$SCRIPT_DIR"/../../.. || exit 1
PYTHONPATH=. ./unstructured/ingest/main.py \
local \
--num-processes 2 \
--output-dir weaviate-output \
--strategy fast \
--verbose \
--reprocess \
--input-path example-docs/book-war-and-peace-1225p.txt \
--work-dir weaviate-work-dir \
--chunk-elements \
--chunk-new-after-n-chars 2500\
--chunk-multipage-sections \
--embedding-provider "langchain-huggingface" \
weaviate \
--host-url http://localhost:8080 \
--class-name elements \
--batch-size 100

View File

@ -35,7 +35,7 @@ pydantic<2
safetensors<=0.3.2
# use the known compatible version of weaviate and unstructured.pytesseract
unstructured.pytesseract>=0.3.12
weaviate-client==3.23.2
weaviate-client>3.25.0
# Note(yuming) - pining to avoid conflict with paddle install
matplotlib==3.7.2
# NOTE(crag) - pin to available pandas for python 3.8 (at least in CI)

View File

@ -0,0 +1,3 @@
-c ../constraints.in
-c ../base.txt
weaviate-client

View File

@ -0,0 +1,45 @@
#
# This file is autogenerated by pip-compile with Python 3.8
# by the following command:
#
# pip-compile --constraint=requirements/constraints.in requirements/ingest/weaviate.in
#
authlib==1.2.1
# via weaviate-client
certifi==2023.11.17
# via
# -c requirements/constraints.in
# -c requirements/ingest/../base.txt
# -c requirements/ingest/../constraints.in
# requests
cffi==1.16.0
# via cryptography
charset-normalizer==3.3.2
# via
# -c requirements/ingest/../base.txt
# requests
cryptography==41.0.5
# via authlib
idna==3.4
# via
# -c requirements/ingest/../base.txt
# requests
pycparser==2.21
# via cffi
requests==2.31.0
# via
# -c requirements/ingest/../base.txt
# weaviate-client
urllib3==1.26.18
# via
# -c requirements/constraints.in
# -c requirements/ingest/../base.txt
# -c requirements/ingest/../constraints.in
# requests
validators==0.22.0
# via weaviate-client
weaviate-client==3.25.3
# via
# -c requirements/constraints.in
# -c requirements/ingest/../constraints.in
# -r requirements/ingest/weaviate.in

View File

@ -0,0 +1,13 @@
#!/usr/bin/env bash
# Brings up the services described by the docker-compose.yml that sits next to
# this script, then runs create_schema.py from the same directory.
# Any failing command aborts the script because of `set -e`.
set -e
# Absolute directory of this script, so the helper files are found regardless
# of the directory the script is invoked from.
SCRIPT_DIR=$(dirname "$(realpath "$0")")
# Create the Weaviate instance
# Print the compose version first; with `set -e` the script stops here if the
# `docker compose` command is unavailable.
docker compose version
# NOTE(review): `--wait` is expected to block until the services are up — confirm against the docker compose docs.
docker compose -f "$SCRIPT_DIR"/docker-compose.yml up --wait
docker compose -f "$SCRIPT_DIR"/docker-compose.yml ps
echo "Instance is live."
# Create the class/schema inside the freshly started instance.
"$SCRIPT_DIR"/create_schema.py

View File

@ -0,0 +1,21 @@
#!/usr/bin/env python3
import json
import os
import weaviate
# Target instance and class; both can be overridden through the environment.
weaviate_host_url = os.getenv("WEAVIATE_HOST_URL", "http://localhost:8080")
class_name = os.getenv("WEAVIATE_CLASS_NAME", "Elements")

# elements.json lives in the same directory as this script. Resolve it relative
# to the script so the helper works from any working directory, not only when
# it is launched from the repository root.
schema_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "elements.json")
with open(schema_path, encoding="utf-8") as f:
    new_class = json.load(f)

# Create the class under the requested name, so the class that is checked and
# deleted below is the same one that gets (re)created.
new_class["class"] = class_name

client = weaviate.Client(
    url=weaviate_host_url,
)

# Start from a clean slate: drop any previous version of the class first.
if client.schema.exists(class_name):
    client.schema.delete_class(class_name)
client.schema.create_class(new_class)

View File

@ -0,0 +1,21 @@
version: '3.4'
services:
weaviate:
command:
- --host
- 0.0.0.0
- --port
- '8080'
- --scheme
- http
image: semitechnologies/weaviate:1.22.1
ports:
- 8080:8080
restart: on-failure:0
environment:
QUERY_DEFAULTS_LIMIT: 25
AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
DEFAULT_VECTORIZER_MODULE: 'none'
ENABLE_MODULES: ''
CLUSTER_HOSTNAME: 'node1'

View File

@ -0,0 +1,423 @@
{
"class": "Elements",
"invertedIndexConfig": {
"bm25": {
"b": 0.75,
"k1": 1.2
},
"cleanupIntervalSeconds": 60,
"stopwords": {
"additions": null,
"preset": "en",
"removals": null
}
},
"multiTenancyConfig": {
"enabled": false
},
"properties": [
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "element_id",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "text",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "type",
"tokenization": "word"
},
{
"dataType": [
"object"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "metadata",
"nestedProperties": [
{
"dataType": [
"int"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "category_depth"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "parent_id",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "attached_to_filename",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "filetype",
"tokenization": "word"
},
{
"dataType": [
"date"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "last_modified"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "file_directory",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "filename",
"tokenization": "word"
},
{
"dataType": [
"object"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "data_source",
"nestedProperties": [
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "url",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "version",
"tokenization": "word"
},
{
"dataType": [
"date"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "date_created"
},
{
"dataType": [
"date"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "date_modified"
},
{
"dataType": [
"date"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "date_processed"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "record_locator",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "permissions_data",
"tokenization": "word"
}
]
},
{
"dataType": [
"object"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "coordinates",
"nestedProperties": [
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "system",
"tokenization": "word"
},
{
"dataType": [
"number"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "layout_width"
},
{
"dataType": [
"number"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "layout_height"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "points",
"tokenization": "word"
}
]
},
{
"dataType": [
"text[]"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "languages",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "page_number"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "page_name",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "url",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "links",
"tokenization": "word"
},
{
"dataType": [
"text[]"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "link_urls",
"tokenization": "word"
},
{
"dataType": [
"text[]"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "link_texts",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "sent_from",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "sent_to",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "subject",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "section",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "header_footer_type",
"tokenization": "word"
},
{
"dataType": [
"text[]"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "emphasized_text_contents",
"tokenization": "word"
},
{
"dataType": [
"text[]"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "emphasized_text_tags",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "text_as_html",
"tokenization": "word"
},
{
"dataType": [
"text"
],
"indexFilterable": true,
"indexSearchable": true,
"name": "regex_metadata",
"tokenization": "word"
},
{
"dataType": [
"number"
],
"indexFilterable": true,
"indexSearchable": false,
"name": "detection_class_prob"
}
]
}
],
"replicationConfig": {
"factor": 1
},
"shardingConfig": {
"virtualPerPhysical": 128,
"desiredCount": 1,
"actualCount": 1,
"desiredVirtualCount": 128,
"actualVirtualCount": 128,
"key": "_id",
"strategy": "hash",
"function": "murmur3"
},
"vectorIndexConfig": {
"skip": false,
"cleanupIntervalSeconds": 300,
"maxConnections": 64,
"efConstruction": 128,
"ef": -1,
"dynamicEfMin": 100,
"dynamicEfMax": 500,
"dynamicEfFactor": 8,
"vectorCacheMaxObjects": 1000000000000,
"flatSearchCutoff": 40000,
"distance": "cosine",
"pq": {
"enabled": false,
"bitCompression": false,
"segments": 0,
"centroids": 256,
"trainingLimit": 100000,
"encoder": {
"type": "kmeans",
"distribution": "log-normal"
}
}
},
"vectorIndexType": "hnsw",
"vectorizer": "none"
}

View File

@ -155,6 +155,7 @@ setup(
"salesforce": load_requirements("requirements/ingest/salesforce.in"),
"slack": load_requirements("requirements/ingest/slack.in"),
"wikipedia": load_requirements("requirements/ingest/wikipedia.in"),
"weaviate": load_requirements("requirements/ingest/weaviate.in"),
# Legacy extra requirements
"huggingface": load_requirements("requirements/huggingface.in"),
"local-inference": all_doc_reqs,

View File

@ -7,7 +7,8 @@ import pytest
# NOTE(robinson) - allows tests that do not require the weaviate client to
# run for the docker container
with contextlib.suppress(ModuleNotFoundError):
from weaviate.schema.validate_schema import validate_schema
from weaviate import Client
from weaviate.embedded import EmbeddedOptions
from unstructured.partition.json import partition_json
from unstructured.staging.weaviate import (
@ -56,4 +57,5 @@ def test_stage_for_weaviate(filename="example-docs/layout-parser-paper-fast.pdf"
def test_weaviate_schema_is_valid():
unstructured_class = create_unstructured_weaviate_class()
schema = {"classes": [unstructured_class]}
validate_schema(schema)
client = Client(embedded_options=EmbeddedOptions())
client.schema.create(schema)

View File

@ -0,0 +1,51 @@
#!/usr/bin/env bash
set -e
DEST_PATH=$(dirname "$(realpath "$0")")
SCRIPT_DIR=$(dirname "$DEST_PATH")
cd "$SCRIPT_DIR"/.. || exit 1
OUTPUT_FOLDER_NAME=weaviate-dest
OUTPUT_ROOT=${OUTPUT_ROOT:-$SCRIPT_DIR}
OUTPUT_DIR=$OUTPUT_ROOT/structured-output/$OUTPUT_FOLDER_NAME
WORK_DIR=$OUTPUT_ROOT/workdir/$OUTPUT_FOLDER_NAME
CI=${CI:-"false"}
max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")}
# shellcheck disable=SC1091
source "$SCRIPT_DIR"/cleanup.sh
# Tears down the Weaviate container started for this test and removes the local
# work/output directories. Registered below as the EXIT trap, so it runs on
# success and on failure alike.
function cleanup {
  # Index cleanup
  echo "Stopping Weaviate Docker container"
  # Use the `docker compose` plugin, the same command the instance was created
  # with in create-weaviate-instance.sh; the standalone `docker-compose` binary
  # is not guaranteed to be installed alongside it.
  docker compose -f scripts/weaviate-test-helpers/docker-compose.yml down --remove-orphans -v

  # Local file cleanup
  cleanup_dir "$WORK_DIR"
  cleanup_dir "$OUTPUT_DIR"
}
trap cleanup EXIT
# Create weaviate instance and create `elements` class
echo "Creating weaviate instance"
# shellcheck source=/dev/null
scripts/weaviate-test-helpers/create-weaviate-instance.sh
wait
PYTHONPATH=. ./unstructured/ingest/main.py \
local \
--num-processes "$max_processes" \
--output-dir "$OUTPUT_DIR" \
--strategy fast \
--verbose \
--reprocess \
--input-path example-docs/fake-memo.pdf \
--work-dir "$WORK_DIR" \
--embedding-provider "langchain-huggingface" \
weaviate \
--host-url http://localhost:8080 \
--class-name elements \
"$SCRIPT_DIR"/python/test-ingest-weaviate-output.py

View File

@ -0,0 +1,25 @@
#!/usr/bin/env python3
import os
import sys
import weaviate
weaviate_host_url = os.getenv("WEAVIATE_HOST_URL", "http://localhost:8080")
class_name = os.getenv("WEAVIATE_CLASS_NAME", "Elements")
N_ELEMENTS = 5
if __name__ == "__main__":
    print(f"Checking contents of class collection {class_name} at {weaviate_host_url}")

    client = weaviate.Client(
        url=weaviate_host_url,
    )

    # Ask Weaviate for the number of objects stored in the class.
    response = client.query.aggregate(class_name).with_meta_count().do()
    count = response["data"]["Aggregate"][class_name][0]["meta"]["count"]

    # Compare explicitly rather than with `assert`: assertions are stripped when
    # Python runs with -O, which would make this check pass unconditionally.
    if count != N_ELEMENTS:
        sys.exit(f"FAIL: weaviate dest check failed: got {count}, expected {N_ELEMENTS}")
    print("SUCCESS: weaviate dest check")

View File

@ -24,6 +24,7 @@ all_tests=(
'mongodb.sh'
'pinecone.sh'
's3.sh'
'weaviate.sh'
'sharepoint-embed-cog-index.sh'
)

View File

@ -1 +1 @@
__version__ = "0.11.4-dev1" # pragma: no cover
__version__ = "0.11.4-dev2" # pragma: no cover

View File

@ -40,6 +40,7 @@ from .s3 import get_base_src_cmd as s3_base_src_cmd
from .salesforce import get_base_src_cmd as salesforce_base_src_cmd
from .sharepoint import get_base_src_cmd as sharepoint_base_src_cmd
from .slack import get_base_src_cmd as slack_base_src_cmd
from .weaviate import get_base_dest_cmd as weaviate_dest_cmd
from .wikipedia import get_base_src_cmd as wikipedia_base_src_cmd
if t.TYPE_CHECKING:
@ -93,6 +94,7 @@ base_dest_cmd_fns: t.List[t.Callable[[], "BaseDestCmd"]] = [
s3_base_dest_cmd,
azure_cognitive_search_base_dest_cmd,
delta_table_dest_cmd,
weaviate_dest_cmd,
mongo_base_dest_cmd,
pinecone_base_dest_cmd,
]

View File

@ -0,0 +1,68 @@
import typing as t
from dataclasses import dataclass
import click
from unstructured.ingest.cli.interfaces import CliConfig, Dict
from unstructured.ingest.connector.weaviate import SimpleWeaviateConfig, WeaviateWriteConfig
CMD_NAME = "weaviate"
@dataclass
class WeaviateCliConfig(SimpleWeaviateConfig, CliConfig):
    """CLI view of ``SimpleWeaviateConfig``: declares the connection options."""

    @staticmethod
    def get_cli_options() -> t.List[click.Option]:
        """Return the click options for host url, class name and auth keys."""
        options = [
            click.Option(
                ["--host-url"],
                required=True,
                help="Weaviate instance url",
            ),
            click.Option(
                ["--class-name"],
                default=None,
                type=str,
                help="Name of the class to push the records into, e.g: Pdf-elements",
            ),
            click.Option(
                ["--auth-keys"],
                required=False,
                type=Dict(),
                help=(
                    "String representing a JSON-like dict with key,value containing "
                    "the required parameters to create an authentication object. "
                    "The connector resolves the authentication object from the parameters. "
                    "See https://weaviate.io/developers/weaviate/client-libraries/python_v3"
                    "#api-key-authentication "
                    "for more information."
                ),
            ),
        ]
        return options
@dataclass
class WeaviateCliWriteConfig(WeaviateWriteConfig, CliConfig):
    """CLI view of ``WeaviateWriteConfig``: declares the write-time options."""

    @staticmethod
    def get_cli_options() -> t.List[click.Option]:
        """Return the click options that tune how records are written."""
        batch_size_option = click.Option(
            ["--batch-size"],
            type=int,
            default=100,
            help="Number of records per batch",
        )
        return [batch_size_option]
def get_base_dest_cmd():
    """Build the ``weaviate`` destination command from the two CLI configs."""
    # Imported here, as in the original, rather than at module import time.
    from unstructured.ingest.cli.base.dest import BaseDestCmd

    return BaseDestCmd(
        cmd_name=CMD_NAME,
        cli_config=WeaviateCliConfig,
        additional_cli_options=[WeaviateCliWriteConfig],
    )

View File

@ -0,0 +1,175 @@
import json
import typing as t
from dataclasses import dataclass, field
from unstructured.ingest.error import DestinationConnectionError, SourceConnectionError
from unstructured.ingest.interfaces import (
BaseConnectorConfig,
BaseDestinationConnector,
BaseIngestDoc,
WriteConfig,
)
from unstructured.ingest.logger import logger
from unstructured.utils import requires_dependencies
if t.TYPE_CHECKING:
from weaviate import Client
# Connection settings for the Weaviate destination connector.
@dataclass
class SimpleWeaviateConfig(BaseConnectorConfig):
# URL passed to the Weaviate client when it is created.
host_url: str
# Name of the Weaviate class the objects are written into.
class_name: str
# Optional credentials; the connector looks for "access_token" (+ "refresh_token"),
# "api_key", "client_secret" (+ "scope") or "username"/"password" (+ "scope"), in that order.
auth_keys: t.Optional[t.Dict[str, str]] = None
# Write-time settings for the Weaviate destination connector.
@dataclass
class WeaviateWriteConfig(WriteConfig):
# Value handed to the Weaviate client's batch configuration before writing.
batch_size: int = 100
@dataclass
class WeaviateDestinationConnector(BaseDestinationConnector):
    """Destination connector that writes element dicts into a Weaviate class.

    Elements are read from the JSON output files of the ingest docs, adjusted
    in place so their values match the property types of the Weaviate schema,
    and added through the client's batch interface.
    """

    write_config: WeaviateWriteConfig
    connector_config: SimpleWeaviateConfig
    # Created lazily by the ``client`` property; not an ``__init__`` argument.
    _client: t.Optional["Client"] = field(init=False, default=None)

    @property
    @requires_dependencies(["weaviate"], extras="weaviate")
    def client(self) -> "Client":
        """Return the Weaviate client, building and caching it on first access."""
        if self._client is None:
            from weaviate import Client

            auth = self._resolve_auth_method()
            self._client = Client(url=self.connector_config.host_url, auth_client_secret=auth)
        return self._client

    @requires_dependencies(["weaviate"], extras="weaviate")
    @DestinationConnectionError.wrap
    def initialize(self):
        """Force creation of the client so configuration errors surface early."""
        _ = self.client

    @requires_dependencies(["weaviate"], extras="weaviate")
    def check_connection(self):
        """Verify that a client can be created from the connector config.

        Raises:
            SourceConnectionError: if creating the client fails for any reason.
        """
        try:
            _ = self.client
        except Exception as e:
            logger.error(f"Failed to validate connection {e}", exc_info=True)
            # NOTE(review): this is a destination connector, so
            # DestinationConnectionError looks like the intended type. The
            # original type is kept so existing callers are unaffected.
            raise SourceConnectionError(f"failed to validate connection: {e}") from e

    def _resolve_auth_method(self):
        """Build a Weaviate auth object from ``connector_config.auth_keys``.

        The first matching credential wins, in this order: ``access_token``,
        ``api_key``, ``client_secret``, then ``username`` together with
        ``password``. Returns ``None`` when no keys are configured or none match.
        """
        auth_keys = self.connector_config.auth_keys
        if auth_keys is None:
            return None

        if access_token := auth_keys.get("access_token"):
            from weaviate.auth import AuthBearerToken

            return AuthBearerToken(
                access_token=access_token,
                refresh_token=auth_keys.get("refresh_token"),
            )
        elif api_key := auth_keys.get("api_key"):
            from weaviate.auth import AuthApiKey

            return AuthApiKey(api_key=api_key)
        elif client_secret := auth_keys.get("client_secret"):
            from weaviate.auth import AuthClientCredentials

            return AuthClientCredentials(
                client_secret=client_secret,
                scope=auth_keys.get("scope"),
            )
        elif (username := auth_keys.get("username")) and (pwd := auth_keys.get("password")):
            from weaviate.auth import AuthClientPassword

            return AuthClientPassword(
                username=username,
                password=pwd,
                scope=auth_keys.get("scope"),
            )
        return None

    @staticmethod
    def _format_date(value: str) -> str:
        """Parse a timestamp string and render it as ``%Y-%m-%dT%H:%M:%S.%fZ``.

        The output always carries a literal ``Z`` (UTC) suffix, so timestamps
        that come with a UTC offset are converted to UTC first. Timestamps
        without offset information are rendered unchanged.
        """
        from datetime import timezone

        from dateutil import parser

        parsed = parser.parse(value)
        if parsed.utcoffset() is not None:
            parsed = parsed.astimezone(timezone.utc)
        return parsed.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

    def conform_dict(self, data: dict) -> None:
        """
        Updates the element dictionary to conform to the Weaviate schema

        The dictionary is modified in place:
        - nested structures the schema stores as text are JSON-encoded,
        - timestamps are normalised through ``_format_date``,
        - ``version`` and ``page_number`` are cast to strings.
        Missing (or ``None``) sections and values are left untouched.
        """
        # `or {}` also covers sections that are present but explicitly None.
        metadata = data.get("metadata") or {}
        data_source = metadata.get("data_source") or {}
        coordinates = metadata.get("coordinates") or {}

        # Dicts / arrays that the schema declares as text: store them as JSON.
        # Checked against None (not truthiness) so empty containers are encoded too.
        json_encoded = (
            (data_source, "record_locator"),
            (coordinates, "points"),
            (metadata, "links"),
            (data_source, "permissions_data"),
            (metadata, "regex_metadata"),
        )
        for section, key in json_encoded:
            if section.get(key) is not None:
                section[key] = json.dumps(section[key])

        # Datetime formatting; empty values are skipped because they cannot be parsed.
        dates = (
            (data_source, "date_created"),
            (data_source, "date_modified"),
            (data_source, "date_processed"),
            (metadata, "last_modified"),
        )
        for section, key in dates:
            if section.get(key):
                section[key] = self._format_date(section[key])

        # String casting; checked against None so falsy values such as 0 are cast as well.
        for section, key in ((data_source, "version"), (metadata, "page_number")):
            if section.get(key) is not None:
                section[key] = str(section[key])

    def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None:
        """Conform each element dict and add it to the configured class in batches.

        Each dict in ``json_list`` is modified in place: it is conformed to the
        schema and its ``embeddings`` entry, if any, is removed and sent as the
        object's vector. Positional and extra keyword arguments are ignored.
        """
        logger.info(
            f"writing {len(json_list)} objects to destination "
            f"class {self.connector_config.class_name} "
            f"at {self.connector_config.host_url}",
        )
        self.client.batch.configure(batch_size=self.write_config.batch_size)
        with self.client.batch as b:
            for e in json_list:
                self.conform_dict(e)
                vector = e.pop("embeddings", None)
                b.add_data_object(
                    e,
                    self.connector_config.class_name,
                    vector=vector,
                )

    @requires_dependencies(["weaviate"], extras="weaviate")
    def write(self, docs: t.List[BaseIngestDoc]) -> None:
        """Load the JSON output of every doc and write all elements in one call."""
        json_list: t.List[t.Dict[str, t.Any]] = []
        for doc in docs:
            local_path = doc._output_filename
            # Explicit encoding so reading does not depend on the platform default.
            with open(local_path, encoding="utf-8") as json_file:
                json_content = json.load(json_file)
                logger.info(
                    f"appending {len(json_content)} json elements from content in {local_path}",
                )
                json_list.extend(json_content)
        self.write_dict(json_list=json_list)

View File

@ -9,6 +9,7 @@ from .gcs import gcs_writer
from .mongodb import mongodb_writer
from .pinecone import pinecone_writer
from .s3 import s3_writer
from .weaviate import weaviate_writer
writer_map: t.Dict[str, t.Callable] = {
"azure": azure_writer,
@ -19,6 +20,7 @@ writer_map: t.Dict[str, t.Callable] = {
"gcs": gcs_writer,
"mongodb": mongodb_writer,
"s3": s3_writer,
"weaviate": weaviate_writer,
"pinecone": pinecone_writer,
}

View File

@ -0,0 +1,27 @@
import typing as t
from unstructured.utils import requires_dependencies
@requires_dependencies(["weaviate"], extras="weaviate")
def weaviate_writer(
    host_url: str,
    class_name: str,
    batch_size: int = 100,
    auth_keys: t.Optional[t.Dict[str, str]] = None,
    **kwargs,
):
    """Build a ``WeaviateDestinationConnector`` from plain keyword arguments.

    Args:
        host_url: URL of the Weaviate instance.
        class_name: Name of the Weaviate class to write into.
        batch_size: Batch size stored in the write config.
        auth_keys: Optional mapping of credential names to values; it is passed
            unchanged to ``SimpleWeaviateConfig``, which declares it as a dict.
        **kwargs: Accepted and ignored.

    Returns:
        The configured destination connector.
    """
    # Imported lazily so this module can be imported without the connector's dependencies.
    from unstructured.ingest.connector.weaviate import (
        SimpleWeaviateConfig,
        WeaviateDestinationConnector,
        WeaviateWriteConfig,
    )

    return WeaviateDestinationConnector(
        write_config=WeaviateWriteConfig(batch_size=batch_size),
        connector_config=SimpleWeaviateConfig(
            host_url=host_url,
            class_name=class_name,
            auth_keys=auth_keys,
        ),
    )