feat: databricks volumes dest added (#2391)

### Description
This adds a destination connector that writes content to the Databricks
Unity Catalog Volumes service. There is currently an internal account that
can be used for manual testing, but no dedicated account for automated
testing, so this connector is not being added to the automated ingest tests
that run in the CI.

To test locally: 
```shell
#!/usr/bin/env bash

path="testpath/$(uuidgen)"

PYTHONPATH=. python ./unstructured/ingest/main.py local \
    --num-processes 4 \
    --output-dir azure-test \
    --strategy fast \
    --verbose \
    --input-path example-docs/fake-memo.pdf \
    --recursive \
    databricks-volumes \
    --catalog "utic-dev-tech-fixtures" \
    --volume "small-pdf-set" \
    --volume-path "$path" \
    --username "$DATABRICKS_USERNAME" \
    --password "$DATABRICKS_PASSWORD" \
    --host "$DATABRICKS_HOST"
```
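As an optional pre-flight check (not part of this PR), the following sketch exercises the same `databricks-sdk` `files.upload` call the connector uses, with the fixture catalog/volume above and an illustrative target path, to confirm the credentials and volume are usable:
```python
import io
import os
import uuid

from databricks.sdk import WorkspaceClient

# Basic auth against an AWS workspace, mirroring the connector's access config.
client = WorkspaceClient(
    host=os.environ["DATABRICKS_HOST"],
    username=os.environ["DATABRICKS_USERNAME"],
    password=os.environ["DATABRICKS_PASSWORD"],
)

# Volumes paths take the form /Volumes/<catalog>/<schema>/<volume>/<path>.
target = f"/Volumes/utic-dev-tech-fixtures/default/small-pdf-set/testpath/{uuid.uuid4()}.json"
client.files.upload(file_path=target, contents=io.BytesIO(b"[]"), overwrite=True)
print(f"wrote {target}")
```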
Roman Isecke committed 2024-01-22 20:25:51 -05:00 via GitHub (commit a8de52e94f, parent 283f796f58)
16 changed files with 490 additions and 5 deletions


@@ -1,9 +1,11 @@
## 0.12.2-dev0
## 0.12.3-dev1
### Enhancements
### Features
* **Add Databricks Volumes destination connector** Databricks Volumes connector added to ingest CLI. Users may now use `unstructured-ingest` to write partitioned data to a Databricks Volumes storage service.
### Fixes
* **Fix FSSpec destination connectors check_connection.** FSSpec destination connectors did not use `check_connection`. There was an error when trying to `ls` the destination directory, which may not exist at the moment of connector creation. Now `check_connection` calls `ls` on the bucket root, and this method is called on `initialize` of the destination connector.


@@ -237,11 +237,15 @@ install-ingest-chroma:
.PHONY: install-ingest-postgres
install-ingest-postgres:
python3 -m pip install -r requirements/ingest-postgres.txt
python3 -m pip install -r requirements/ingest/postgres.txt
.PHONY: install-ingest-mongodb
install-ingest-mongodb:
python3 -m pip install -r requirements/ingest-mongodb.txt
python3 -m pip install -r requirements/ingest/mongodb.txt
.PHONY: install-ingest-databricks-volumes
install-ingest-databricks-volumes:
python3 -m pip install -r requirements/ingest/databricks-volumes.txt
.PHONY: install-embed-huggingface
install-embed-huggingface:


@@ -12,6 +12,7 @@ in our community `Slack. <https://short.unstructured.io/pzw05l7>`_
destination_connectors/azure_cognitive_search
destination_connectors/box
destination_connectors/chroma
destination_connectors/databricks_volumes
destination_connectors/delta_table
destination_connectors/dropbox
destination_connectors/elasticsearch


@@ -12,7 +12,7 @@ First you'll need to install the Chroma dependencies as shown here.
Run Locally
-----------
The upstream connector can be any of the ones supported, but for convenience here, showing a sample command using the
upstream local connector.
.. tabs::


@@ -0,0 +1,18 @@
#!/usr/bin/env bash
unstructured-ingest \
local \
--input-path example-docs/book-war-and-peace-1p.txt \
--output-dir local-to-databricks-volume \
--strategy fast \
--chunk-elements \
    --embedding-provider "<unstructured embedding provider, i.e. langchain-huggingface>" \
--num-processes 2 \
--verbose \
--work-dir "<directory for intermediate outputs to be saved>" \
databricks-volumes \
--host "$DATABRICKS_HOST" \
--username "$DATABRICKS_USERNAME" \
--password "$DATABRICKS_PASSWORD" \
--volume "$DATABRICKS_VOLUME" \
--catalog "$DATABRICKS_CATALOG"


@@ -0,0 +1,58 @@
import os
from unstructured.ingest.connector.databricks_volumes import (
DatabricksVolumesAccessConfig,
DatabricksVolumesWriteConfig,
SimpleDatabricksVolumesConfig,
)
from unstructured.ingest.connector.local import SimpleLocalConfig
from unstructured.ingest.interfaces import (
ChunkingConfig,
EmbeddingConfig,
PartitionConfig,
ProcessorConfig,
ReadConfig,
)
from unstructured.ingest.runner import LocalRunner
from unstructured.ingest.runner.writers.base_writer import Writer
from unstructured.ingest.runner.writers.databricks_volumes import (
DatabricksVolumesWriter,
)
def get_writer() -> Writer:
return DatabricksVolumesWriter(
connector_config=SimpleDatabricksVolumesConfig(
host=os.getenv("DATABRICKS_HOST"),
access_config=DatabricksVolumesAccessConfig(
username=os.getenv("DATABRICKS_USERNAME"), password=os.getenv("DATABRICKS_PASSWORD")
),
),
write_config=DatabricksVolumesWriteConfig(
catalog=os.getenv("DATABRICKS_CATALOG"),
volume=os.getenv("DATABRICKS_VOLUME"),
),
)
if __name__ == "__main__":
writer = get_writer()
runner = LocalRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="local-output-to-databricks-volumes",
num_processes=2,
),
connector_config=SimpleLocalConfig(
input_path="example-docs/book-war-and-peace-1225p.txt",
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
chunking_config=ChunkingConfig(chunk_elements=True),
embedding_config=EmbeddingConfig(
provider="langchain-huggingface",
),
writer=writer,
writer_kwargs={},
)
runner.run()


@@ -0,0 +1,32 @@
Databricks Volumes
==================
Batch process all your records using ``unstructured-ingest`` to store structured outputs locally on your filesystem and upload those local files to a Databricks Volume.
First you'll need to install the Databricks Volume dependencies as shown here.
.. code:: shell
pip install "unstructured[databricks-volumes]"
Run Locally
-----------
The upstream connector can be any of the ones supported, but for convenience this example shows a sample command using the
upstream local connector.
.. tabs::
.. tab:: Shell
.. literalinclude:: ./code/bash/databricks_volumes.sh
:language: bash
.. tab:: Python
.. literalinclude:: ./code/python/databricks_volumes.py
:language: python
For a full list of the options the CLI accepts, check ``unstructured-ingest <upstream connector> databricks-volumes --help``.
NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide <https://unstructured-io.github.io/unstructured/installing.html>`_.
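Once the job finishes, the partitioned JSON files land under ``/Volumes/<catalog>/<schema>/<volume>/<volume-path>`` (the schema defaults to ``default``). As a quick way to confirm the upload, a minimal sketch along these lines can be used; it assumes the ``databricks-sdk`` ``files.download`` API and uses placeholder values:

.. code:: python

    import os

    from databricks.sdk import WorkspaceClient

    client = WorkspaceClient(
        host=os.environ["DATABRICKS_HOST"],
        username=os.environ["DATABRICKS_USERNAME"],
        password=os.environ["DATABRICKS_PASSWORD"],
    )

    # "<catalog>", "<volume>", and "<output filename>" are placeholders for your own values.
    path = "/Volumes/<catalog>/default/<volume>/<output filename>.json"
    response = client.files.download(file_path=path)
    print(response.contents.read()[:200])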


@@ -0,0 +1,3 @@
-c ../constraints.in
-c ../base.txt
databricks-sdk


@@ -0,0 +1,42 @@
#
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --output-file=ingest/databricks-volumes.txt ingest/databricks-volumes.in
#
cachetools==5.3.2
# via google-auth
certifi==2023.11.17
# via
# -c ingest/../base.txt
# -c ingest/../constraints.in
# requests
charset-normalizer==3.3.2
# via
# -c ingest/../base.txt
# requests
databricks-sdk==0.16.0
# via -r ingest/databricks-volumes.in
google-auth==2.26.1
# via databricks-sdk
idna==3.6
# via
# -c ingest/../base.txt
# requests
pyasn1==0.5.1
# via
# pyasn1-modules
# rsa
pyasn1-modules==0.3.0
# via google-auth
requests==2.31.0
# via
# -c ingest/../base.txt
# databricks-sdk
rsa==4.9
# via google-auth
urllib3==1.26.18
# via
# -c ingest/../base.txt
# -c ingest/../constraints.in
# requests


@@ -168,6 +168,7 @@ setup(
"embed-huggingface": load_requirements("requirements/ingest/embed-huggingface.in"),
"openai": load_requirements("requirements/ingest/embed-openai.in"),
"bedrock": load_requirements("requirements/ingest/embed-aws-bedrock.in"),
"databricks-volumes": load_requirements("requirements/ingest/databricks.in"),
},
package_dir={"unstructured": "unstructured"},
package_data={"unstructured": ["nlp/*.txt"]},


@@ -1 +1 @@
__version__ = "0.12.2-dev0" # pragma: no cover
__version__ = "0.12.3-dev1" # pragma: no cover


@@ -11,6 +11,7 @@ from .azure_cognitive_search import get_base_dest_cmd as azure_cognitive_search_
from .biomed import get_base_src_cmd as biomed_base_src_cmd
from .chroma import get_base_dest_cmd as chroma_base_dest_cmd
from .confluence import get_base_src_cmd as confluence_base_src_cmd
from .databricks_volumes import get_base_dest_cmd as databricks_volumes_dest_cmd
from .delta_table import get_base_dest_cmd as delta_table_dest_cmd
from .delta_table import get_base_src_cmd as delta_table_base_src_cmd
from .discord import get_base_src_cmd as discord_base_src_cmd
@@ -100,6 +101,7 @@ base_dest_cmd_fns: t.List[t.Callable[[], "BaseDestCmd"]] = [
azure_base_dest_cmd,
box_base_dest_cmd,
chroma_base_dest_cmd,
databricks_volumes_dest_cmd,
dropbox_base_dest_cmd,
elasticsearch_base_dest_cmd,
fsspec_base_dest_cmd,


@@ -0,0 +1,163 @@
import typing as t
from dataclasses import dataclass
import click
from unstructured.ingest.cli.interfaces import CliConfig
from unstructured.ingest.connector.databricks_volumes import (
DatabricksVolumesWriteConfig,
SimpleDatabricksVolumesConfig,
)
CMD_NAME = "databricks-volumes"
@dataclass
class DatabricksVolumesCliConfig(SimpleDatabricksVolumesConfig, CliConfig):
@staticmethod
def get_cli_options() -> t.List[click.Option]:
options = [
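            # These options mirror the Databricks SDK's unified authentication settings:
            # supply one auth method (personal access token, username/password, OAuth
            # client id/secret, Azure AD, or Google credentials) and use --auth-type to
            # pick between them when more than one is configured in the environment.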
click.Option(
["--host"],
type=str,
default=None,
help="The Databricks host URL for either the "
"Databricks workspace endpoint or the "
"Databricks accounts endpoint.",
),
click.Option(
["--account-id"],
type=str,
default=None,
help="The Databricks account ID for the Databricks "
"accounts endpoint. Only has effect when Host is "
"either https://accounts.cloud.databricks.com/ (AWS), "
"https://accounts.azuredatabricks.net/ (Azure), "
"or https://accounts.gcp.databricks.com/ (GCP).",
),
click.Option(
["--username"],
type=str,
default=None,
help="The Databricks username part of basic authentication. "
"Only possible when Host is *.cloud.databricks.com (AWS).",
),
click.Option(
["--password"],
type=str,
default=None,
help="The Databricks password part of basic authentication. "
"Only possible when Host is *.cloud.databricks.com (AWS).",
),
click.Option(["--client-id"], type=str, default=None),
click.Option(["--client-secret"], type=str, default=None),
click.Option(
["--token"],
type=str,
default=None,
help="The Databricks personal access token (PAT) (AWS, Azure, and GCP) or "
"Azure Active Directory (Azure AD) token (Azure).",
),
click.Option(
["--azure-workspace-resource-id"],
type=str,
default=None,
help="The Azure Resource Manager ID for the Azure Databricks workspace, "
"which is exchanged for a Databricks host URL.",
),
click.Option(
["--azure-client-secret"],
type=str,
default=None,
help="The Azure AD service principals client secret.",
),
click.Option(
["--azure-client-id"],
type=str,
default=None,
help="The Azure AD service principals application ID.",
),
click.Option(
["--azure-tenant-id"],
type=str,
default=None,
help="The Azure AD service principals tenant ID.",
),
click.Option(
["--azure-environment"],
type=str,
default=None,
help="The Azure environment type (such as Public, UsGov, China, and Germany) for a "
"specific set of API endpoints. Defaults to PUBLIC.",
),
click.Option(
["--auth-type"],
type=str,
default=None,
help="When multiple auth attributes are available in the "
"environment, use the auth type specified by this "
"argument. This argument also holds the currently "
"selected auth.",
),
click.Option(["--cluster-id"], type=str, default=None),
click.Option(["--google-credentials"], type=str, default=None),
click.Option(["--google-service-account"], type=str, default=None),
]
return options
@dataclass
class DatabricksVolumesCliWriteConfig(DatabricksVolumesWriteConfig, CliConfig):
@staticmethod
def get_cli_options() -> t.List[click.Option]:
options = [
click.Option(
["--volume"], type=str, required=True, help="Name of volume in the Unity Catalog"
),
click.Option(
["--catalog"],
type=str,
required=True,
help="Name of the catalog in the Databricks Unity Catalog service",
),
click.Option(
["--volume-path"],
type=str,
required=False,
default=None,
help="Optional path within the volume to write to",
),
click.Option(
["--overwrite"],
type=bool,
is_flag=True,
help="If true, an existing file will be overwritten.",
),
click.Option(
["--encoding"],
type=str,
required=True,
default="utf-8",
help="Encoding applied to the data when written to the volume",
),
click.Option(
["--schema"],
type=str,
required=True,
default="default",
help="Schema associated with the volume to write to in the Unity Catalog service",
),
]
return options
def get_base_dest_cmd():
from unstructured.ingest.cli.base.dest import BaseDestCmd
cmd_cls = BaseDestCmd(
cmd_name=CMD_NAME,
cli_config=DatabricksVolumesCliConfig,
additional_cli_options=[DatabricksVolumesCliWriteConfig],
write_config=DatabricksVolumesWriteConfig,
)
return cmd_cls


@@ -0,0 +1,132 @@
import copy
import json
import os
import typing as t
from dataclasses import dataclass, field
from io import BytesIO
from pathlib import PurePath
from unstructured.ingest.enhanced_dataclass import enhanced_field
from unstructured.ingest.enhanced_dataclass.core import _asdict
from unstructured.ingest.interfaces import (
AccessConfig,
BaseConnectorConfig,
BaseDestinationConnector,
BaseSingleIngestDoc,
WriteConfig,
)
from unstructured.ingest.logger import logger
from unstructured.utils import requires_dependencies
if t.TYPE_CHECKING:
from databricks.sdk import WorkspaceClient
@dataclass
class DatabricksVolumesAccessConfig(AccessConfig):
account_id: t.Optional[str] = None
username: t.Optional[str] = None
password: t.Optional[str] = enhanced_field(default=None, sensitive=True)
client_id: t.Optional[str] = None
client_secret: t.Optional[str] = enhanced_field(default=None, sensitive=True)
token: t.Optional[str] = enhanced_field(default=None, sensitive=True)
profile: t.Optional[str] = None
azure_workspace_resource_id: t.Optional[str] = None
azure_client_secret: t.Optional[str] = enhanced_field(default=None, sensitive=True)
azure_client_id: t.Optional[str] = None
azure_tenant_id: t.Optional[str] = None
azure_environment: t.Optional[str] = None
auth_type: t.Optional[str] = None
cluster_id: t.Optional[str] = None
google_credentials: t.Optional[str] = None
google_service_account: t.Optional[str] = None
@dataclass
class SimpleDatabricksVolumesConfig(BaseConnectorConfig):
access_config: DatabricksVolumesAccessConfig
host: t.Optional[str] = None
@dataclass
class DatabricksVolumesWriteConfig(WriteConfig):
volume: str
catalog: str
volume_path: t.Optional[str] = None
overwrite: bool = False
encoding: str = "utf-8"
schema: str = "default"
@property
def path(self) -> str:
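        # Illustrative example: catalog="main", schema="default", volume="ingest-output",
        # volume_path="run-1" resolves to "/Volumes/main/default/ingest-output/run-1".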
path = f"/Volumes/{self.catalog}/{self.schema}/{self.volume}"
if self.volume_path:
path = f"{path}/{self.volume_path}"
return path
@dataclass
class DatabricksVolumesDestinationConnector(BaseDestinationConnector):
write_config: DatabricksVolumesWriteConfig
connector_config: SimpleDatabricksVolumesConfig
_client: t.Optional["WorkspaceClient"] = field(init=False, default=None)
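    # _client holds runtime-only state; to_dict() below resets it to None so the
    # connector definition stays JSON-serializable.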
def to_dict(self, **kwargs):
self_cp = copy.copy(self)
if hasattr(self_cp, "_client"):
setattr(self_cp, "_client", None)
return _asdict(self_cp, **kwargs)
    @requires_dependencies(dependencies=["databricks.sdk"], extras="databricks-volumes")
def generate_client(self) -> "WorkspaceClient":
from databricks.sdk import WorkspaceClient
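        # WorkspaceClient accepts the same keyword arguments as the SDK's Config object,
        # so the access config fields (token, username/password, azure_*, google_*, etc.)
        # can be passed through directly.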
return WorkspaceClient(
host=self.connector_config.host, **self.connector_config.access_config.to_dict()
)
@property
def client(self) -> "WorkspaceClient":
if self._client is None:
self._client = self.generate_client()
return self._client
def check_connection(self):
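        # Intentionally a no-op for now; auth or permission errors surface when the
        # client is first used to upload.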
pass
def initialize(self):
_ = self.client
def write_dict(
self,
*args,
elements_dict: t.List[t.Dict[str, t.Any]],
filename: t.Optional[str] = None,
indent: int = 4,
encoding: str = "utf-8",
**kwargs,
) -> None:
output_folder = self.write_config.path
        output_folder = os.path.join(output_folder)  # Single-argument join leaves the path as-is; PurePath below handles joining
        filename = (
            filename.strip(os.sep) if filename else filename
        )  # Make sure filename doesn't begin with a file separator
output_path = str(PurePath(output_folder, filename)) if filename else output_folder
logger.debug(f"uploading content to {output_path}")
self.client.files.upload(
file_path=output_path,
contents=BytesIO(json.dumps(elements_dict).encode(encoding=self.write_config.encoding)),
overwrite=self.write_config.overwrite,
)
def get_elements_dict(self, docs: t.List[BaseSingleIngestDoc]) -> t.List[t.Dict[str, t.Any]]:
pass
def write(self, docs: t.List[BaseSingleIngestDoc]) -> None:
for doc in docs:
file_path = doc.base_output_filename
filename = file_path if file_path else None
with open(doc._output_filename) as json_file:
logger.debug(f"uploading content from {doc._output_filename}")
json_list = json.load(json_file)
self.write_dict(elements_dict=json_list, filename=filename)


@@ -3,6 +3,7 @@ import typing as t
from .azure_cognitive_search import AzureCognitiveSearchWriter
from .base_writer import Writer
from .chroma import ChromaWriter
from .databricks_volumes import DatabricksVolumesWriter
from .delta_table import DeltaTableWriter
from .elasticsearch import ElasticsearchWriter
from .fsspec.azure import AzureWriter
@@ -22,6 +23,7 @@ writer_map: t.Dict[str, t.Type[Writer]] = {
"azure_cognitive_search": AzureCognitiveSearchWriter,
"box": BoxWriter,
"chroma": ChromaWriter,
"databricks_volumes": DatabricksVolumesWriter,
"delta_table": DeltaTableWriter,
"dropbox": DropboxWriter,
"elasticsearch": ElasticsearchWriter,


@@ -0,0 +1,25 @@
import typing as t
from dataclasses import dataclass
from unstructured.ingest.enhanced_dataclass import EnhancedDataClassJsonMixin
from unstructured.ingest.interfaces import BaseDestinationConnector
from unstructured.ingest.runner.writers.base_writer import Writer
if t.TYPE_CHECKING:
from unstructured.ingest.connector.databricks_volumes import (
DatabricksVolumesWriteConfig,
SimpleDatabricksVolumesConfig,
)
@dataclass
class DatabricksVolumesWriter(Writer, EnhancedDataClassJsonMixin):
write_config: "DatabricksVolumesWriteConfig"
connector_config: "SimpleDatabricksVolumesConfig"
def get_connector_cls(self) -> t.Type[BaseDestinationConnector]:
from unstructured.ingest.connector.databricks_volumes import (
DatabricksVolumesDestinationConnector,
)
return DatabricksVolumesDestinationConnector