feat: add Astra source connector (#3304)

Thanks to @erichare we now have an AstraDB source connector.

Also updates constant names to be better aligned with AstraDB conventions.
This commit is contained in:
David Potter 2024-07-10 13:29:22 -07:00 committed by GitHub
parent 0c562d8050
commit 6c78677ebb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 724 additions and 19 deletions

View File

@ -346,6 +346,8 @@ jobs:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
OCTOAI_API_KEY: ${{ secrets.OCTOAI_API_KEY }}
PINECONE_API_KEY: ${{secrets.PINECONE_API_KEY}}
ASTRA_DB_APPLICATION_TOKEN: ${{secrets.ASTRA_DB_TOKEN}}
ASTRA_DB_API_ENDPOINT: ${{secrets.ASTRA_DB_ENDPOINT}}
TABLE_OCR: "tesseract"
OCR_AGENT: "unstructured.partition.utils.ocr_models.tesseract_ocr.OCRAgentTesseract"
CI: "true"
@ -411,8 +413,8 @@ jobs:
VECTARA_OAUTH_CLIENT_ID: ${{secrets.VECTARA_OAUTH_CLIENT_ID}}
VECTARA_OAUTH_SECRET: ${{secrets.VECTARA_OAUTH_SECRET}}
VECTARA_CUSTOMER_ID: ${{secrets.VECTARA_CUSTOMER_ID}}
ASTRA_DB_TOKEN: ${{secrets.ASTRA_DB_TOKEN}}
ASTRA_DB_ENDPOINT: ${{secrets.ASTRA_DB_ENDPOINT}}
ASTRA_DB_APPLICATION_TOKEN: ${{secrets.ASTRA_DB_TOKEN}}
ASTRA_DB_API_ENDPOINT: ${{secrets.ASTRA_DB_ENDPOINT}}
CLARIFAI_API_KEY: ${{secrets.CLARIFAI_API_KEY}}
DATABRICKS_HOST: ${{secrets.DATABRICKS_HOST}}
DATABRICKS_USERNAME: ${{secrets.DATABRICKS_USERNAME}}

View File

@ -1,4 +1,4 @@
## 0.14.11-dev5
## 0.14.11-dev6
### Enhancements
@ -7,6 +7,8 @@
### Features
* **Add AstraDB source connector** Adds support for ingesting documents from AstraDB.
### Fixes
## 0.14.10

View File

@ -0,0 +1,6 @@
Unstructured Documentation
==========================
The Unstructured documentation page has moved! Check out our new and improved docs page at
`https://docs.unstructured.io <https://docs.unstructured.io>`_ to learn more about our
products and tools.

View File

@ -0,0 +1,22 @@
#!/usr/bin/env bash
# Example: processes the Unstructured-IO/unstructured repository
# through Unstructured's library in 2 processes.
# Structured outputs are stored in astra-ingest-output/
# NOTE: this script is not ready-to-run!
# You must supply a token, endpoint and collection name
# before running.
# Resolve the repo root relative to this script's own location.
SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
cd "$SCRIPT_DIR"/../../.. || exit 1
# Pull every record from the named AstraDB collection using 2 worker processes.
PYTHONPATH=. ./unstructured/ingest/main.py \
astra \
--token "<AstraDB Application Token>" \
--api-endpoint "<AstraDB Api Endpoint>" \
--collection-name "<AstraDB Collection Name>" \
--num-processes "2" \
--output-dir astra-ingest-output \
--verbose

View File

@ -10,13 +10,13 @@ OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME
WORK_DIR=$SCRIPT_DIR/workdir/$OUTPUT_FOLDER_NAME
max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")}
if [ -z "$ASTRA_DB_TOKEN" ]; then
echo "Skipping Astra DB ingest test because ASTRA_DB_TOKEN env var is not set."
if [ -z "$ASTRA_DB_APPLICATION_TOKEN" ]; then
echo "Skipping Astra DB ingest test because ASTRA_DB_APPLICATION_TOKEN env var is not set."
exit 0
fi
if [ -z "$ASTRA_DB_ENDPOINT" ]; then
echo "Skipping Astra DB ingest test because ASTRA_DB_ENDPOINT env var is not set."
if [ -z "$ASTRA_DB_API_ENDPOINT" ]; then
echo "Skipping Astra DB ingest test because ASTRA_DB_API_ENDPOINT env var is not set."
exit 0
fi
@ -32,8 +32,8 @@ function cleanup() {
cleanup_dir "$WORK_DIR"
python "$SCRIPT_DIR"/python/test-ingest-astra-output.py \
--token "$ASTRA_DB_TOKEN" \
--api-endpoint "$ASTRA_DB_ENDPOINT" \
--token "$ASTRA_DB_APPLICATION_TOKEN" \
--api-endpoint "$ASTRA_DB_API_ENDPOINT" \
--collection-name "$COLLECTION_NAME" down
}
@ -52,13 +52,13 @@ PYTHONPATH=. ./unstructured/ingest/main.py \
--chunk-multipage-sections \
--embedding-provider "langchain-huggingface" \
astra \
--token "$ASTRA_DB_TOKEN" \
--api-endpoint "$ASTRA_DB_ENDPOINT" \
--token "$ASTRA_DB_APPLICATION_TOKEN" \
--api-endpoint "$ASTRA_DB_API_ENDPOINT" \
--collection-name "$COLLECTION_NAME" \
--embedding-dimension "$EMBEDDING_DIMENSION" \
--requested-indexing-policy '{"deny": ["metadata"]}'
python "$SCRIPT_DIR"/python/test-ingest-astra-output.py \
--token "$ASTRA_DB_TOKEN" \
--api-endpoint "$ASTRA_DB_ENDPOINT" \
--token "$ASTRA_DB_APPLICATION_TOKEN" \
--api-endpoint "$ASTRA_DB_API_ENDPOINT" \
--collection-name "$COLLECTION_NAME" check

View File

@ -0,0 +1,98 @@
[
{
"element_id": "f0a4e037e95409782d80f79ab482e0a6",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "25b75f1d-a2ea-4c97-b75f-1da2eadc97f7",
"type": "UncategorizedText"
},
{
"element_id": "5b07d26fd8dfe0d1eed55ade1646d117",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "City Hunter: Shinjuku Private Eyes",
"type": "Title"
},
{
"element_id": "fa66f50dce49f55a8ee1a3b868660435",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "2558908",
"type": "UncategorizedText"
},
{
"element_id": "04ec2a9e0508b18cd1e74299f663646e",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "2019-02-14",
"type": "UncategorizedText"
},
{
"element_id": "614276b484bb8e257e9bd90610e1311b",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "Matt Schley",
"type": "Title"
},
{
"element_id": "840d7108dfb83582914b422aafeb5656",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "2.5/5",
"type": "UncategorizedText"
},
{
"element_id": "d252d04d0a940ef7fdadfeb802decdc6",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "rotten",
"type": "NarrativeText"
},
{
"element_id": "c451f38624b0cc6014e1a1ea0e006a88",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "The film's out-of-touch attempts at humor may find them hunting for the reason the franchise was so popular in the first place.",
"type": "NarrativeText"
}
]

View File

@ -0,0 +1,26 @@
[
{
"element_id": "92d743481b1262db7e93beb437b6c793",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "60297eea-73d7-4fca-a97e-ea73d7cfca62 City Hunter: Shinjuku Private Eyes 2590987 2019-05-28 Reuben Baron",
"type": "Title"
},
{
"element_id": "7abd0e1bf81ec2fec0917ceece253c4f",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "fresh The choreography is so precise and lifelike at points one might wonder whether the movie was rotoscoped, but no live-action reference footage was used. The quality is due to the skill of the animators and Kodama's love for professional wrestling.",
"type": "NarrativeText"
}
]

View File

@ -0,0 +1,98 @@
[
{
"element_id": "7c4c8a27f2664fc2db5eac50c9105299",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "641d99e3-9941-4c18-9d99-e399414c183d",
"type": "UncategorizedText"
},
{
"element_id": "d2674169f9d7a78a17b4cce81a30ab10",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "Beavers",
"type": "Title"
},
{
"element_id": "93f68debb2a48075f5cb933213331e0a",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "1145982",
"type": "UncategorizedText"
},
{
"element_id": "602ff3900f9c245ffda521cb04dec673",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "2003-05-23",
"type": "UncategorizedText"
},
{
"element_id": "8cf307b6a94e7532c00f4abeae2909fd",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "Ivan M. Lincoln",
"type": "Title"
},
{
"element_id": "6d77cf17e6d39fc13bade34de54d6df0",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "3.5/4",
"type": "UncategorizedText"
},
{
"element_id": "eca56c8a2a202ccaac07fe0d807c92a6",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "fresh",
"type": "Title"
},
{
"element_id": "3e9ea85f9e12c9683dc6ad6fb3f58f1f",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "Timed to be just long enough for most youngsters' brief attention spans -- and it's packed with plenty of interesting activity, both on land and under the water.",
"type": "NarrativeText"
}
]

View File

@ -0,0 +1,98 @@
[
{
"element_id": "6c97f8135d9625ab7e1b55045edfcbc7",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "762c0093-2277-4f3e-ac00-932277af3e0e",
"type": "UncategorizedText"
},
{
"element_id": "60f6854bdd5c1362123b707b3836ec5a",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "Blood Mask",
"type": "Title"
},
{
"element_id": "459e0f0a9ce96831ae4f91ae912a5f25",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "1636744",
"type": "UncategorizedText"
},
{
"element_id": "9d81ee525c3fa60e79474d5159a9ac2f",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "2007-06-02",
"type": "UncategorizedText"
},
{
"element_id": "4ca9dfe7de57c9d71f688687cccc36ec",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "The Foywonder",
"type": "Title"
},
{
"element_id": "db5c57ce2da3356bac0a200350e5fa99",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "1/5",
"type": "UncategorizedText"
},
{
"element_id": "4810fb7d8df8d65346f039b7ca93b70c",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "rotten",
"type": "NarrativeText"
},
{
"element_id": "8077ef7b087ff1f0c278bc1145868240",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "It doesn't matter if a movie costs 300 million or only 300 dollars; good is good and bad is bad, and Bloodmask: The Possession of Nicole Lameroux is just plain bad.",
"type": "NarrativeText"
}
]

View File

@ -0,0 +1,146 @@
[
{
"element_id": "92ace4ff9ad3621da892886eeee478a3",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "ae40df94",
"type": "Title"
},
{
"element_id": "047ae5e12572a60afb92f698bb8e6f66",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "0b3a",
"type": "Title"
},
{
"element_id": "31ef877bce804ebf1a15891edc11e2c7",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "4f89",
"type": "UncategorizedText"
},
{
"element_id": "615b321c09ab3067ff128b2e4bcdcdf2",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "80df",
"type": "Title"
},
{
"element_id": "1cb8aeff17aa47f2ef832d0f49165777",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "940b3a6f8966",
"type": "UncategorizedText"
},
{
"element_id": "80d611a9e1670f65498590c4c3b33233",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "Dangerous Men",
"type": "Title"
},
{
"element_id": "2706f30db2cf0f66f927c9c536ac2518",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "2504681",
"type": "UncategorizedText"
},
{
"element_id": "a2346ae79f2c5908ef90122d2f703687",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "2018",
"type": "UncategorizedText"
},
{
"element_id": "016c8949614d0fec6481738d0c8b45a5",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "08",
"type": "UncategorizedText"
},
{
"element_id": "b963d1cd3855ec6d3a2c993304aee5f7",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "29",
"type": "UncategorizedText"
},
{
"element_id": "06b0f5c271b3cb3d036fd89bd8979b38",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "Pat Padua",
"type": "Title"
},
{
"element_id": "5f75100c8d5f9fee4007fd126d3c88f1",
"metadata": {
"data_source": {},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "fresh Its clumsy determination is endearing and sometimes wildly entertaining",
"type": "NarrativeText"
}
]

View File

@ -0,0 +1,40 @@
#!/usr/bin/env bash
set -e
SRC_PATH=$(dirname "$(realpath "$0")")
SCRIPT_DIR=$(dirname "$SRC_PATH")
cd "$SCRIPT_DIR"/.. || exit 1
OUTPUT_FOLDER_NAME=astra
OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME
WORK_DIR=$SCRIPT_DIR/workdir/$OUTPUT_FOLDER_NAME
DOWNLOAD_DIR=$SCRIPT_DIR/download/$OUTPUT_FOLDER_NAME
max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")}
if [ -z "$ASTRA_DB_APPLICATION_TOKEN" ]; then
echo "Skipping Astra DB source test because ASTRA_DB_APPLICATION_TOKEN env var is not set."
exit 0
fi
if [ -z "$ASTRA_DB_API_ENDPOINT" ]; then
echo "Skipping Astra DB source test because ASTRA_DB_API_ENDPOINT env var is not set."
exit 0
fi
COLLECTION_NAME="ingest_test_src"
PYTHONPATH=. ./unstructured/ingest/main.py \
astra \
--token "$ASTRA_DB_APPLICATION_TOKEN" \
--api-endpoint "$ASTRA_DB_API_ENDPOINT" \
--collection-name "$COLLECTION_NAME" \
--download-dir "$DOWNLOAD_DIR" \
--metadata-exclude coordinates,filename,file_directory,metadata.last_modified,metadata.data_source.date_processed,metadata.detection_class_prob,metadata.parent_id,metadata.category_depth \
--num-processes "$max_processes" \
--strategy hi_res \
--preserve-downloads \
--reprocess \
--output-dir "$OUTPUT_DIR" \
--verbose \
--work-dir "$WORK_DIR"
"$SCRIPT_DIR"/check-diff-expected-output.sh $OUTPUT_FOLDER_NAME

View File

@ -19,6 +19,7 @@ export OMP_THREAD_LIMIT=1
all_tests=(
's3.sh'
's3-minio.sh'
'astra.sh'
'azure.sh'
'biomed-api.sh'
'biomed-path.sh'

View File

@ -1 +1 @@
__version__ = "0.14.11-dev5" # pragma: no cover
__version__ = "0.14.11-dev6" # pragma: no cover

View File

@ -8,6 +8,7 @@ from unstructured.ingest.cli.cmds.fsspec.sftp import get_base_src_cmd as sftp_ba
from .airtable import get_base_src_cmd as airtable_base_src_cmd
from .astra import get_base_dest_cmd as astra_base_dest_cmd
from .astra import get_base_src_cmd as astra_base_src_cmd
from .azure_cognitive_search import get_base_dest_cmd as azure_cognitive_search_base_dest_cmd
from .biomed import get_base_src_cmd as biomed_base_src_cmd
from .chroma import get_base_dest_cmd as chroma_base_dest_cmd
@ -62,6 +63,7 @@ if t.TYPE_CHECKING:
base_src_cmd_fns: t.List[t.Callable[[], BaseSrcCmd]] = [
airtable_base_src_cmd,
astra_base_src_cmd,
azure_base_src_cmd,
biomed_base_src_cmd,
box_base_src_cmd,

View File

@ -17,7 +17,7 @@ class AstraCliConfig(SimpleAstraConfig, CliConfig):
required=True,
type=str,
help="Astra DB Token with access to the database.",
envvar="ASTRA_DB_TOKEN",
envvar="ASTRA_DB_APPLICATION_TOKEN",
show_envvar=True,
),
click.Option(
@ -25,7 +25,7 @@ class AstraCliConfig(SimpleAstraConfig, CliConfig):
required=True,
type=str,
help="The API endpoint for the Astra DB.",
envvar="ASTRA_DB_ENDPOINT",
envvar="ASTRA_DB_API_ENDPOINT",
show_envvar=True,
),
click.Option(
@ -77,6 +77,16 @@ class AstraCliWriteConfig(AstraWriteConfig, CliConfig):
return options
def get_base_src_cmd():
    """Build the base source command wiring the Astra connector into the CLI."""
    from unstructured.ingest.cli.base.src import BaseSrcCmd

    return BaseSrcCmd(cmd_name="astra", cli_config=AstraCliConfig)
def get_base_dest_cmd():
from unstructured.ingest.cli.base.dest import BaseDestCmd

View File

@ -1,20 +1,27 @@
import copy
import typing as t
from dataclasses import dataclass, field
from pathlib import Path
from unstructured import __name__ as integration_name
from unstructured.__version__ import __version__ as integration_version
from unstructured.ingest.enhanced_dataclass import enhanced_field
from unstructured.ingest.enhanced_dataclass.core import _asdict
from unstructured.ingest.error import DestinationConnectionError
from unstructured.ingest.error import DestinationConnectionError, SourceConnectionError
from unstructured.ingest.interfaces import (
AccessConfig,
BaseConnectorConfig,
BaseDestinationConnector,
BaseSingleIngestDoc,
BaseSourceConnector,
IngestDocCleanupMixin,
SourceConnectorCleanupMixin,
SourceMetadata,
WriteConfig,
)
from unstructured.ingest.logger import logger
from unstructured.ingest.utils.data_prep import batch_generator
from unstructured.staging.base import flatten_dict
from unstructured.utils import requires_dependencies
if t.TYPE_CHECKING:
@ -38,6 +45,114 @@ class SimpleAstraConfig(BaseConnectorConfig):
requested_indexing_policy: t.Optional[t.Dict[str, t.Any]] = None
@dataclass
class AstraIngestDoc(IngestDocCleanupMixin, BaseSingleIngestDoc):
    """A single AstraDB record staged for local processing.

    The raw Astra document is held in ``metadata``; its flattened string
    values are written to a local ``.txt`` file for downstream partitioning.
    """

    connector_config: SimpleAstraConfig
    # Raw record as returned by Astra; assumed to carry an "_id" key used to
    # name the download/output files — TODO confirm against the source query.
    metadata: t.Dict[str, str] = field(default_factory=dict)
    registry_name: str = "astra"
    @property
    def filename(self) -> Path:
        """Local path the raw record is downloaded to (one .txt per record)."""
        return (
            Path(self.read_config.download_dir)
            / self.connector_config.collection_name
            / f"{self.metadata['_id']}.txt"
        ).resolve()
    @property
    def _output_filename(self) -> Path:
        """Path of the structured JSON output produced for this record."""
        return (
            Path(self.processor_config.output_dir)
            / self.connector_config.collection_name
            / f"{self.metadata['_id']}.json"
        ).resolve()
    def update_source_metadata(self, **kwargs) -> None:
        """Mark the source as existing iff a non-empty record was fetched."""
        if not self.metadata:
            self.source_metadata = SourceMetadata(
                exists=False,
            )
            return
        self.source_metadata = SourceMetadata(
            exists=True,
        )
    @SourceConnectionError.wrap
    @requires_dependencies(["astrapy"], extras="astra")
    @BaseSingleIngestDoc.skip_if_file_exists
    def get_file(self):
        """Write the record's flattened values, newline-joined, to ``filename``."""
        self.filename.parent.mkdir(parents=True, exist_ok=True)
        # Flatten nested keys, stringify every value, and join with newlines so
        # the partitioner sees one value per line.
        flattened_dict = flatten_dict(dictionary=self.metadata)
        str_values = [str(value) for value in flattened_dict.values()]
        concatenated_values = "\n".join(str_values)
        with open(self.filename, "w") as f:
            f.write(concatenated_values)
@dataclass
class AstraSourceConnector(SourceConnectorCleanupMixin, BaseSourceConnector):
    """Source connector that reads every document from an AstraDB collection.

    The astrapy client and collection handle are created lazily on first
    access and cached for the connector's lifetime.
    """

    connector_config: SimpleAstraConfig
    # Lazily-built astrapy handles; populated by ``astra_db_collection``.
    _astra_db: t.Optional["AstraDB"] = field(init=False, default=None)
    _astra_db_collection: t.Optional["AstraDBCollection"] = field(init=False, default=None)
    @property
    @requires_dependencies(["astrapy"], extras="astra")
    def astra_db_collection(self) -> "AstraDBCollection":
        """Connect (once) and return the configured Astra collection handle."""
        if self._astra_db_collection is None:
            from astrapy.db import AstraDB

            # Build the Astra DB object.
            # caller_name/version for AstraDB tracking
            self._astra_db = AstraDB(
                api_endpoint=self.connector_config.access_config.api_endpoint,
                token=self.connector_config.access_config.token,
                namespace=self.connector_config.namespace,
                caller_name=integration_name,
                caller_version=integration_version,
            )
            # Create and connect to the collection
            self._astra_db_collection = self._astra_db.collection(
                collection_name=self.connector_config.collection_name,
            )
        return self._astra_db_collection  # type: ignore
    @requires_dependencies(["astrapy"], extras="astra")
    @SourceConnectionError.wrap  # type: ignore
    def initialize(self):
        """Eagerly establish the collection connection, failing fast on bad config."""
        _ = self.astra_db_collection
    @requires_dependencies(["astrapy"], extras="astra")
    def check_connection(self):
        """Validate connectivity to the collection.

        Raises:
            SourceConnectionError: if the collection cannot be reached.
        """
        try:
            _ = self.astra_db_collection
        except Exception as e:
            logger.error(f"Failed to validate connection {e}", exc_info=True)
            # Chain the original exception so the root cause survives in
            # tracebacks (previously raised without ``from e``).
            raise SourceConnectionError(f"failed to validate connection: {e}") from e
    @requires_dependencies(["astrapy"], extras="astra")
    def get_ingest_docs(self):  # type: ignore
        """Fetch all records via paginated find and wrap each as an AstraIngestDoc."""
        astra_docs = list(self.astra_db_collection.paginated_find())
        doc_list = []
        for record in astra_docs:
            doc = AstraIngestDoc(
                connector_config=self.connector_config,
                processor_config=self.processor_config,
                read_config=self.read_config,
                metadata=record,
            )
            doc.update_source_metadata()
            doc_list.append(doc)
        return doc_list
@dataclass
class AstraWriteConfig(WriteConfig):
batch_size: int = 20
@ -114,8 +229,8 @@ class AstraDestinationConnector(BaseDestinationConnector):
astra_batch_size = self.write_config.batch_size
for chunk in batch_generator(elements_dict, astra_batch_size):
self._astra_db_collection.insert_many(chunk)
for batch in batch_generator(elements_dict, astra_batch_size):
self._astra_db_collection.insert_many(batch)
def normalize_dict(self, element_dict: dict) -> dict:
return {

View File

@ -2,6 +2,7 @@ import json
from typing import Dict, Type, cast
from unstructured.ingest.connector.airtable import AirtableIngestDoc
from unstructured.ingest.connector.astra import AstraIngestDoc
from unstructured.ingest.connector.biomed import BiomedIngestDoc
from unstructured.ingest.connector.confluence import ConfluenceIngestDoc
from unstructured.ingest.connector.delta_table import DeltaTableIngestDoc
@ -45,6 +46,7 @@ from unstructured.ingest.interfaces import BaseIngestDoc
INGEST_DOC_NAME_TO_CLASS: Dict[str, Type[EnhancedDataClassJsonMixin]] = {
"airtable": AirtableIngestDoc,
"astra": AstraIngestDoc,
"azure": AzureBlobStorageIngestDoc,
"biomed": BiomedIngestDoc,
"box": BoxIngestDoc,

View File

@ -2,6 +2,7 @@ import typing as t
from typing import Type
from .airtable import AirtableRunner
from .astra import AstraRunner
from .base_runner import Runner
from .biomed import BiomedRunner
from .confluence import ConfluenceRunner
@ -35,6 +36,7 @@ from .wikipedia import WikipediaRunner
runner_map: t.Dict[str, Type[Runner]] = {
"airtable": AirtableRunner,
"astra": AstraRunner,
"azure": AzureRunner,
"biomed": BiomedRunner,
"box": BoxRunner,
@ -69,6 +71,7 @@ runner_map: t.Dict[str, Type[Runner]] = {
__all__ = [
"AirtableRunner",
"AstraRunner",
"AzureRunner",
"BiomedRunner",
"BoxRunner",

View File

@ -0,0 +1,34 @@
import hashlib
import typing as t
from dataclasses import dataclass
from unstructured.ingest.interfaces import BaseSourceConnector
from unstructured.ingest.logger import logger
from unstructured.ingest.runner.base_runner import Runner
from unstructured.ingest.runner.utils import update_download_dir_hash
if t.TYPE_CHECKING:
from unstructured.ingest.connector.astra import SimpleAstraConfig
@dataclass
class AstraRunner(Runner):
    """Runner that wires the Astra source connector into the ingest pipeline."""

    connector_config: "SimpleAstraConfig"

    def update_read_config(self):
        """Key the download directory off a hash of the configured API endpoint."""
        endpoint = str(self.connector_config.access_config.api_endpoint)
        endpoint_hash = hashlib.sha256(endpoint.encode("utf-8"))
        self.read_config.download_dir = update_download_dir_hash(
            connector_name="astra",
            read_config=self.read_config,
            hashed_dir_name=endpoint_hash,
            logger=logger,
        )

    def get_source_connector_cls(self) -> t.Type[BaseSourceConnector]:
        """Lazily import and return the Astra source connector class."""
        from unstructured.ingest.connector.astra import AstraSourceConnector

        return AstraSourceConnector