diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 9cdaf08ab..4864fed11 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -412,6 +412,10 @@ jobs:
           ASTRA_DB_TOKEN: ${{secrets.ASTRA_DB_TOKEN}}
           ASTRA_DB_ENDPOINT: ${{secrets.ASTRA_DB_ENDPOINT}}
           CLARIFAI_API_KEY: ${{secrets.CLARIFAI_API_KEY}}
+          DATABRICKS_HOST: ${{secrets.DATABRICKS_HOST}}
+          DATABRICKS_USERNAME: ${{secrets.DATABRICKS_USERNAME}}
+          DATABRICKS_PASSWORD: ${{secrets.DATABRICKS_PASSWORD}}
+          DATABRICKS_CATALOG: ${{secrets.DATABRICKS_CATALOG}}
           TABLE_OCR: "tesseract"
           OCR_AGENT: "unstructured.partition.utils.ocr_models.tesseract_ocr.OCRAgentTesseract"
           CI: "true"
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 256798f32..df664faaa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,4 @@
-## 0.14.10-dev8
+## 0.14.10-dev9
 
 ### Enhancements
 
diff --git a/test_unstructured_ingest/dest/databricks-volumes.sh b/test_unstructured_ingest/dest/databricks-volumes.sh
new file mode 100755
index 000000000..c97289c5f
--- /dev/null
+++ b/test_unstructured_ingest/dest/databricks-volumes.sh
@@ -0,0 +1,63 @@
+#!/usr/bin/env bash
+
+set -e
+
+SRC_PATH=$(dirname "$(realpath "$0")")
+SCRIPT_DIR=$(dirname "$SRC_PATH")
+cd "$SCRIPT_DIR"/.. || exit 1
+OUTPUT_FOLDER_NAME=databricks-volumes
+OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME
+WORK_DIR=$SCRIPT_DIR/workdir/$OUTPUT_FOLDER_NAME
+DOWNLOAD_DIR=$SCRIPT_DIR/download/$OUTPUT_FOLDER_NAME
+DESTINATION_PATH=$SCRIPT_DIR/databricks-volumes
+CI=${CI:-"false"}
+
+RANDOM_SUFFIX=$((RANDOM % 100000 + 1))
+
+DATABRICKS_VOLUME="test-platform"
+DATABRICKS_VOLUME_PATH="databricks-volumes-test-output-$RANDOM_SUFFIX"
+
+# shellcheck disable=SC1091
+source "$SCRIPT_DIR"/cleanup.sh
+
+function cleanup() {
+  python "$SCRIPT_DIR"/python/test-databricks-volumes.py cleanup \
+    --host "$DATABRICKS_HOST" \
+    --username "$DATABRICKS_USERNAME" \
+    --password "$DATABRICKS_PASSWORD" \
+    --volume "$DATABRICKS_VOLUME" \
+    --catalog "$DATABRICKS_CATALOG" \
+    --volume-path "$DATABRICKS_VOLUME_PATH"
+
+  cleanup_dir "$DESTINATION_PATH"
+  cleanup_dir "$OUTPUT_DIR"
+  cleanup_dir "$WORK_DIR"
+  if [ "$CI" == "true" ]; then
+    cleanup_dir "$DOWNLOAD_DIR"
+  fi
+}
+
+trap cleanup EXIT
+
+PYTHONPATH=. ./unstructured/ingest/main.py \
+  local \
+  --output-dir "$OUTPUT_DIR" \
+  --strategy fast \
+  --verbose \
+  --input-path example-docs/fake-memo.pdf \
+  --work-dir "$WORK_DIR" \
+  databricks-volumes \
+  --host "$DATABRICKS_HOST" \
+  --username "$DATABRICKS_USERNAME" \
+  --password "$DATABRICKS_PASSWORD" \
+  --volume "$DATABRICKS_VOLUME" \
+  --catalog "$DATABRICKS_CATALOG" \
+  --volume-path "$DATABRICKS_VOLUME_PATH"
+
+python "$SCRIPT_DIR"/python/test-databricks-volumes.py test \
+  --host "$DATABRICKS_HOST" \
+  --username "$DATABRICKS_USERNAME" \
+  --password "$DATABRICKS_PASSWORD" \
+  --volume "$DATABRICKS_VOLUME" \
+  --catalog "$DATABRICKS_CATALOG" \
+  --volume-path "$DATABRICKS_VOLUME_PATH"
diff --git a/test_unstructured_ingest/python/test-databricks-volumes.py b/test_unstructured_ingest/python/test-databricks-volumes.py
new file mode 100644
index 000000000..427799c3f
--- /dev/null
+++ b/test_unstructured_ingest/python/test-databricks-volumes.py
@@ -0,0 +1,79 @@
+#!/usr/bin/env python
+import json
+
+import click
+from databricks.sdk import WorkspaceClient
+
+
+@click.group()
+def cli():
+    pass
+
+
+def _get_volume_path(catalog: str, volume: str, volume_path: str):
+    return f"/Volumes/{catalog}/default/{volume}/{volume_path}"
+
+
+@cli.command()
+@click.option("--host", type=str, required=True)
+@click.option("--username", type=str, required=True)
+@click.option("--password", type=str, required=True)
+@click.option("--catalog", type=str, required=True)
+@click.option("--volume", type=str, required=True)
+@click.option("--volume-path", type=str, required=True)
+def test(
+    host: str,
+    username: str,
+    password: str,
+    catalog: str,
+    volume: str,
+    volume_path: str,
+):
+    client = WorkspaceClient(host=host, username=username, password=password)
+    files = list(
+        client.files.list_directory_contents(_get_volume_path(catalog, volume, volume_path))
+    )
+
+    assert len(files) == 1
+
+    resp = client.files.download(files[0].path)
+    data = json.loads(resp.contents.read())
+
+    assert len(data) == 5
+    assert [v["type"] for v in data] == [
+        "UncategorizedText",
+        "Title",
+        "NarrativeText",
+        "UncategorizedText",
+        "Title",
+    ]
+
+    print("Databricks test passed!")
+
+
+@cli.command()
+@click.option("--host", type=str, required=True)
+@click.option("--username", type=str, required=True)
+@click.option("--password", type=str, required=True)
+@click.option("--catalog", type=str, required=True)
+@click.option("--volume", type=str, required=True)
+@click.option("--volume-path", type=str, required=True)
+def cleanup(
+    host: str,
+    username: str,
+    password: str,
+    catalog: str,
+    volume: str,
+    volume_path: str,
+):
+    client = WorkspaceClient(host=host, username=username, password=password)
+
+    for file in client.files.list_directory_contents(
+        _get_volume_path(catalog, volume, volume_path)
+    ):
+        client.files.delete(file.path)
+    client.files.delete_directory(_get_volume_path(catalog, volume, volume_path))
+
+
+if __name__ == "__main__":
+    cli()
diff --git a/test_unstructured_ingest/test-ingest-dest.sh b/test_unstructured_ingest/test-ingest-dest.sh
index 2e46d6090..06ab975db 100755
--- a/test_unstructured_ingest/test-ingest-dest.sh
+++ b/test_unstructured_ingest/test-ingest-dest.sh
@@ -37,6 +37,7 @@ all_tests=(
   'vectara.sh'
   'singlestore.sh'
   'weaviate.sh'
+  'databricks-volumes.sh'
 )
 
 full_python_matrix_tests=(
diff --git a/unstructured/__version__.py b/unstructured/__version__.py
index 0e1fd5a86..74eb11b3c 100644
--- a/unstructured/__version__.py
+++ b/unstructured/__version__.py
@@ -1 +1 @@
-__version__ = "0.14.10-dev8" # pragma: no cover +__version__ = "0.14.10-dev9" # pragma: no cover diff --git a/unstructured/ingest/v2/cli/cmds/__init__.py b/unstructured/ingest/v2/cli/cmds/__init__.py index 78cbca207..ea101c660 100644 --- a/unstructured/ingest/v2/cli/cmds/__init__.py +++ b/unstructured/ingest/v2/cli/cmds/__init__.py @@ -4,6 +4,7 @@ import click from .astra import astra_dest_cmd from .chroma import chroma_dest_cmd +from .databricks_volumes import databricks_volumes_dest_cmd from .elasticsearch import elasticsearch_dest_cmd, elasticsearch_src_cmd from .fsspec.azure import azure_dest_cmd, azure_src_cmd from .fsspec.box import box_dest_cmd, box_src_cmd @@ -59,6 +60,7 @@ dest_cmds = [ singlestore_dest_cmd, weaviate_dest_cmd, mongodb_dest_cmd, + databricks_volumes_dest_cmd, ] duplicate_dest_names = [ diff --git a/unstructured/ingest/v2/cli/cmds/databricks_volumes.py b/unstructured/ingest/v2/cli/cmds/databricks_volumes.py new file mode 100644 index 000000000..e8f8e2486 --- /dev/null +++ b/unstructured/ingest/v2/cli/cmds/databricks_volumes.py @@ -0,0 +1,161 @@ +from dataclasses import dataclass + +import click + +from unstructured.ingest.v2.cli.base import DestCmd +from unstructured.ingest.v2.cli.interfaces import CliConfig +from unstructured.ingest.v2.processes.connectors.databricks_volumes import CONNECTOR_TYPE + + +@dataclass +class DatabricksVolumesCliConnectionConfig(CliConfig): + @staticmethod + def get_cli_options() -> list[click.Option]: + options = [ + click.Option( + ["--host"], + type=str, + default=None, + help="The Databricks host URL for either the " + "Databricks workspace endpoint or the " + "Databricks accounts endpoint.", + ), + click.Option( + ["--account-id"], + type=str, + default=None, + help="The Databricks account ID for the Databricks " + "accounts endpoint. Only has effect when Host is " + "either https://accounts.cloud.databricks.com/ (AWS), " + "https://accounts.azuredatabricks.net/ (Azure), " + "or https://accounts.gcp.databricks.com/ (GCP).", + ), + click.Option( + ["--username"], + type=str, + default=None, + help="The Databricks username part of basic authentication. " + "Only possible when Host is *.cloud.databricks.com (AWS).", + ), + click.Option( + ["--password"], + type=str, + default=None, + help="The Databricks password part of basic authentication. " + "Only possible when Host is *.cloud.databricks.com (AWS).", + ), + click.Option(["--client-id"], type=str, default=None), + click.Option(["--client-secret"], type=str, default=None), + click.Option( + ["--token"], + type=str, + default=None, + help="The Databricks personal access token (PAT) (AWS, Azure, and GCP) or " + "Azure Active Directory (Azure AD) token (Azure).", + ), + click.Option( + ["--azure-workspace-resource-id"], + type=str, + default=None, + help="The Azure Resource Manager ID for the Azure Databricks workspace, " + "which is exchanged for a Databricks host URL.", + ), + click.Option( + ["--azure-client-secret"], + type=str, + default=None, + help="The Azure AD service principal’s client secret.", + ), + click.Option( + ["--azure-client-id"], + type=str, + default=None, + help="The Azure AD service principal’s application ID.", + ), + click.Option( + ["--azure-tenant-id"], + type=str, + default=None, + help="The Azure AD service principal’s tenant ID.", + ), + click.Option( + ["--azure-environment"], + type=str, + default=None, + help="The Azure environment type (such as Public, UsGov, China, and Germany) for a " + "specific set of API endpoints. 
Defaults to PUBLIC.", + ), + click.Option( + ["--auth-type"], + type=str, + default=None, + help="When multiple auth attributes are available in the " + "environment, use the auth type specified by this " + "argument. This argument also holds the currently " + "selected auth.", + ), + click.Option(["--cluster-id"], type=str, default=None), + click.Option(["--google-credentials"], type=str, default=None), + click.Option(["--google-service-account"], type=str, default=None), + ] + return options + + +@dataclass +class DatabricksVolumesCliUploaderConfig(CliConfig): + @staticmethod + def get_cli_options() -> list[click.Option]: + options = [ + click.Option( + ["--volume"], type=str, required=True, help="Name of volume in the Unity Catalog" + ), + click.Option( + ["--catalog"], + type=str, + required=True, + help="Name of the catalog in the Databricks Unity Catalog service", + ), + click.Option( + ["--volume-path"], + type=str, + required=False, + default=None, + help="Optional path within the volume to write to", + ), + click.Option( + ["--overwrite"], + type=bool, + is_flag=True, + help="If true, an existing file will be overwritten.", + ), + click.Option( + ["--encoding"], + type=str, + required=True, + default="utf-8", + help="Encoding applied to the data when written to the volume", + ), + click.Option( + ["--schema"], + type=str, + required=True, + default="default", + help="Schema associated with the volume to write to in the Unity Catalog service", + ), + ] + return options + + +@dataclass +class DatabricksVolumesCliUploadStagerConfig(CliConfig): + @staticmethod + def get_cli_options() -> list[click.Option]: + return [] + + +databricks_volumes_dest_cmd = DestCmd( + cmd_name=CONNECTOR_TYPE, + connection_config=DatabricksVolumesCliConnectionConfig, + uploader_config=DatabricksVolumesCliUploaderConfig, + upload_stager_config=DatabricksVolumesCliUploadStagerConfig, +) diff --git a/unstructured/ingest/v2/examples/example_databricks_volumes.py b/unstructured/ingest/v2/examples/example_databricks_volumes.py new file mode 100644 index 000000000..ecc8b6301 --- /dev/null +++ b/unstructured/ingest/v2/examples/example_databricks_volumes.py @@ -0,0 +1,54 @@ +import os +from pathlib import Path + +from unstructured.ingest.v2.interfaces import ProcessorConfig +from unstructured.ingest.v2.logger import logger +from unstructured.ingest.v2.pipeline.pipeline import Pipeline +from unstructured.ingest.v2.processes.chunker import ChunkerConfig +from unstructured.ingest.v2.processes.connectors.databricks_volumes import ( + DatabricksVolumesAccessConfig, + DatabricksVolumesConnectionConfig, + DatabricksVolumesUploaderConfig, +) +from unstructured.ingest.v2.processes.connectors.local import ( + LocalConnectionConfig, + LocalDownloaderConfig, + LocalIndexerConfig, +) +from unstructured.ingest.v2.processes.embedder import EmbedderConfig +from unstructured.ingest.v2.processes.partitioner import PartitionerConfig + +base_path = Path(__file__).parent.parent.parent.parent.parent +docs_path = base_path / "example-docs" +work_dir = base_path / "tmp_ingest" +output_path = work_dir / "output" +download_path = work_dir / "download" + +if __name__ == "__main__": + logger.info(f"Writing all content in: {work_dir.resolve()}") + Pipeline.from_configs( + context=ProcessorConfig(work_dir=str(work_dir.resolve())), + indexer_config=LocalIndexerConfig(input_path=str(docs_path.resolve()) + "/multisimple/"), + downloader_config=LocalDownloaderConfig(download_dir=download_path), + source_connection_config=LocalConnectionConfig(), + 
partitioner_config=PartitionerConfig(strategy="fast"), + chunker_config=ChunkerConfig( + chunking_strategy="by_title", + chunk_include_orig_elements=False, + chunk_max_characters=1500, + chunk_multipage_sections=True, + ), + embedder_config=EmbedderConfig(embedding_provider="langchain-huggingface"), + destination_connection_config=DatabricksVolumesConnectionConfig( + access_config=DatabricksVolumesAccessConfig( + username=os.environ["DATABRICKS_USERNAME"], + password=os.environ["DATABRICKS_PASSWORD"], + ), + host=os.environ["DATABRICKS_HOST"], + ), + uploader_config=DatabricksVolumesUploaderConfig( + catalog=os.environ["DATABRICKS_CATALOG"], + volume=os.environ["DATABRICKS_VOLUME"], + volume_path=os.environ["DATABRICKS_VOLUME_PATH"], + ), + ).run() diff --git a/unstructured/ingest/v2/processes/connectors/__init__.py b/unstructured/ingest/v2/processes/connectors/__init__.py index a661953f1..5c935678d 100644 --- a/unstructured/ingest/v2/processes/connectors/__init__.py +++ b/unstructured/ingest/v2/processes/connectors/__init__.py @@ -10,6 +10,8 @@ from .astra import CONNECTOR_TYPE as ASTRA_CONNECTOR_TYPE from .astra import astra_destination_entry from .chroma import CONNECTOR_TYPE as CHROMA_CONNECTOR_TYPE from .chroma import chroma_destination_entry +from .databricks_volumes import CONNECTOR_TYPE as DATABRICKS_VOLUMES_CONNECTOR_TYPE +from .databricks_volumes import databricks_volumes_destination_entry from .elasticsearch import CONNECTOR_TYPE as ELASTICSEARCH_CONNECTOR_TYPE from .elasticsearch import elasticsearch_destination_entry, elasticsearch_source_entry from .google_drive import CONNECTOR_TYPE as GOOGLE_DRIVE_CONNECTOR_TYPE @@ -45,3 +47,7 @@ add_destination_entry( ) add_destination_entry(destination_type=WEAVIATE_CONNECTOR_TYPE, entry=weaviate_destination_entry) + +add_destination_entry( + destination_type=DATABRICKS_VOLUMES_CONNECTOR_TYPE, entry=databricks_volumes_destination_entry +) diff --git a/unstructured/ingest/v2/processes/connectors/databricks_volumes.py b/unstructured/ingest/v2/processes/connectors/databricks_volumes.py new file mode 100644 index 000000000..e875535c2 --- /dev/null +++ b/unstructured/ingest/v2/processes/connectors/databricks_volumes.py @@ -0,0 +1,96 @@ +import os +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any, Optional + +from unstructured.ingest.enhanced_dataclass import enhanced_field +from unstructured.ingest.v2.interfaces import ( + AccessConfig, + ConnectionConfig, + UploadContent, + Uploader, + UploaderConfig, +) +from unstructured.ingest.v2.processes.connector_registry import DestinationRegistryEntry +from unstructured.utils import requires_dependencies + +if TYPE_CHECKING: + from databricks.sdk import WorkspaceClient + +CONNECTOR_TYPE = "databricks_volumes" + + +@dataclass +class DatabricksVolumesAccessConfig(AccessConfig): + account_id: Optional[str] = None + username: Optional[str] = None + password: Optional[str] = None + client_id: Optional[str] = None + client_secret: Optional[str] = None + token: Optional[str] = None + profile: Optional[str] = None + azure_workspace_resource_id: Optional[str] = None + azure_client_secret: Optional[str] = None + azure_client_id: Optional[str] = None + azure_tenant_id: Optional[str] = None + azure_environment: Optional[str] = None + auth_type: Optional[str] = None + cluster_id: Optional[str] = None + google_credentials: Optional[str] = None + google_service_account: Optional[str] = None + + +@dataclass +class DatabricksVolumesConnectionConfig(ConnectionConfig): + access_config: 
+    access_config: DatabricksVolumesAccessConfig = enhanced_field(
+        default_factory=DatabricksVolumesAccessConfig, sensitive=True
+    )
+    host: Optional[str] = None
+
+
+@dataclass
+class DatabricksVolumesUploaderConfig(UploaderConfig):
+    volume: str
+    catalog: str
+    volume_path: Optional[str] = None
+    overwrite: bool = False
+    schema: str = "default"
+
+    @property
+    def path(self) -> str:
+        path = f"/Volumes/{self.catalog}/{self.schema}/{self.volume}"
+        if self.volume_path:
+            path = f"{path}/{self.volume_path}"
+        return path
+
+
+@dataclass
+class DatabricksVolumesUploader(Uploader):
+    upload_config: DatabricksVolumesUploaderConfig
+    connection_config: DatabricksVolumesConnectionConfig
+    connector_type: str = CONNECTOR_TYPE
+    client: Optional["WorkspaceClient"] = field(init=False, default=None)
+
+    @requires_dependencies(dependencies=["databricks.sdk"], extras="databricks-volumes")
+    def __post_init__(self) -> None:
+        from databricks.sdk import WorkspaceClient
+
+        self.client = WorkspaceClient(
+            host=self.connection_config.host, **self.connection_config.access_config.to_dict()
+        )
+
+    def run(self, contents: list[UploadContent], **kwargs: Any) -> None:
+        for content in contents:
+            with open(content.path, "rb") as elements_file:
+                output_path = os.path.join(self.upload_config.path, content.path.name)
+                self.client.files.upload(
+                    file_path=output_path,
+                    contents=elements_file,
+                    overwrite=self.upload_config.overwrite,
+                )
+
+
+databricks_volumes_destination_entry = DestinationRegistryEntry(
+    connection_config=DatabricksVolumesConnectionConfig,
+    uploader=DatabricksVolumesUploader,
+    uploader_config=DatabricksVolumesUploaderConfig,
+)
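
Reviewer note: the uploader's destination-path logic is easiest to sanity-check in isolation. Below is a minimal sketch (not part of the diff) exercising `DatabricksVolumesUploaderConfig.path`; the catalog name and volume-path suffix are hypothetical stand-ins, while the volume name mirrors the integration test above.

```python
# Minimal sketch: how the uploader resolves its Unity Catalog destination path.
# "my-catalog" and the "...-12345" suffix are hypothetical; the schema segment
# falls back to "default" when not set.
from unstructured.ingest.v2.processes.connectors.databricks_volumes import (
    DatabricksVolumesUploaderConfig,
)

config = DatabricksVolumesUploaderConfig(
    volume="test-platform",
    catalog="my-catalog",
    volume_path="databricks-volumes-test-output-12345",
)

# Files land under /Volumes/<catalog>/<schema>/<volume>[/<volume_path>].
assert config.path == (
    "/Volumes/my-catalog/default/test-platform/databricks-volumes-test-output-12345"
)
print(config.path)
```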