feat: add Vectara destination connector (#2357)

Thanks to Ofer at Vectara, we now have a Vectara destination connector.

- There are no dependencies, since everything is done through REST calls to the Vectara API (a minimal sketch of the call pattern is shown below).
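For illustration only (this sketch is not part of the diff): the write path boils down to plain HTTPS requests along these lines, assuming an OAuth2 access token and corpus ID are already available. All values below are hypothetical placeholders.

import json

import requests

# Hypothetical placeholder values; the connector reads the real ones from its config.
customer_id = "1234567890"
corpus_id = 42
jwt_token = "<OAuth2 access token from the Vectara token endpoint>"

headers = {
    "Content-Type": "application/json",
    "Accept": "application/json",
    "Authorization": f"Bearer {jwt_token}",
    "customer-id": customer_id,
}
body = {
    "customer_id": customer_id,
    "corpus_id": corpus_id,
    "document": {"documentId": "example-doc", "section": [{"text": "Hello, Vectara."}]},
}
# The connector issues requests like this against https://api.vectara.io/v1/<endpoint>.
response = requests.post("https://api.vectara.io/v1/index", headers=headers, data=json.dumps(body))
response.raise_for_status()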

---------

Co-authored-by: potter-potter <david.potter@gmail.com>
David Potter 2024-02-01 06:38:34 -08:00 committed by GitHub
parent 94001a208d
commit c100ce28a7
21 changed files with 570 additions and 6 deletions


@@ -379,6 +379,9 @@ jobs:
MONGODB_DATABASE_NAME: ${{ secrets.MONGODB_DATABASE_NAME }}
AZURE_DEST_CONNECTION_STR: ${{ secrets.AZURE_DEST_CONNECTION_STR }}
PINECONE_API_KEY: ${{secrets.PINECONE_API_KEY}}
VECTARA_OAUTH_CLIENT_ID: ${{secrets.VECTARA_OAUTH_CLIENT_ID}}
VECTARA_OAUTH_SECRET: ${{secrets.VECTARA_OAUTH_SECRET}}
VECTARA_CUSTOMER_ID: ${{secrets.VECTARA_CUSTOMER_ID}}
TABLE_OCR: "tesseract"
OCR_AGENT: "unstructured.partition.utils.ocr_models.tesseract_ocr.OCRAgentTesseract"
CI: "true"


@@ -1,4 +1,4 @@
## 0.12.4-dev3
## 0.12.4-dev4
### Enhancements
@@ -8,6 +8,7 @@
* **Add .heic file partitioning** .heic image files were previously unsupported and are now supported through partition_image()
* **Add the ability to specify an alternate OCR** implementation by implementing an `OCRAgent` interface and specifying it using the `OCR_AGENT` environment variable.
* **Add Vectara destination connector** Adds support for writing partitioned documents into a Vectara index.
### Fixes


@@ -227,6 +227,10 @@ install-ingest-sftp:
install-ingest-pinecone:
	python3 -m pip install -r requirements/ingest/pinecone.txt

.PHONY: install-ingest-vectara
install-ingest-vectara:
	python3 -m pip install -r requirements/ingest/vectara.txt

.PHONY: install-ingest-qdrant
install-ingest-qdrant:
	python3 -m pip install -r requirements/ingest/qdrant.txt


@@ -23,5 +23,6 @@ in our community `Slack. <https://short.unstructured.io/pzw05l7>`_
destination_connectors/qdrant
destination_connectors/s3
destination_connectors/sql
destination_connectors/vectara
destination_connectors/weaviate


@@ -3,7 +3,7 @@
unstructured-ingest \
local \
--input-path example-docs/book-war-and-peace-1p.txt \
--output-dir local-to-pinecone \
--output-dir local-to-chroma \
--strategy fast \
--chunk-elements \
--embedding-provider "<unstructured embedding provider, ie. langchain-huggingface>" \


@@ -0,0 +1,15 @@
#!/usr/bin/env bash
unstructured-ingest \
local \
--input-path example-docs/book-war-and-peace-1225p.txt \
--output-dir local-output-to-vectara \
--strategy fast \
--chunk-elements \
--num-processes 2 \
--verbose \
vectara \
--customer-id "$VECTARA_CUSTOMER_ID" \
--oauth-client-id "$VECTARA_OAUTH_CLIENT_ID" \
--oauth-secret "$VECTARA_OAUTH_SECRET" \
--corpus-name "test-corpus-vectara"


@@ -0,0 +1,52 @@
import os

from unstructured.ingest.connector.local import SimpleLocalConfig
from unstructured.ingest.connector.vectara import (
    SimpleVectaraConfig,
    VectaraAccessConfig,
    WriteConfig,
)
from unstructured.ingest.interfaces import (
    PartitionConfig,
    ProcessorConfig,
    ReadConfig,
)
from unstructured.ingest.runner import LocalRunner
from unstructured.ingest.runner.writers.base_writer import Writer
from unstructured.ingest.runner.writers.vectara import (
    VectaraWriter,
)


def get_writer() -> Writer:
    return VectaraWriter(
        connector_config=SimpleVectaraConfig(
            access_config=VectaraAccessConfig(
                oauth_client_id=os.getenv("VECTARA_OAUTH_CLIENT_ID"),
                oauth_secret=os.getenv("VECTARA_OAUTH_SECRET"),
            ),
            customer_id=os.getenv("VECTARA_CUSTOMER_ID"),
            corpus_name="test-corpus-vectara",
        ),
        write_config=WriteConfig(),
    )


if __name__ == "__main__":
    writer = get_writer()
    runner = LocalRunner(
        processor_config=ProcessorConfig(
            verbose=True,
            output_dir="local-output-to-vectara",
            num_processes=2,
        ),
        connector_config=SimpleLocalConfig(
            input_path="example-docs/book-war-and-peace-1225p.txt",
        ),
        read_config=ReadConfig(),
        partition_config=PartitionConfig(),
        # chunking_config=ChunkingConfig(chunk_elements=True),
        writer=writer,
        writer_kwargs={},
    )
    runner.run()


@@ -0,0 +1,27 @@
Vectara
===========
Process all your records using ``unstructured-ingest`` to store structured outputs locally on your filesystem and upload those to a Vectara corpus.

If you don't yet have a Vectara account, `sign up <https://vectara.com/integrations/unstructured/>`_ for one.

Run Locally
-----------

The upstream connector can be any of the ones supported, but for convenience here we show a sample command using the upstream local connector.

.. tabs::

   .. tab:: Shell

      .. literalinclude:: ./code/bash/vectara.sh
         :language: bash

   .. tab:: Python

      .. literalinclude:: ./code/python/vectara.py
         :language: python

For a full list of the options the CLI accepts, check ``unstructured-ingest <upstream connector> vectara --help``.

NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide <https://unstructured-io.github.io/unstructured/installing.html>`_.


@@ -0,0 +1,26 @@
#!/usr/bin/env bash
# Processes example-docs/book-war-and-peace-1p.txt,
# Ingests into Vectara
# Structured outputs are stored in local-to-vectara/
SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
cd "$SCRIPT_DIR"/../../.. || exit 1
# As an example we're using the local source connector,
# however ingesting from any supported source connector is possible.
# shellcheck disable=2094
PYTHONPATH=. ./unstructured/ingest/main.py \
local \
--input-path example-docs/book-war-and-peace-1p.txt \
--output-dir local-to-vectara \
--strategy fast \
--num-processes 2 \
--verbose \
--work-dir "<directory for intermediate outputs to be saved>" \
vectara \
--oauth-client-id "<Vectara OAuth2 client ID>" \
--oauth-secret "<Vectara OAuth2 secret>" \
--customer-id "<Vectara customer ID>" \
--corpus-name "<Vectara corpus name>"


@@ -160,6 +160,7 @@ setup(
"salesforce": load_requirements("requirements/ingest/salesforce.in"),
"sftp": load_requirements("requirements/ingest/sftp.in"),
"slack": load_requirements("requirements/ingest/slack.in"),
"vectara": load_requirements("requirements/ingest/vectara.in"),
"wikipedia": load_requirements("requirements/ingest/wikipedia.in"),
"weaviate": load_requirements("requirements/ingest/weaviate.in"),
# Legacy extra requirements


@@ -0,0 +1,93 @@
#!/usr/bin/env bash
set -e
DEST_PATH=$(dirname "$(realpath "$0")")
SCRIPT_DIR=$(dirname "$DEST_PATH")
cd "$SCRIPT_DIR"/.. || exit 1
OUTPUT_FOLDER_NAME=local-vectara-dest
OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME
WORK_DIR=$SCRIPT_DIR/workdir/$OUTPUT_FOLDER_NAME
max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")}
RANDOM_SUFFIX=$((RANDOM % 100000 + 1))
CORPUS_NAME="test-corpus-vectara-"$RANDOM_SUFFIX
# Expected corpus size after the document is uploaded
EXPECTED_CORPUS_SIZE=8829936
if [ -z "$VECTARA_OAUTH_CLIENT_ID" ] && [ -z "$VECTARA_OAUTH_SECRET" ] && [ -z "$VECTARA_CUSTOMER_ID" ]; then
echo "Skipping VECTARA ingest test because VECTARA_OAUTH_CLIENT_ID, VECTARA_OAUTH_SECRET, or VECTARA_CUSTOMER_ID env var is not set."
exit 8
fi
# shellcheck disable=SC1091
source "$SCRIPT_DIR"/cleanup.sh
function cleanup {
echo "Deleting corpus $corpus_id ($CORPUS_NAME)"
curl -sS -L -X POST 'https://api.vectara.io/v1/delete-corpus' \
-H 'Content-Type: application/json' \
-H 'Accept: application/json' \
-H "Authorization: Bearer $access_token" \
-H "customer-id: $VECTARA_CUSTOMER_ID" \
--data-raw "{
\"corpusId\": $corpus_id
}"
# Local file cleanup
cleanup_dir "$WORK_DIR"
cleanup_dir "$OUTPUT_DIR"
}
trap cleanup EXIT
PYTHONPATH=. ./unstructured/ingest/main.py \
local \
--num-processes "$max_processes" \
--output-dir "$OUTPUT_DIR" \
--strategy fast \
--verbose \
--reprocess \
--input-path example-docs/book-war-and-peace-1225p.txt \
--work-dir "$WORK_DIR" \
vectara \
--customer-id "$VECTARA_CUSTOMER_ID" \
--oauth-client-id "$VECTARA_OAUTH_CLIENT_ID" \
--oauth-secret "$VECTARA_OAUTH_SECRET" \
--corpus-name "$CORPUS_NAME"
# Get JWT token
jwt_token_resp=$(curl -sS -XPOST -H "Content-type: application/x-www-form-urlencoded" -d \
"grant_type=client_credentials&client_id=$VECTARA_OAUTH_CLIENT_ID&client_secret=$VECTARA_OAUTH_SECRET" \
"https://vectara-prod-$VECTARA_CUSTOMER_ID.auth.us-west-2.amazoncognito.com/oauth2/token")
access_token=$(echo "$jwt_token_resp" | jq -r '.access_token')
# Get corpus ID from name
corpora_resp=$(curl -sS -L -X POST 'https://api.vectara.io/v1/list-corpora' \
-H 'Content-Type: application/json' \
-H 'Accept: application/json' \
-H "customer-id: $VECTARA_CUSTOMER_ID" \
-H "Authorization: Bearer $access_token" \
--data-raw "{
\"numResults\": 100,
\"filter\": \"$CORPUS_NAME\"
}")
corpus_id=$(echo "$corpora_resp" | jq -r '.corpus[0].id')
# Check that the size of the corpus is as expected
get_corpus_size=$(curl -L -X POST 'https://api.vectara.io/v1/compute-corpus-size' \
-H 'Content-Type: application/json' \
-H 'Accept: application/json' \
-H "customer-id: $VECTARA_CUSTOMER_ID" \
-H "Authorization: Bearer $access_token" \
--data-raw "{
\"corpusId\": $corpus_id
}")
corpus_size=$(echo "$get_corpus_size" | jq -r '.size.size')
if [ "$corpus_size" == "$EXPECTED_CORPUS_SIZE" ]; then
echo "Corpus size is as expected: $corpus_size"
else
echo "Corpus size is not as expected: $corpus_size"
exit 1
fi


@@ -42,7 +42,7 @@ PYTHONPATH=${PYTHONPATH:-.} "$RUN_SCRIPT" \
--input-path "example-docs/$TEST_FILE_NAME" \
--work-dir "$WORK_DIR"
RESULT_FILE_PATH="$OUTPUT_DIR/example-docs/$TEST_FILE_NAME.json"
RESULT_FILE_PATH="$OUTPUT_DIR/$TEST_FILE_NAME.json"
# validate that there is at least one table with text_as_html in the results
if [ "$(jq 'any(.[]; .metadata.text_as_html != null)' "$RESULT_FILE_PATH")" = "false" ]; then
echo "No table with text_as_html found in $RESULT_FILE_PATH but at least one was expected."


@@ -30,6 +30,7 @@ all_tests=(
's3.sh'
'sharepoint-embed-cog-index.sh'
'sqlite.sh'
'vectara.sh'
'weaviate.sh'
'opensearch.sh'
)


@@ -1 +1 @@
__version__ = "0.12.4-dev3" # pragma: no cover
__version__ = "0.12.4-dev4" # pragma: no cover


@@ -49,6 +49,7 @@ from .salesforce import get_base_src_cmd as salesforce_base_src_cmd
from .sharepoint import get_base_src_cmd as sharepoint_base_src_cmd
from .slack import get_base_src_cmd as slack_base_src_cmd
from .sql import get_base_dest_cmd as sql_base_dest_cmd
from .vectara import get_base_dest_cmd as vectara_base_dest_cmd
from .weaviate import get_base_dest_cmd as weaviate_dest_cmd
from .wikipedia import get_base_src_cmd as wikipedia_base_src_cmd
@@ -115,6 +116,7 @@ base_dest_cmd_fns: t.List[t.Callable[[], "BaseDestCmd"]] = [
pinecone_base_dest_cmd,
qdrant_base_dest_cmd,
opensearch_base_dest_cmd,
vectara_base_dest_cmd,
]
# Make sure there are not overlapping names


@@ -0,0 +1,66 @@
import typing as t
from dataclasses import dataclass

import click

from unstructured.ingest.cli.interfaces import CliConfig
from unstructured.ingest.connector.vectara import SimpleVectaraConfig, WriteConfig


@dataclass
class VectaraCliWriteConfig(SimpleVectaraConfig, CliConfig):
    @staticmethod
    def get_cli_options() -> t.List[click.Option]:
        options = [
            click.Option(
                ["--customer-id"],
                required=True,
                type=str,
                help="The Vectara customer-id.",
                envvar="VECTARA_CUSTOMER_ID",
                show_envvar=True,
            ),
            click.Option(
                ["--oauth-client-id"],
                required=True,
                type=str,
                help="Vectara OAuth2 client ID.",
                envvar="VECTARA_OAUTH_CLIENT_ID",
                show_envvar=True,
            ),
            click.Option(
                ["--oauth-secret"],
                required=True,
                type=str,
                help="Vectara OAuth2 secret.",
                envvar="VECTARA_OAUTH_SECRET",
                show_envvar=True,
            ),
            click.Option(
                ["--corpus-name"],
                required=False,
                type=str,
                default=None,
                help="The Vectara corpus-name.",
            ),
            click.Option(
                ["--token-url"],
                required=False,
                default="https://vectara-prod-{}.auth.us-west-2.amazoncognito.com/oauth2/token",
                type=str,
                help="The Vectara endpoint for token refresh. Needs curly brackets for customer_id",
            ),
        ]
        return options


def get_base_dest_cmd():
    from unstructured.ingest.cli.base.dest import BaseDestCmd

    cmd_cls = BaseDestCmd(
        cmd_name="vectara",
        cli_config=VectaraCliWriteConfig,
        additional_cli_options=[],
        write_config=WriteConfig,
    )
    return cmd_cls


@@ -0,0 +1,248 @@
import datetime
import json
import typing as t
import uuid
from dataclasses import dataclass, field

import requests

from unstructured.ingest.enhanced_dataclass import enhanced_field
from unstructured.ingest.error import DestinationConnectionError
from unstructured.ingest.interfaces import (
    AccessConfig,
    BaseConnectorConfig,
    BaseDestinationConnector,
    BaseIngestDoc,
    WriteConfig,
)
from unstructured.ingest.logger import logger
from unstructured.staging.base import flatten_dict

BASE_URL = "https://api.vectara.io/v1"


@dataclass
class VectaraAccessConfig(AccessConfig):
    oauth_client_id: str = enhanced_field(sensitive=True)
    oauth_secret: str = enhanced_field(sensitive=True)


@dataclass
class SimpleVectaraConfig(BaseConnectorConfig):
    access_config: VectaraAccessConfig
    customer_id: str
    corpus_name: t.Optional[str] = None
    corpus_id: t.Optional[str] = None
    token_url: str = "https://vectara-prod-{}.auth.us-west-2.amazoncognito.com/oauth2/token"

@dataclass
class VectaraDestinationConnector(BaseDestinationConnector):
    write_config: WriteConfig
    connector_config: SimpleVectaraConfig
    _jwt_token: t.Optional[str] = field(init=False, default=None)
    _jwt_token_expires_ts: t.Optional[float] = field(init=False, default=None)

    @property
    def jwt_token(self):
        # Refresh the token if it is missing or within 60 seconds of expiring.
        if (
            not self._jwt_token
            or self._jwt_token_expires_ts - datetime.datetime.now().timestamp() <= 60
        ):
            self._jwt_token = self._get_jwt_token()
        return self._jwt_token

    @DestinationConnectionError.wrap
    def vectara(self):
        """
        Check the connection for Vectara and validate that the corpus exists.
        - If more than one corpus with the same name exists, return a message.
        - If exactly one corpus exists with this name, use it.
        - If it does not exist, create it.
        """
        try:
            # Get token if not already set
            self.jwt_token

            list_corpora_response = self._request(
                endpoint="list-corpora",
                data={"numResults": 1, "filter": self.connector_config.corpus_name},
            )
            possible_corpora_ids_names_map = {
                corpus.get("id"): corpus.get("name")
                for corpus in list_corpora_response.get("corpus")
                if corpus.get("name") == self.connector_config.corpus_name
            }
            if len(possible_corpora_ids_names_map) > 1:
                return f"Multiple Corpora exist with name {self.connector_config.corpus_name}"
            if len(possible_corpora_ids_names_map) == 1:
                self.connector_config.corpus_id = list(possible_corpora_ids_names_map.keys())[0]
            else:
                data = {
                    "corpus": {
                        "name": self.connector_config.corpus_name,
                    }
                }
                create_corpus_response = self._request(endpoint="create-corpus", data=data)
                self.connector_config.corpus_id = create_corpus_response.get("corpusId")
        except Exception as e:
            logger.error(f"failed to create Vectara connection: {e}", exc_info=True)
            raise DestinationConnectionError(f"failed to create Vectara connection: {e}")

    def initialize(self):
        self.vectara()

    def _request(
        self,
        endpoint: str,
        http_method: str = "POST",
        params: t.Mapping[str, t.Any] = None,
        data: t.Mapping[str, t.Any] = None,
    ):
        url = f"{BASE_URL}/{endpoint}"
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Authorization": f"Bearer {self.jwt_token}",
            "customer-id": self.connector_config.customer_id,
            "X-source": "unstructured",
        }
        response = requests.request(
            method=http_method, url=url, headers=headers, params=params, data=json.dumps(data)
        )
        response.raise_for_status()
        return response.json()

    # Get Oauth2 JWT token
    def _get_jwt_token(self):
        """Connect to the server and get a JWT token."""
        token_endpoint = self.connector_config.token_url.format(self.connector_config.customer_id)
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
        }
        data = {
            "grant_type": "client_credentials",
            "client_id": self.connector_config.access_config.oauth_client_id,
            "client_secret": self.connector_config.access_config.oauth_secret,
        }
        response = requests.request(method="POST", url=token_endpoint, headers=headers, data=data)
        response.raise_for_status()
        response_json = response.json()
        request_time = datetime.datetime.now().timestamp()
        self._jwt_token_expires_ts = request_time + response_json.get("expires_in")
        return response_json.get("access_token")

    @DestinationConnectionError.wrap
    def check_connection(self):
        try:
            self.vectara()
        except Exception as e:
            logger.error(f"failed to validate connection: {e}", exc_info=True)
            raise DestinationConnectionError(f"failed to validate connection: {e}")

    def _delete_doc(self, doc_id: str) -> None:
        """
        Delete a document from the Vectara corpus.

        Args:
            doc_id (str): ID of the document to delete.
        """
        body = {
            "customer_id": self.connector_config.customer_id,
            "corpus_id": self.connector_config.corpus_id,
            "document_id": doc_id,
        }
        self._request(endpoint="delete-doc", data=body)

    def _index_document(self, document: t.Dict[str, t.Any]) -> None:
        """
        Index a document (by uploading it to the Vectara corpus) from the document dictionary
        """
        body = {
            "customer_id": self.connector_config.customer_id,
            "corpus_id": self.connector_config.corpus_id,
            "document": document,
        }
        try:
            result = self._request(endpoint="index", data=body, http_method="POST")
        except Exception as e:
            logger.info(f"Exception {e} while indexing document {document['documentId']}")
            return

        # If the document already exists, delete it and index it again.
        if (
            "status" in result
            and result["status"]
            and (
                "ALREADY_EXISTS" in result["status"]["code"]
                or (
                    "CONFLICT" in result["status"]["code"]
                    and "Indexing doesn't support updating documents"
                    in result["status"]["statusDetail"]
                )
            )
        ):
            logger.info(f"Document {document['documentId']} already exists, re-indexing")
            self._delete_doc(document["documentId"])
            result = self._request(endpoint="index", data=body, http_method="POST")
            return

        if "status" in result and result["status"] and "OK" in result["status"]["code"]:
            logger.info(f"Indexing document {document['documentId']} succeeded")
        else:
            logger.info(f"Indexing document {document['documentId']} failed, response = {result}")

    def write_dict(self, *args, docs_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None:
        logger.info(f"Inserting / updating {len(docs_list)} documents to Vectara")
        for vdoc in docs_list:
            self._index_document(vdoc)

    def write(self, docs: t.List[BaseIngestDoc]) -> None:
        docs_list: t.List[t.Dict[str, t.Any]] = []

        def get_metadata(element) -> t.Dict[str, t.Any]:
            """
            Select which metadata fields to include and optionally map them to a new name,
            removing the "metadata-" prefix from the keys.
            """
            metadata_map = {
                "page_number": "page_number",
                "data_source-url": "url",
                "filename": "filename",
                "filetype": "filetype",
                "last_modified": "last_modified",
            }
            md = flatten_dict(element, separator="-", flatten_lists=True)
            md = {k.replace("metadata-", ""): v for k, v in md.items()}
            md = {metadata_map[k]: v for k, v in md.items() if k in metadata_map}
            return md

        for doc in docs:
            local_path = doc._output_filename
            with open(local_path) as json_file:
                dict_content = json.load(json_file)
                vdoc = {
                    "documentId": str(uuid.uuid4()),
                    "section": [
                        {
                            "text": element.pop("text", None),
                            "metadataJson": json.dumps(get_metadata(element)),
                        }
                        for element in dict_content
                    ],
                }
                logger.info(
                    f"Indexing {len(vdoc['section'])} elements from content in {local_path}",
                )
                docs_list.append(vdoc)

        self.write_dict(docs_list=docs_list)
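For reference, a minimal sketch (hypothetical values, not part of the diff) of the per-file payload that write() assembles and write_dict() then posts to the index endpoint:

import json

# Hypothetical single-element payload; real documentId values are uuid4 strings
# and metadataJson is built by get_metadata() above.
vdoc = {
    "documentId": "a-generated-uuid4",
    "section": [
        {
            "text": "WAR AND PEACE by Leo Tolstoy ...",
            "metadataJson": json.dumps({"filename": "book-war-and-peace-1225p.txt", "filetype": "text/plain"}),
        }
    ],
}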


@@ -11,7 +11,7 @@ def default_is_data_sensitive(k: str, v: t.Any) -> bool:
"account_name",
"client_id",
]
sensitive_triggers = ["key", "cred", "token", "password"]
sensitive_triggers = ["key", "cred", "token", "password", "oauth", "secret"]
return (
v
and any([s in k.lower() for s in sensitive_triggers]) # noqa: C419
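To make the effect concrete, a hypothetical snippet (not part of the diff) showing which of the new Vectara credential keys now match a sensitive trigger:

sensitive_triggers = ["key", "cred", "token", "password", "oauth", "secret"]

for k in ["oauth_client_id", "oauth_secret", "customer_id"]:
    # oauth_client_id and oauth_secret now match ("oauth" / "secret"); customer_id does not.
    print(k, any(s in k.lower() for s in sensitive_triggers))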


@@ -213,7 +213,7 @@ class WriteNode(PipelineNode):
def initialize(self):
logger.info(
f"Running write node to upload content. "
f"Destination connector: {self.dest_doc_connector.to_json()}]",
f"Destination connector: {self.dest_doc_connector.to_json(redact_sensitive=True)}]",
)
super().initialize()
self.dest_doc_connector.initialize()


@@ -16,6 +16,7 @@ from .opensearch import OpenSearchWriter
from .pinecone import PineconeWriter
from .qdrant import QdrantWriter
from .sql import SqlWriter
from .vectara import VectaraWriter
from .weaviate import WeaviateWriter
writer_map: t.Dict[str, t.Type[Writer]] = {
@@ -34,6 +35,7 @@ writer_map: t.Dict[str, t.Type[Writer]] = {
"qdrant": QdrantWriter,
"s3": S3Writer,
"sql": SqlWriter,
"vectara": VectaraWriter,
"weaviate": WeaviateWriter,
}


@@ -0,0 +1,22 @@
import typing as t
from dataclasses import dataclass

from unstructured.ingest.enhanced_dataclass import EnhancedDataClassJsonMixin
from unstructured.ingest.interfaces import BaseDestinationConnector
from unstructured.ingest.runner.writers.base_writer import Writer

if t.TYPE_CHECKING:
    from unstructured.ingest.connector.vectara import SimpleVectaraConfig, VectaraWriteConfig


@dataclass
class VectaraWriter(Writer, EnhancedDataClassJsonMixin):
    write_config: "VectaraWriteConfig"
    connector_config: "SimpleVectaraConfig"

    def get_connector_cls(self) -> t.Type[BaseDestinationConnector]:
        from unstructured.ingest.connector.vectara import (
            VectaraDestinationConnector,
        )

        return VectaraDestinationConnector