rfctr: chroma destination migrated to V2 (#3214)

Moving the Chroma destination to the V2 version of the connectors.
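In V2, a destination is assembled from a connection config, an upload stager config, and an uploader config, all added in this commit. A minimal sketch of how the new pieces fit together (the host, port, and collection name below are placeholder values, not part of this change):

from unstructured.ingest.v2.processes.connectors.chroma import (
    ChromaAccessConfig,
    ChromaConnectionConfig,
    ChromaUploaderConfig,
    ChromaUploadStagerConfig,
)

# Connection details for a running Chroma server; alternatively, set `path`
# to write to a local persistent instance instead of host/port.
connection_config = ChromaConnectionConfig(
    collection_name="my-collection",  # placeholder name
    access_config=ChromaAccessConfig(settings=None, headers=None),
    host="localhost",  # placeholder host
    port=8000,  # placeholder port
)
stager_config = ChromaUploadStagerConfig()
uploader_config = ChromaUploaderConfig(batch_size=100)

These plug into Pipeline.from_configs as destination_connection_config, stager_config, and uploader_config, exactly as the new example script below does.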
David Potter 2024-06-22 17:19:29 -07:00 committed by GitHub
parent 8610bd3ab9
commit 88b08a734d
8 changed files with 495 additions and 0 deletions


@@ -2,6 +2,7 @@ from collections import Counter
import click
from .chroma import chroma_dest_cmd
from .elasticsearch import elasticsearch_dest_cmd, elasticsearch_src_cmd
from .fsspec.azure import azure_dest_cmd, azure_src_cmd
from .fsspec.box import box_dest_cmd, box_src_cmd
@@ -35,6 +36,7 @@ if duplicate_src_names:
dest_cmds = [
azure_dest_cmd,
box_dest_cmd,
chroma_dest_cmd,
dropbox_dest_cmd,
elasticsearch_dest_cmd,
gcs_dest_cmd,


@@ -0,0 +1,108 @@
from dataclasses import dataclass
import click
from unstructured.ingest.cli.interfaces import Dict
from unstructured.ingest.v2.cli.base import DestCmd
from unstructured.ingest.v2.cli.interfaces import CliConfig
from unstructured.ingest.v2.processes.connectors.chroma import CONNECTOR_TYPE


@dataclass
class ChromaCliConnectionConfig(CliConfig):
@staticmethod
def get_cli_options() -> list[click.Option]:
options = [
click.Option(
["--path"],
required=False,
type=str,
help="Location where Chroma is persisted," "if not connecting via http.",
),
click.Option(
["--settings"],
required=False,
type=Dict(),
help="A dictionary of settings to communicate with the chroma server."
'example: \'{"persist_directory":"./chroma-persist"}\' ',
),
click.Option(
["--tenant"],
required=False,
default="default_tenant",
type=str,
help="The tenant to use for this client. Chroma defaults to 'default_tenant'.",
),
click.Option(
["--database"],
required=False,
default="default_database",
type=str,
help="The database to use for this client."
"Chroma defaults to 'default_database'.",
),
click.Option(
["--host"],
required=False,
type=str,
help="The hostname of the Chroma server.",
),
click.Option(
["--port"],
required=False,
type=int,
help="The port of the Chroma server.",
),
click.Option(
["--ssl"],
required=False,
default=False,
is_flag=True,
type=bool,
help="Whether to use SSL to connect to the Chroma server.",
),
click.Option(
["--headers"],
required=False,
type=Dict(),
help="A dictionary of headers to send to the Chroma server."
'example: \'{"Authorization":"Basic()"}\' ',
),
click.Option(
["--collection-name"],
required=True,
type=str,
help="The name of the Chroma collection to write into.",
),
]
        return options


@dataclass
class ChromaCliUploaderConfig(CliConfig):
@staticmethod
def get_cli_options() -> list[click.Option]:
options = [
click.Option(
["--batch-size"],
default=100,
type=int,
help="Number of records per batch",
)
]
        return options


@dataclass
class ChromaCliUploadStagerConfig(CliConfig):
@staticmethod
def get_cli_options() -> list[click.Option]:
        return []


chroma_dest_cmd = DestCmd(
cmd_name=CONNECTOR_TYPE,
connection_config=ChromaCliConnectionConfig,
uploader_config=ChromaCliUploaderConfig,
upload_stager_config=ChromaCliUploadStagerConfig,
)


@@ -0,0 +1,53 @@
import random
from pathlib import Path
from unstructured.ingest.v2.interfaces import ProcessorConfig
from unstructured.ingest.v2.logger import logger
from unstructured.ingest.v2.pipeline.pipeline import Pipeline
from unstructured.ingest.v2.processes.chunker import ChunkerConfig
from unstructured.ingest.v2.processes.connectors.chroma import (
ChromaAccessConfig,
ChromaConnectionConfig,
ChromaUploaderConfig,
ChromaUploadStagerConfig,
)
from unstructured.ingest.v2.processes.connectors.local import (
LocalConnectionConfig,
LocalDownloaderConfig,
LocalIndexerConfig,
)
from unstructured.ingest.v2.processes.embedder import EmbedderConfig
from unstructured.ingest.v2.processes.partitioner import PartitionerConfig

base_path = Path(__file__).parent.parent.parent.parent.parent
docs_path = base_path / "example-docs"
work_dir = base_path / "tmp_ingest"
output_path = work_dir / "output"
download_path = work_dir / "download"

if __name__ == "__main__":
logger.info(f"Writing all content in: {work_dir.resolve()}")
Pipeline.from_configs(
context=ProcessorConfig(work_dir=str(work_dir.resolve())),
indexer_config=LocalIndexerConfig(input_path=str(docs_path.resolve()) + "/multisimple/"),
downloader_config=LocalDownloaderConfig(download_dir=download_path),
source_connection_config=LocalConnectionConfig(),
partitioner_config=PartitionerConfig(strategy="fast"),
chunker_config=ChunkerConfig(
chunking_strategy="by_title",
chunk_include_orig_elements=False,
chunk_max_characters=1500,
chunk_multipage_sections=True,
),
embedder_config=EmbedderConfig(embedding_provider="langchain-huggingface"),
destination_connection_config=ChromaConnectionConfig(
access_config=ChromaAccessConfig(settings=None, headers=None),
host="localhost",
port=8047,
collection_name=f"test-collection-{random.randint(1000,9999)}",
tenant="default_tenant",
database="default_database",
),
stager_config=ChromaUploadStagerConfig(),
uploader_config=ChromaUploaderConfig(batch_size=10),
).run()


@@ -0,0 +1,35 @@
from pathlib import Path
from unstructured.ingest.v2.interfaces import ProcessorConfig
from unstructured.ingest.v2.logger import logger
from unstructured.ingest.v2.pipeline.pipeline import Pipeline
from unstructured.ingest.v2.processes.chunker import ChunkerConfig
from unstructured.ingest.v2.processes.connectors.local import (
LocalConnectionConfig,
LocalDownloaderConfig,
LocalIndexerConfig,
LocalUploaderConfig,
)
from unstructured.ingest.v2.processes.embedder import EmbedderConfig
from unstructured.ingest.v2.processes.partitioner import PartitionerConfig

base_path = Path(__file__).parent.parent.parent.parent.parent
docs_path = base_path / "example-docs"
work_dir = base_path / "tmp_ingest"
output_path = work_dir / "output"
download_path = work_dir / "download"

if __name__ == "__main__":
logger.info(f"Writing all content in: {work_dir.resolve()}")
Pipeline.from_configs(
context=ProcessorConfig(work_dir=str(work_dir.resolve())),
indexer_config=LocalIndexerConfig(
input_path=str(docs_path.resolve()) + "/book-war-and-peace-1p.txt"
),
downloader_config=LocalDownloaderConfig(download_dir=download_path),
source_connection_config=LocalConnectionConfig(),
partitioner_config=PartitionerConfig(strategy="fast"),
chunker_config=ChunkerConfig(chunking_strategy="by_title"),
embedder_config=EmbedderConfig(embedding_provider="langchain-huggingface"),
uploader_config=LocalUploaderConfig(output_dir=str(output_path.resolve())),
).run()


@@ -0,0 +1,35 @@
from pathlib import Path
from unstructured.ingest.v2.interfaces import ProcessorConfig
from unstructured.ingest.v2.logger import logger
from unstructured.ingest.v2.pipeline.pipeline import Pipeline
from unstructured.ingest.v2.processes.chunker import ChunkerConfig
from unstructured.ingest.v2.processes.connectors.fsspec.s3 import (
S3ConnectionConfig,
S3DownloaderConfig,
S3IndexerConfig,
)
from unstructured.ingest.v2.processes.connectors.local import (
LocalUploaderConfig,
)
from unstructured.ingest.v2.processes.embedder import EmbedderConfig
from unstructured.ingest.v2.processes.partitioner import PartitionerConfig

base_path = Path(__file__).parent.parent.parent.parent.parent
docs_path = base_path / "example-docs"
work_dir = base_path / "tmp_ingest"
output_path = work_dir / "output"
download_path = work_dir / "download"

if __name__ == "__main__":
logger.info(f"Writing all content in: {work_dir.resolve()}")
Pipeline.from_configs(
context=ProcessorConfig(work_dir=str(work_dir.resolve())),
indexer_config=S3IndexerConfig(remote_url="s3://utic-dev-tech-fixtures/small-pdf-set/"),
downloader_config=S3DownloaderConfig(download_dir=download_path),
source_connection_config=S3ConnectionConfig(anonymous=True),
partitioner_config=PartitionerConfig(strategy="fast"),
chunker_config=ChunkerConfig(chunking_strategy="by_title"),
embedder_config=EmbedderConfig(embedding_provider="langchain-huggingface"),
uploader_config=LocalUploaderConfig(output_dir=str(output_path.resolve())),
).run()


@@ -0,0 +1,44 @@
from pathlib import Path
from unstructured.ingest.v2.interfaces import ProcessorConfig
from unstructured.ingest.v2.logger import logger
from unstructured.ingest.v2.pipeline.pipeline import Pipeline
from unstructured.ingest.v2.processes.chunker import ChunkerConfig
from unstructured.ingest.v2.processes.connectors.local import (
LocalConnectionConfig,
LocalDownloaderConfig,
LocalIndexerConfig,
)
from unstructured.ingest.v2.processes.connectors.weaviate import (
WeaviateConnectionConfig,
WeaviateUploaderConfig,
WeaviateUploadStagerConfig,
)
from unstructured.ingest.v2.processes.embedder import EmbedderConfig
from unstructured.ingest.v2.processes.partitioner import PartitionerConfig

base_path = Path(__file__).parent.parent.parent.parent.parent
docs_path = base_path / "example-docs"
work_dir = base_path / "tmp_ingest"
output_path = work_dir / "output"
download_path = work_dir / "download"

if __name__ == "__main__":
logger.info(f"Writing all content in: {work_dir.resolve()}")
Pipeline.from_configs(
context=ProcessorConfig(work_dir=str(work_dir.resolve())),
indexer_config=LocalIndexerConfig(input_path=str(docs_path.resolve()) + "/multisimple/"),
downloader_config=LocalDownloaderConfig(download_dir=download_path),
source_connection_config=LocalConnectionConfig(),
partitioner_config=PartitionerConfig(strategy="fast"),
chunker_config=ChunkerConfig(chunking_strategy="by_title"),
embedder_config=EmbedderConfig(embedding_provider="langchain-huggingface"),
destination_connection_config=WeaviateConnectionConfig(
host_url="http://localhost:8080",
class_name="elements",
access_config=None,
anonymous=True,
),
stager_config=WeaviateUploadStagerConfig(),
uploader_config=WeaviateUploaderConfig(batch_size=10),
).run()


@@ -0,0 +1,217 @@
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional
from dateutil import parser
from unstructured.ingest.enhanced_dataclass import enhanced_field
from unstructured.ingest.error import DestinationConnectionError
from unstructured.ingest.utils.data_prep import chunk_generator
from unstructured.ingest.v2.interfaces import (
AccessConfig,
ConnectionConfig,
FileData,
UploadContent,
Uploader,
UploaderConfig,
UploadStager,
UploadStagerConfig,
)
from unstructured.ingest.v2.logger import logger
from unstructured.ingest.v2.processes.connector_registry import (
DestinationRegistryEntry,
add_destination_entry,
)
from unstructured.staging.base import flatten_dict
from unstructured.utils import requires_dependencies

if TYPE_CHECKING:
    from chromadb import Client

CONNECTOR_TYPE = "chroma"


@dataclass
class ChromaAccessConfig(AccessConfig):
settings: Optional[Dict[str, str]] = None
    headers: Optional[Dict[str, str]] = None


@dataclass
class ChromaConnectionConfig(ConnectionConfig):
collection_name: str
access_config: ChromaAccessConfig = enhanced_field(sensitive=True)
path: Optional[str] = None
tenant: Optional[str] = "default_tenant"
database: Optional[str] = "default_database"
host: Optional[str] = None
port: Optional[int] = None
ssl: bool = False
    connector_type: str = CONNECTOR_TYPE


@dataclass
class ChromaUploadStagerConfig(UploadStagerConfig):
    pass


@dataclass
class ChromaUploadStager(UploadStager):
    upload_stager_config: ChromaUploadStagerConfig = field(
        default_factory=lambda: ChromaUploadStagerConfig()
    )

    @staticmethod
    def parse_date_string(date_string: str) -> datetime:
        try:
            timestamp = float(date_string)
            return datetime.fromtimestamp(timestamp)
        except Exception as e:
            logger.debug(f"date string {date_string} is not a timestamp: {e}")
        return parser.parse(date_string)

    @classmethod
    def conform_dict(cls, data: dict) -> dict:
"""
Prepares dictionary in the format that Chroma requires
"""
element_id = data.get("element_id", str(uuid.uuid4()))
return {
"id": element_id,
"embedding": data.pop("embeddings", None),
"document": data.pop("text", None),
"metadata": flatten_dict(data, separator="-", flatten_lists=True, remove_none=True),
        }

    def run(
self,
elements_filepath: Path,
file_data: FileData,
output_dir: Path,
output_filename: str,
**kwargs: Any,
) -> Path:
with open(elements_filepath) as elements_file:
elements_contents = json.load(elements_file)
conformed_elements = []
for element in elements_contents:
conformed_elements.append(self.conform_dict(data=element))
output_path = Path(output_dir) / Path(f"{output_filename}.json")
with open(output_path, "w") as output_file:
json.dump(conformed_elements, output_file)
        return output_path


@dataclass
class ChromaUploaderConfig(UploaderConfig):
    batch_size: int = 100


@dataclass
class ChromaUploader(Uploader):
upload_config: ChromaUploaderConfig
connection_config: ChromaConnectionConfig
    client: Optional["Client"] = field(init=False)

    def __post_init__(self):
        self.client = self.create_client()

    @requires_dependencies(["chromadb"], extras="chroma")
def create_client(self) -> "Client":
import chromadb
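        # Two connection modes, both part of the chromadb API: PersistentClient
        # writes to the local directory given by `path`, while HttpClient talks
        # to a running Chroma server over host/port (optionally with SSL and
        # custom headers).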
if self.connection_config.path:
return chromadb.PersistentClient(
path=self.connection_config.path,
settings=self.connection_config.access_config.settings,
tenant=self.connection_config.tenant,
database=self.connection_config.database,
)
elif self.connection_config.host and self.connection_config.port:
return chromadb.HttpClient(
host=self.connection_config.host,
port=self.connection_config.port,
ssl=self.connection_config.ssl,
headers=self.connection_config.access_config.headers,
settings=self.connection_config.access_config.settings,
tenant=self.connection_config.tenant,
database=self.connection_config.database,
)
else:
raise ValueError("Chroma connector requires either path or host and port to be set.")
@DestinationConnectionError.wrap
def upsert_batch(self, collection, batch):
try:
# Chroma wants lists even if there is only one element
# Upserting to prevent duplicates
collection.upsert(
ids=batch["ids"],
documents=batch["documents"],
embeddings=batch["embeddings"],
metadatas=batch["metadatas"],
)
except Exception as e:
raise ValueError(f"chroma error: {e}") from e
@staticmethod
def prepare_chroma_list(chunk: t.Tuple[t.Dict[str, t.Any]]) -> t.Dict[str, t.List[t.Any]]:
"""Helper function to break a tuple of dicts into list of parallel lists for ChromaDb.
({'id':1}, {'id':2}, {'id':3}) -> {'ids':[1,2,3]}"""
chroma_dict = {}
chroma_dict["ids"] = [x.get("id") for x in chunk]
chroma_dict["documents"] = [x.get("document") for x in chunk]
chroma_dict["embeddings"] = [x.get("embedding") for x in chunk]
chroma_dict["metadatas"] = [x.get("metadata") for x in chunk]
# Make sure all lists are of the same length
assert (
len(chroma_dict["ids"])
== len(chroma_dict["documents"])
== len(chroma_dict["embeddings"])
== len(chroma_dict["metadatas"])
)
        return chroma_dict

    def run(self, contents: list[UploadContent], **kwargs: Any) -> None:
elements_dict = []
for content in contents:
with open(content.path) as elements_file:
elements = json.load(elements_file)
elements_dict.extend(elements)
        logger.info(
            f"writing {len(elements_dict)} objects to destination "
            f"collection {self.connection_config.collection_name} "
            f"at {self.connection_config.host}",
        )
collection = self.client.get_or_create_collection(
name=self.connection_config.collection_name
)
for chunk in chunk_generator(elements_dict, self.upload_config.batch_size):
            self.upsert_batch(collection, self.prepare_chroma_list(chunk))


add_destination_entry(
destination_type=CONNECTOR_TYPE,
entry=DestinationRegistryEntry(
connection_config=ChromaConnectionConfig,
uploader=ChromaUploader,
uploader_config=ChromaUploaderConfig,
upload_stager=ChromaUploadStager,
upload_stager_config=ChromaUploadStagerConfig,
),
)


@@ -193,6 +193,7 @@ add_source_entry(
indexer_config=LocalIndexerConfig,
downloader=LocalDownloader,
downloader_config=LocalDownloaderConfig,
connection_config=LocalConnectionConfig,
),
)