refactor: implement databricks volumes v2 dest connector (#3334)

Nathan Van Gheem 2024-07-03 14:01:16 -05:00 committed by GitHub
parent 493bfccddd
commit 6e4d9ccd5b
11 changed files with 468 additions and 2 deletions

View File

@@ -412,6 +412,10 @@ jobs:
ASTRA_DB_TOKEN: ${{secrets.ASTRA_DB_TOKEN}}
ASTRA_DB_ENDPOINT: ${{secrets.ASTRA_DB_ENDPOINT}}
CLARIFAI_API_KEY: ${{secrets.CLARIFAI_API_KEY}}
DATABRICKS_HOST: ${{secrets.DATABRICKS_HOST}}
DATABRICKS_USERNAME: ${{secrets.DATABRICKS_USERNAME}}
DATABRICKS_PASSWORD: ${{secrets.DATABRICKS_PASSWORD}}
DATABRICKS_CATALOG: ${{secrets.DATABRICKS_CATALOG}}
TABLE_OCR: "tesseract"
OCR_AGENT: "unstructured.partition.utils.ocr_models.tesseract_ocr.OCRAgentTesseract"
CI: "true"

View File

@@ -1,4 +1,4 @@
-## 0.14.10-dev8
+## 0.14.10-dev9
### Enhancements

View File

@@ -0,0 +1,63 @@
#!/usr/bin/env bash
set -e
SRC_PATH=$(dirname "$(realpath "$0")")
SCRIPT_DIR=$(dirname "$SRC_PATH")
cd "$SCRIPT_DIR"/.. || exit 1
OUTPUT_FOLDER_NAME=databricks-volumes
OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME
WORK_DIR=$SCRIPT_DIR/workdir/$OUTPUT_FOLDER_NAME
DOWNLOAD_DIR=$SCRIPT_DIR/download/$OUTPUT_FOLDER_NAME
DESTINATION_PATH=$SCRIPT_DIR/databricks-volumes
CI=${CI:-"false"}
RANDOM_SUFFIX=$((RANDOM % 100000 + 1))
DATABRICKS_VOLUME="test-platform"
DATABRICKS_VOLUME_PATH="databricks-volumes-test-output-$RANDOM_SUFFIX"
# shellcheck disable=SC1091
source "$SCRIPT_DIR"/cleanup.sh
function cleanup() {
python "$SCRIPT_DIR"/python/test-databricks-volumes.py cleanup \
--host "$DATABRICKS_HOST" \
--username "$DATABRICKS_USERNAME" \
--password "$DATABRICKS_PASSWORD" \
--volume "$DATABRICKS_VOLUME" \
--catalog "$DATABRICKS_CATALOG" \
--volume-path "$DATABRICKS_VOLUME_PATH"
cleanup_dir "$DESTINATION_PATH"
cleanup_dir "$OUTPUT_DIR"
cleanup_dir "$WORK_DIR"
if [ "$CI" == "true" ]; then
cleanup_dir "$DOWNLOAD_DIR"
fi
}
trap cleanup EXIT
PYTHONPATH=. ./unstructured/ingest/main.py \
local \
--output-dir "$OUTPUT_DIR" \
--strategy fast \
--verbose \
--input-path example-docs/fake-memo.pdf \
--work-dir "$WORK_DIR" \
databricks-volumes \
--host "$DATABRICKS_HOST" \
--username "$DATABRICKS_USERNAME" \
--password "$DATABRICKS_PASSWORD" \
--volume "$DATABRICKS_VOLUME" \
--catalog "$DATABRICKS_CATALOG" \
--volume-path "$DATABRICKS_VOLUME_PATH"
python "$SCRIPT_DIR"/python/test-databricks-volumes.py test \
--host "$DATABRICKS_HOST" \
--username "$DATABRICKS_USERNAME" \
--password "$DATABRICKS_PASSWORD" \
--volume "$DATABRICKS_VOLUME" \
--catalog "$DATABRICKS_CATALOG" \
--volume-path "$DATABRICKS_VOLUME_PATH"
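
The script above uploads a single partitioned document (example-docs/fake-memo.pdf) into a randomized sub-path of the test-platform volume and then hands verification and cleanup to the Python helper below. A hedged sketch of the Unity Catalog path the output is expected to land under, assuming the helper's hard-coded "default" schema and the usual one-JSON-per-input naming:

import os

catalog = os.environ["DATABRICKS_CATALOG"]
volume = "test-platform"
volume_path = "databricks-volumes-test-output-12345"  # RANDOM_SUFFIX varies per run
# Output file name is an assumption: the ingest pipeline writes one JSON per input document.
expected = f"/Volumes/{catalog}/default/{volume}/{volume_path}/fake-memo.pdf.json"
print(expected)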

View File

@@ -0,0 +1,79 @@
#!/usr/bin/env python
import json
import click
from databricks.sdk import WorkspaceClient
@click.group()
def cli():
pass
def _get_volume_path(catalog: str, volume: str, volume_path: str):
return f"/Volumes/{catalog}/default/{volume}/{volume_path}"
@cli.command()
@click.option("--host", type=str, required=True)
@click.option("--username", type=str, required=True)
@click.option("--password", type=str, required=True)
@click.option("--catalog", type=str, required=True)
@click.option("--volume", type=str, required=True)
@click.option("--volume-path", type=str, required=True)
def test(
host: str,
username: str,
password: str,
catalog: str,
volume: str,
volume_path: str,
):
client = WorkspaceClient(host=host, username=username, password=password)
files = list(
client.files.list_directory_contents(_get_volume_path(catalog, volume, volume_path))
)
assert len(files) == 1
resp = client.files.download(files[0].path)
data = json.loads(resp.contents.read())
assert len(data) == 5
assert [v["type"] for v in data] == [
"UncategorizedText",
"Title",
"NarrativeText",
"UncategorizedText",
"Title",
]
print("Databricks test passed!")
@cli.command()
@click.option("--host", type=str, required=True)
@click.option("--username", type=str, required=True)
@click.option("--password", type=str, required=True)
@click.option("--catalog", type=str, required=True)
@click.option("--volume", type=str, required=True)
@click.option("--volume-path", type=str, required=True)
def cleanup(
host: str,
username: str,
password: str,
catalog: str,
volume: str,
volume_path: str,
):
client = WorkspaceClient(host=host, username=username, password=password)
for file in client.files.list_directory_contents(
_get_volume_path(catalog, volume, volume_path)
):
client.files.delete(file.path)
client.files.delete_directory(_get_volume_path(catalog, volume, volume_path))
if __name__ == "__main__":
cli()

View File

@@ -37,6 +37,7 @@ all_tests=(
'vectara.sh'
'singlestore.sh'
'weaviate.sh'
'databricks-volumes.sh'
)
full_python_matrix_tests=(

View File

@@ -1 +1 @@
__version__ = "0.14.10-dev8" # pragma: no cover
__version__ = "0.14.10-dev9" # pragma: no cover

View File

@@ -4,6 +4,7 @@ import click
from .astra import astra_dest_cmd
from .chroma import chroma_dest_cmd
from .databricks_volumes import databricks_volumes_dest_cmd
from .elasticsearch import elasticsearch_dest_cmd, elasticsearch_src_cmd
from .fsspec.azure import azure_dest_cmd, azure_src_cmd
from .fsspec.box import box_dest_cmd, box_src_cmd
@@ -59,6 +60,7 @@ dest_cmds = [
singlestore_dest_cmd,
weaviate_dest_cmd,
mongodb_dest_cmd,
databricks_volumes_dest_cmd,
]
duplicate_dest_names = [

View File

@@ -0,0 +1,161 @@
from dataclasses import dataclass
import click
from unstructured.ingest.v2.cli.base import DestCmd
from unstructured.ingest.v2.cli.interfaces import CliConfig
from unstructured.ingest.v2.processes.connectors.databricks_volumes import CONNECTOR_TYPE
@dataclass
class DatabricksVolumesCliConnectionConfig(CliConfig):
@staticmethod
def get_cli_options() -> list[click.Option]:
options = [
click.Option(
["--host"],
type=str,
default=None,
help="The Databricks host URL for either the "
"Databricks workspace endpoint or the "
"Databricks accounts endpoint.",
),
click.Option(
["--account-id"],
type=str,
default=None,
help="The Databricks account ID for the Databricks "
"accounts endpoint. Only has effect when Host is "
"either https://accounts.cloud.databricks.com/ (AWS), "
"https://accounts.azuredatabricks.net/ (Azure), "
"or https://accounts.gcp.databricks.com/ (GCP).",
),
click.Option(
["--username"],
type=str,
default=None,
help="The Databricks username part of basic authentication. "
"Only possible when Host is *.cloud.databricks.com (AWS).",
),
click.Option(
["--password"],
type=str,
default=None,
help="The Databricks password part of basic authentication. "
"Only possible when Host is *.cloud.databricks.com (AWS).",
),
click.Option(["--client-id"], type=str, default=None),
click.Option(["--client-secret"], type=str, default=None),
click.Option(
["--token"],
type=str,
default=None,
help="The Databricks personal access token (PAT) (AWS, Azure, and GCP) or "
"Azure Active Directory (Azure AD) token (Azure).",
),
click.Option(
["--azure-workspace-resource-id"],
type=str,
default=None,
help="The Azure Resource Manager ID for the Azure Databricks workspace, "
"which is exchanged for a Databricks host URL.",
),
click.Option(
["--azure-client-secret"],
type=str,
default=None,
help="The Azure AD service principals client secret.",
),
click.Option(
["--azure-client-id"],
type=str,
default=None,
help="The Azure AD service principals application ID.",
),
click.Option(
["--azure-tenant-id"],
type=str,
default=None,
help="The Azure AD service principals tenant ID.",
),
click.Option(
["--azure-environment"],
type=str,
default=None,
help="The Azure environment type (such as Public, UsGov, China, and Germany) for a "
"specific set of API endpoints. Defaults to PUBLIC.",
),
click.Option(
["--auth-type"],
type=str,
default=None,
help="When multiple auth attributes are available in the "
"environment, use the auth type specified by this "
"argument. This argument also holds the currently "
"selected auth.",
),
click.Option(["--cluster-id"], type=str, default=None),
click.Option(["--google-credentials"], type=str, default=None),
click.Option(["--google-service-account"], type=str, default=None),
]
return options
@dataclass
class DatabricksVolumesCliUploaderConfig(CliConfig):
@staticmethod
def get_cli_options() -> list[click.Option]:
options = [
click.Option(
["--volume"], type=str, required=True, help="Name of volume in the Unity Catalog"
),
click.Option(
["--catalog"],
type=str,
required=True,
help="Name of the catalog in the Databricks Unity Catalog service",
),
click.Option(
["--volume-path"],
type=str,
required=False,
default=None,
help="Optional path within the volume to write to",
),
click.Option(
["--overwrite"],
type=bool,
is_flag=True,
help="If true, an existing file will be overwritten.",
),
click.Option(
["--encoding"],
type=str,
required=True,
default="utf-8",
help="Encoding applied to the data when written to the volume",
),
click.Option(
["--schema"],
type=str,
required=True,
default="default",
help="Schema associated with the volume to write to in the Unity Catalog service",
),
]
return options
@dataclass
class DatabricksVolumesCliUploadStagerConfig(CliConfig):
@staticmethod
def get_cli_options() -> list[click.Option]:
return []
databricks_volumes_dest_cmd = DestCmd(
cmd_name=CONNECTOR_TYPE,
connection_config=DatabricksVolumesCliConnectionConfig,
uploader_config=DatabricksVolumesCliUploaderConfig,
upload_stager_config=DatabricksVolumesCliUploadStagerConfig,
)
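
A small sanity-check sketch (not in this commit) for the option sets defined above; the module path is assumed from the relative import shown in the cmds __init__ earlier in this diff:

from unstructured.ingest.v2.cli.cmds.databricks_volumes import (
    DatabricksVolumesCliConnectionConfig,
    DatabricksVolumesCliUploaderConfig,
)

options = (
    DatabricksVolumesCliConnectionConfig.get_cli_options()
    + DatabricksVolumesCliUploaderConfig.get_cli_options()
)
# click.Option exposes its flag strings via .opts
print(sorted(opt.opts[0] for opt in options))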

View File

@@ -0,0 +1,54 @@
import os
from pathlib import Path
from unstructured.ingest.v2.interfaces import ProcessorConfig
from unstructured.ingest.v2.logger import logger
from unstructured.ingest.v2.pipeline.pipeline import Pipeline
from unstructured.ingest.v2.processes.chunker import ChunkerConfig
from unstructured.ingest.v2.processes.connectors.databricks_volumes import (
DatabricksVolumesAccessConfig,
DatabricksVolumesConnectionConfig,
DatabricksVolumesUploaderConfig,
)
from unstructured.ingest.v2.processes.connectors.local import (
LocalConnectionConfig,
LocalDownloaderConfig,
LocalIndexerConfig,
)
from unstructured.ingest.v2.processes.embedder import EmbedderConfig
from unstructured.ingest.v2.processes.partitioner import PartitionerConfig
base_path = Path(__file__).parent.parent.parent.parent.parent
docs_path = base_path / "example-docs"
work_dir = base_path / "tmp_ingest"
output_path = work_dir / "output"
download_path = work_dir / "download"
if __name__ == "__main__":
logger.info(f"Writing all content in: {work_dir.resolve()}")
Pipeline.from_configs(
context=ProcessorConfig(work_dir=str(work_dir.resolve())),
indexer_config=LocalIndexerConfig(input_path=str(docs_path.resolve()) + "/multisimple/"),
downloader_config=LocalDownloaderConfig(download_dir=download_path),
source_connection_config=LocalConnectionConfig(),
partitioner_config=PartitionerConfig(strategy="fast"),
chunker_config=ChunkerConfig(
chunking_strategy="by_title",
chunk_include_orig_elements=False,
chunk_max_characters=1500,
chunk_multipage_sections=True,
),
embedder_config=EmbedderConfig(embedding_provider="langchain-huggingface"),
destination_connection_config=DatabricksVolumesConnectionConfig(
access_config=DatabricksVolumesAccessConfig(
username=os.environ["DATABRICKS_USERNAME"],
password=os.environ["DATABRICKS_PASSWORD"],
),
host=os.environ["DATABRICKS_HOST"],
),
uploader_config=DatabricksVolumesUploaderConfig(
catalog=os.environ["DATABRICKS_CATALOG"],
volume=os.environ["DATABRICKS_VOLUME"],
volume_path=os.environ["DATABRICKS_VOLUME_PATH"],
),
).run()

View File

@@ -10,6 +10,8 @@ from .astra import CONNECTOR_TYPE as ASTRA_CONNECTOR_TYPE
from .astra import astra_destination_entry
from .chroma import CONNECTOR_TYPE as CHROMA_CONNECTOR_TYPE
from .chroma import chroma_destination_entry
from .databricks_volumes import CONNECTOR_TYPE as DATABRICKS_VOLUMES_CONNECTOR_TYPE
from .databricks_volumes import databricks_volumes_destination_entry
from .elasticsearch import CONNECTOR_TYPE as ELASTICSEARCH_CONNECTOR_TYPE
from .elasticsearch import elasticsearch_destination_entry, elasticsearch_source_entry
from .google_drive import CONNECTOR_TYPE as GOOGLE_DRIVE_CONNECTOR_TYPE
@@ -45,3 +47,7 @@ add_destination_entry(
)
add_destination_entry(destination_type=WEAVIATE_CONNECTOR_TYPE, entry=weaviate_destination_entry)
add_destination_entry(
destination_type=DATABRICKS_VOLUMES_CONNECTOR_TYPE, entry=databricks_volumes_destination_entry
)
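
The registration key is the connector's CONNECTOR_TYPE string, so the new destination is addressable as "databricks_volumes" throughout the v2 pipeline and CLI. A trivial check, for illustration only:

from unstructured.ingest.v2.processes.connectors.databricks_volumes import CONNECTOR_TYPE

assert CONNECTOR_TYPE == "databricks_volumes"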

View File

@@ -0,0 +1,96 @@
import os
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Optional
from unstructured.ingest.enhanced_dataclass import enhanced_field
from unstructured.ingest.v2.interfaces import (
AccessConfig,
ConnectionConfig,
UploadContent,
Uploader,
UploaderConfig,
)
from unstructured.ingest.v2.processes.connector_registry import DestinationRegistryEntry
from unstructured.utils import requires_dependencies
if TYPE_CHECKING:
from databricks.sdk import WorkspaceClient
CONNECTOR_TYPE = "databricks_volumes"
@dataclass
class DatabricksVolumesAccessConfig(AccessConfig):
account_id: Optional[str] = None
username: Optional[str] = None
password: Optional[str] = None
client_id: Optional[str] = None
client_secret: Optional[str] = None
token: Optional[str] = None
profile: Optional[str] = None
azure_workspace_resource_id: Optional[str] = None
azure_client_secret: Optional[str] = None
azure_client_id: Optional[str] = None
azure_tenant_id: Optional[str] = None
azure_environment: Optional[str] = None
auth_type: Optional[str] = None
cluster_id: Optional[str] = None
google_credentials: Optional[str] = None
google_service_account: Optional[str] = None
@dataclass
class DatabricksVolumesConnectionConfig(ConnectionConfig):
access_config: DatabricksVolumesAccessConfig = enhanced_field(
default_factory=DatabricksVolumesAccessConfig, sensitive=True
)
host: Optional[str] = None
@dataclass
class DatabricksVolumesUploaderConfig(UploaderConfig):
volume: str
catalog: str
volume_path: Optional[str] = None
overwrite: bool = False
schema: str = "default"
@property
def path(self) -> str:
path = f"/Volumes/{self.catalog}/{self.schema}/{self.volume}"
if self.volume_path:
path = f"{path}/{self.volume_path}"
return path
@dataclass
class DatabricksVolumesUploader(Uploader):
connector_type: str = CONNECTOR_TYPE
upload_config: DatabricksVolumesUploaderConfig
connection_config: DatabricksVolumesConnectionConfig
client: Optional["WorkspaceClient"] = field(init=False, default=None)
@requires_dependencies(dependencies=["databricks.sdk"], extras="databricks-volumes")
def __post_init__(self) -> None:
from databricks.sdk import WorkspaceClient
self.client = WorkspaceClient(
host=self.connection_config.host, **self.connection_config.access_config.to_dict()
)
def run(self, contents: list[UploadContent], **kwargs: Any) -> None:
for content in contents:
with open(content.path, "rb") as elements_file:
output_path = os.path.join(self.upload_config.path, content.path.name)
self.client.files.upload(
file_path=output_path,
contents=elements_file,
overwrite=self.upload_config.overwrite,
)
databricks_volumes_destination_entry = DestinationRegistryEntry(
connection_config=DatabricksVolumesConnectionConfig,
uploader=DatabricksVolumesUploader,
uploader_config=DatabricksVolumesUploaderConfig,
)
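
A minimal sketch (not part of the commit) of how the uploader config resolves its target path, using illustrative values and assuming the base UploaderConfig requires no extra fields:

from unstructured.ingest.v2.processes.connectors.databricks_volumes import (
    DatabricksVolumesUploaderConfig,
)

config = DatabricksVolumesUploaderConfig(
    volume="test-platform",
    catalog="my-catalog",          # hypothetical catalog name
    volume_path="ingest-output",   # optional sub-path within the volume
)
print(config.path)  # -> /Volumes/my-catalog/default/test-platform/ingest-output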