From a8de52e94ffbc039da5f9a463dd5d5a7cde20bc8 Mon Sep 17 00:00:00 2001 From: Roman Isecke <136338424+rbiseck3@users.noreply.github.com> Date: Mon, 22 Jan 2024 20:25:51 -0500 Subject: [PATCH] feat: databricks volumes dest added (#2391) ### Description This adds a destination connector that writes content to the Databricks Unity Catalog Volumes service. There is currently an internal account that can be used for manual testing, but there is no dedicated account for automated testing, so this connector is not being added to the automated ingest tests that run in CI. To test locally: ```shell #!/usr/bin/env bash path="testpath/$(uuidgen)" PYTHONPATH=. python ./unstructured/ingest/main.py local \ --num-processes 4 \ --output-dir azure-test \ --strategy fast \ --verbose \ --input-path example-docs/fake-memo.pdf \ --recursive \ databricks-volumes \ --catalog "utic-dev-tech-fixtures" \ --volume "small-pdf-set" \ --volume-path "$path" \ --username "$DATABRICKS_USERNAME" \ --password "$DATABRICKS_PASSWORD" \ --host "$DATABRICKS_HOST" ``` --- CHANGELOG.md | 4 +- Makefile | 8 +- docs/source/ingest/destination_connectors.rst | 1 + .../ingest/destination_connectors/chroma.rst | 2 +- .../code/bash/databricks_volumes.sh | 18 ++ .../code/python/databricks_volumes.py | 58 +++++++ .../databricks_volumes.rst | 32 ++++ requirements/ingest/databricks-volumes.in | 3 + requirements/ingest/databricks-volumes.txt | 42 +++++ setup.py | 1 + unstructured/__version__.py | 2 +- unstructured/ingest/cli/cmds/__init__.py | 2 + .../ingest/cli/cmds/databricks_volumes.py | 163 ++++++++++++++++++ .../ingest/connector/databricks_volumes.py | 132 ++++++++++++++ .../ingest/runner/writers/__init__.py | 2 + .../runner/writers/databricks_volumes.py | 25 +++ 16 files changed, 490 insertions(+), 5 deletions(-) create mode 100644 docs/source/ingest/destination_connectors/code/bash/databricks_volumes.sh create mode 100644 docs/source/ingest/destination_connectors/code/python/databricks_volumes.py create mode 100644 docs/source/ingest/destination_connectors/databricks_volumes.rst create mode 100644 requirements/ingest/databricks-volumes.in create mode 100644 requirements/ingest/databricks-volumes.txt create mode 100644 unstructured/ingest/cli/cmds/databricks_volumes.py create mode 100644 unstructured/ingest/connector/databricks_volumes.py create mode 100644 unstructured/ingest/runner/writers/databricks_volumes.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d66b3ac9..81d5e2195 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,11 @@ -## 0.12.2-dev0 +## 0.12.3-dev1 ### Enhancements ### Features +* **Add Databricks Volumes destination connector** Databricks Volumes connector added to ingest CLI. Users may now use `unstructured-ingest` to write partitioned data to a Databricks Volumes storage service. + ### Fixes * **Fix FSSpec destination connectors check_connection.** FSSpec destination connectors did not use `check_connection`. There was an error when trying to `ls` destination directory - it may not exist at the moment of connector creation. Now `check_connection` calls `ls` on bucket root and this method is called on `initialize` of destination connector.
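To sanity-check a manual run, one option is to read the uploaded output back with the same Databricks SDK client the connector uses. This is a minimal verification sketch, not part of the connector itself: it assumes the `files.download` API exposed by the pinned `databricks-sdk` release, the default `default` schema, and that the local connector names its output `<input filename>.json` (here `fake-memo.pdf.json`).

```python
# Hypothetical verification sketch: reads back the JSON written by the test run above.
import json
import os

from databricks.sdk import WorkspaceClient

client = WorkspaceClient(
    host=os.environ["DATABRICKS_HOST"],
    username=os.environ["DATABRICKS_USERNAME"],
    password=os.environ["DATABRICKS_PASSWORD"],
)

# Mirrors DatabricksVolumesWriteConfig.path: /Volumes/<catalog>/<schema>/<volume>/<volume_path>
volume_path = "testpath/<uuid generated by the script above>"
file_path = f"/Volumes/utic-dev-tech-fixtures/default/small-pdf-set/{volume_path}/fake-memo.pdf.json"

# files.download (assumed SDK surface) returns a response whose .contents is a binary stream.
elements = json.load(client.files.download(file_path=file_path).contents)
print(f"{len(elements)} elements found at {file_path}")
```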
diff --git a/Makefile b/Makefile index f0870b2f1..c7c81de56 100644 --- a/Makefile +++ b/Makefile @@ -237,11 +237,15 @@ install-ingest-chroma: .PHONY: install-ingest-postgres install-ingest-postgres: - python3 -m pip install -r requirements/ingest-postgres.txt + python3 -m pip install -r requirements/ingest/postgres.txt .PHONY: install-ingest-mongodb install-ingest-mongodb: - python3 -m pip install -r requirements/ingest-mongodb.txt + python3 -m pip install -r requirements/ingest/mongodb.txt + +.PHONY: install-ingest-databricks-volumes +install-ingest-databricks-volumes: + python3 -m pip install -r requirements/ingest/databricks-volumes.txt .PHONY: install-embed-huggingface install-embed-huggingface: diff --git a/docs/source/ingest/destination_connectors.rst b/docs/source/ingest/destination_connectors.rst index bc60e3fd5..6d4ba6ac3 100644 --- a/docs/source/ingest/destination_connectors.rst +++ b/docs/source/ingest/destination_connectors.rst @@ -12,6 +12,7 @@ in our community `Slack. `_ destination_connectors/azure_cognitive_search destination_connectors/box destination_connectors/chroma + destination_connectors/databricks_volumes destination_connectors/delta_table destination_connectors/dropbox destination_connectors/elasticsearch diff --git a/docs/source/ingest/destination_connectors/chroma.rst b/docs/source/ingest/destination_connectors/chroma.rst index 76cefd6f0..768eb542d 100644 --- a/docs/source/ingest/destination_connectors/chroma.rst +++ b/docs/source/ingest/destination_connectors/chroma.rst @@ -12,7 +12,7 @@ First you'll need to install the Chroma dependencies as shown here. Run Locally ----------- The upstream connector can be any of the ones supported, but for convenience here, showing a sample command using the -upstream local connector. +upstream local connector. .. 
tabs:: diff --git a/docs/source/ingest/destination_connectors/code/bash/databricks_volumes.sh b/docs/source/ingest/destination_connectors/code/bash/databricks_volumes.sh new file mode 100644 index 000000000..090bf55fb --- /dev/null +++ b/docs/source/ingest/destination_connectors/code/bash/databricks_volumes.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash + +unstructured-ingest \ + local \ + --input-path example-docs/book-war-and-peace-1p.txt \ + --output-dir local-to-databricks-volume \ + --strategy fast \ + --chunk-elements \ + --embedding-provider "" \ + --num-processes 2 \ + --verbose \ + --work-dir "" \ + databricks-volumes \ + --host "$DATABRICKS_HOST" \ + --username "$DATABRICKS_USERNAME" \ + --password "$DATABRICKS_PASSWORD" \ + --volume "$DATABRICKS_VOLUME" \ + --catalog "$DATABRICKS_CATALOG" diff --git a/docs/source/ingest/destination_connectors/code/python/databricks_volumes.py b/docs/source/ingest/destination_connectors/code/python/databricks_volumes.py new file mode 100644 index 000000000..43053bdfd --- /dev/null +++ b/docs/source/ingest/destination_connectors/code/python/databricks_volumes.py @@ -0,0 +1,58 @@ +import os + +from unstructured.ingest.connector.databricks_volumes import ( + DatabricksVolumesAccessConfig, + DatabricksVolumesWriteConfig, + SimpleDatabricksVolumesConfig, +) +from unstructured.ingest.connector.local import SimpleLocalConfig +from unstructured.ingest.interfaces import ( + ChunkingConfig, + EmbeddingConfig, + PartitionConfig, + ProcessorConfig, + ReadConfig, +) +from unstructured.ingest.runner import LocalRunner +from unstructured.ingest.runner.writers.base_writer import Writer +from unstructured.ingest.runner.writers.databricks_volumes import ( + DatabricksVolumesWriter, +) + + +def get_writer() -> Writer: + return DatabricksVolumesWriter( + connector_config=SimpleDatabricksVolumesConfig( + host=os.getenv("DATABRICKS_HOST"), + access_config=DatabricksVolumesAccessConfig( + username=os.getenv("DATABRICKS_USERNAME"), password=os.getenv("DATABRICKS_PASSWORD") + ), + ), + write_config=DatabricksVolumesWriteConfig( + catalog=os.getenv("DATABRICKS_CATALOG"), + volume=os.getenv("DATABRICKS_VOLUME"), + ), + ) + + +if __name__ == "__main__": + writer = get_writer() + runner = LocalRunner( + processor_config=ProcessorConfig( + verbose=True, + output_dir="local-output-to-databricks-volumes", + num_processes=2, + ), + connector_config=SimpleLocalConfig( + input_path="example-docs/book-war-and-peace-1225p.txt", + ), + read_config=ReadConfig(), + partition_config=PartitionConfig(), + chunking_config=ChunkingConfig(chunk_elements=True), + embedding_config=EmbeddingConfig( + provider="langchain-huggingface", + ), + writer=writer, + writer_kwargs={}, + ) + runner.run() diff --git a/docs/source/ingest/destination_connectors/databricks_volumes.rst b/docs/source/ingest/destination_connectors/databricks_volumes.rst new file mode 100644 index 000000000..4d194491c --- /dev/null +++ b/docs/source/ingest/destination_connectors/databricks_volumes.rst @@ -0,0 +1,32 @@ +Databricks Volumes +================== + +Batch process all your records using ``unstructured-ingest`` to store structured outputs locally on your filesystem and upload those local files to a Databricks Volume. + +First you'll need to install the Databricks Volumes dependencies as shown here. + +.. 
code:: shell + + pip install "unstructured[databricks-volumes]" + +Run Locally +----------- +The upstream connector can be any of the ones supported, but for convenience here, showing a sample command using the +upstream local connector. + +.. tabs:: + + .. tab:: Shell + + .. literalinclude:: ./code/bash/databricks_volumes.sh + :language: bash + + .. tab:: Python + + .. literalinclude:: ./code/python/databricks_volumes.py + :language: python + + +For a full list of the options the CLI accepts check ``unstructured-ingest databricks-volumes --help``. + +NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide `_. diff --git a/requirements/ingest/databricks-volumes.in b/requirements/ingest/databricks-volumes.in new file mode 100644 index 000000000..9cc0545e0 --- /dev/null +++ b/requirements/ingest/databricks-volumes.in @@ -0,0 +1,3 @@ +-c ../constraints.in +-c ../base.txt +databricks-sdk \ No newline at end of file diff --git a/requirements/ingest/databricks-volumes.txt b/requirements/ingest/databricks-volumes.txt new file mode 100644 index 000000000..558e26415 --- /dev/null +++ b/requirements/ingest/databricks-volumes.txt @@ -0,0 +1,42 @@ +# +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: +# +# pip-compile --output-file=ingest/databricks-volumes.txt ingest/databricks-volumes.in +# +cachetools==5.3.2 + # via google-auth +certifi==2023.11.17 + # via + # -c ingest/../base.txt + # -c ingest/../constraints.in + # requests +charset-normalizer==3.3.2 + # via + # -c ingest/../base.txt + # requests +databricks-sdk==0.16.0 + # via -r ingest/databricks-volumes.in +google-auth==2.26.1 + # via databricks-sdk +idna==3.6 + # via + # -c ingest/../base.txt + # requests +pyasn1==0.5.1 + # via + # pyasn1-modules + # rsa +pyasn1-modules==0.3.0 + # via google-auth +requests==2.31.0 + # via + # -c ingest/../base.txt + # databricks-sdk +rsa==4.9 + # via google-auth +urllib3==1.26.18 + # via + # -c ingest/../base.txt + # -c ingest/../constraints.in + # requests diff --git a/setup.py b/setup.py index 7e2ef2b51..56bd12d6d 100644 --- a/setup.py +++ b/setup.py @@ -168,6 +168,7 @@ setup( "embed-huggingface": load_requirements("requirements/ingest/embed-huggingface.in"), "openai": load_requirements("requirements/ingest/embed-openai.in"), "bedrock": load_requirements("requirements/ingest/embed-aws-bedrock.in"), + "databricks-volumes": load_requirements("requirements/ingest/databricks-volumes.in"), }, package_dir={"unstructured": "unstructured"}, package_data={"unstructured": ["nlp/*.txt"]}, diff --git a/unstructured/__version__.py b/unstructured/__version__.py index f1055c5ab..933e15f87 100644 --- a/unstructured/__version__.py +++ b/unstructured/__version__.py @@ -1 +1 @@ -__version__ = "0.12.2-dev0" # pragma: no cover +__version__ = "0.12.3-dev1" # pragma: no cover diff --git a/unstructured/ingest/cli/cmds/__init__.py b/unstructured/ingest/cli/cmds/__init__.py index 155704ffc..3152c7e97 100644 --- a/unstructured/ingest/cli/cmds/__init__.py +++ b/unstructured/ingest/cli/cmds/__init__.py @@ -11,6 +11,7 @@ from .azure_cognitive_search import get_base_dest_cmd as azure_cognitive_search_ from .biomed import get_base_src_cmd as biomed_base_src_cmd from .chroma import get_base_dest_cmd as chroma_base_dest_cmd from .confluence import get_base_src_cmd as confluence_base_src_cmd +from 
.databricks_volumes import get_base_dest_cmd as databricks_volumes_dest_cmd from .delta_table import get_base_dest_cmd as delta_table_dest_cmd from .delta_table import get_base_src_cmd as delta_table_base_src_cmd from .discord import get_base_src_cmd as discord_base_src_cmd @@ -100,6 +101,7 @@ base_dest_cmd_fns: t.List[t.Callable[[], "BaseDestCmd"]] = [ azure_base_dest_cmd, box_base_dest_cmd, chroma_base_dest_cmd, + databricks_volumes_dest_cmd, dropbox_base_dest_cmd, elasticsearch_base_dest_cmd, fsspec_base_dest_cmd, diff --git a/unstructured/ingest/cli/cmds/databricks_volumes.py b/unstructured/ingest/cli/cmds/databricks_volumes.py new file mode 100644 index 000000000..faea5e0d4 --- /dev/null +++ b/unstructured/ingest/cli/cmds/databricks_volumes.py @@ -0,0 +1,163 @@ +import typing as t +from dataclasses import dataclass + +import click + +from unstructured.ingest.cli.interfaces import CliConfig +from unstructured.ingest.connector.databricks_volumes import ( + DatabricksVolumesWriteConfig, + SimpleDatabricksVolumesConfig, +) + +CMD_NAME = "databricks-volumes" + + +@dataclass +class DatabricksVolumesCliConfig(SimpleDatabricksVolumesConfig, CliConfig): + @staticmethod + def get_cli_options() -> t.List[click.Option]: + options = [ + click.Option( + ["--host"], + type=str, + default=None, + help="The Databricks host URL for either the " + "Databricks workspace endpoint or the " + "Databricks accounts endpoint.", + ), + click.Option( + ["--account-id"], + type=str, + default=None, + help="The Databricks account ID for the Databricks " + "accounts endpoint. Only has effect when Host is " + "either https://accounts.cloud.databricks.com/ (AWS), " + "https://accounts.azuredatabricks.net/ (Azure), " + "or https://accounts.gcp.databricks.com/ (GCP).", + ), + click.Option( + ["--username"], + type=str, + default=None, + help="The Databricks username part of basic authentication. " + "Only possible when Host is *.cloud.databricks.com (AWS).", + ), + click.Option( + ["--password"], + type=str, + default=None, + help="The Databricks password part of basic authentication. " + "Only possible when Host is *.cloud.databricks.com (AWS).", + ), + click.Option(["--client-id"], type=str, default=None), + click.Option(["--client-secret"], type=str, default=None), + click.Option( + ["--token"], + type=str, + default=None, + help="The Databricks personal access token (PAT) (AWS, Azure, and GCP) or " + "Azure Active Directory (Azure AD) token (Azure).", + ), + click.Option( + ["--azure-workspace-resource-id"], + type=str, + default=None, + help="The Azure Resource Manager ID for the Azure Databricks workspace, " + "which is exchanged for a Databricks host URL.", + ), + click.Option( + ["--azure-client-secret"], + type=str, + default=None, + help="The Azure AD service principal’s client secret.", + ), + click.Option( + ["--azure-client-id"], + type=str, + default=None, + help="The Azure AD service principal’s application ID.", + ), + click.Option( + ["--azure-tenant-id"], + type=str, + default=None, + help="The Azure AD service principal’s tenant ID.", + ), + click.Option( + ["--azure-environment"], + type=str, + default=None, + help="The Azure environment type (such as Public, UsGov, China, and Germany) for a " + "specific set of API endpoints. Defaults to PUBLIC.", + ), + click.Option( + ["--auth-type"], + type=str, + default=None, + help="When multiple auth attributes are available in the " + "environment, use the auth type specified by this " + "argument. 
This argument also holds the currently " + "selected auth.", + ), + click.Option(["--cluster-id"], type=str, default=None), + click.Option(["--google-credentials"], type=str, default=None), + click.Option(["--google-service-account"], type=str, default=None), + ] + return options + + +@dataclass +class DatabricksVolumesCliWriteConfig(DatabricksVolumesWriteConfig, CliConfig): + @staticmethod + def get_cli_options() -> t.List[click.Option]: + options = [ + click.Option( + ["--volume"], type=str, required=True, help="Name of volume in the Unity Catalog" + ), + click.Option( + ["--catalog"], + type=str, + required=True, + help="Name of the catalog in the Databricks Unity Catalog service", + ), + click.Option( + ["--volume-path"], + type=str, + required=False, + default=None, + help="Optional path within the volume to write to", + ), + click.Option( + ["--overwrite"], + type=bool, + is_flag=True, + help="If true, an existing file will be overwritten.", + ), + click.Option( + ["--encoding"], + type=str, + required=True, + default="utf-8", + help="Encoding applied to the data when written to the volume", + ), + click.Option( + ["--schema"], + type=str, + required=True, + default="default", + help="Schema associated with the volume to write to in the Unity Catalog service", + ), + ] + return options + + +def get_base_dest_cmd(): + from unstructured.ingest.cli.base.dest import BaseDestCmd + + cmd_cls = BaseDestCmd( + cmd_name=CMD_NAME, + cli_config=DatabricksVolumesCliConfig, + additional_cli_options=[DatabricksVolumesCliWriteConfig], + write_config=DatabricksVolumesWriteConfig, + ) + return cmd_cls diff --git a/unstructured/ingest/connector/databricks_volumes.py b/unstructured/ingest/connector/databricks_volumes.py new file mode 100644 index 000000000..2ffe1c105 --- /dev/null +++ b/unstructured/ingest/connector/databricks_volumes.py @@ -0,0 +1,132 @@ +import copy +import json +import os +import typing as t +from dataclasses import dataclass, field +from io import BytesIO +from pathlib import PurePath + +from unstructured.ingest.enhanced_dataclass import enhanced_field +from unstructured.ingest.enhanced_dataclass.core import _asdict +from unstructured.ingest.interfaces import ( + AccessConfig, + BaseConnectorConfig, + BaseDestinationConnector, + BaseSingleIngestDoc, + WriteConfig, +) +from unstructured.ingest.logger import logger +from unstructured.utils import requires_dependencies + +if t.TYPE_CHECKING: + from databricks.sdk import WorkspaceClient + + +@dataclass +class DatabricksVolumesAccessConfig(AccessConfig): + account_id: t.Optional[str] = None + username: t.Optional[str] = None + password: t.Optional[str] = enhanced_field(default=None, sensitive=True) + client_id: t.Optional[str] = None + client_secret: t.Optional[str] = enhanced_field(default=None, sensitive=True) + token: t.Optional[str] = enhanced_field(default=None, sensitive=True) + profile: t.Optional[str] = None + azure_workspace_resource_id: t.Optional[str] = None + azure_client_secret: t.Optional[str] = enhanced_field(default=None, sensitive=True) + azure_client_id: t.Optional[str] = None + azure_tenant_id: t.Optional[str] = None + azure_environment: t.Optional[str] = None + auth_type: t.Optional[str] = None + cluster_id: t.Optional[str] = None + google_credentials: t.Optional[str] = None + google_service_account: t.Optional[str] = None + + +@dataclass +class SimpleDatabricksVolumesConfig(BaseConnectorConfig): + access_config: DatabricksVolumesAccessConfig + host: t.Optional[str] = None + + +@dataclass +class 
DatabricksVolumesWriteConfig(WriteConfig): + volume: str + catalog: str + volume_path: t.Optional[str] = None + overwrite: bool = False + encoding: str = "utf-8" + schema: str = "default" + + @property + def path(self) -> str: + path = f"/Volumes/{self.catalog}/{self.schema}/{self.volume}" + if self.volume_path: + path = f"{path}/{self.volume_path}" + return path + + +@dataclass +class DatabricksVolumesDestinationConnector(BaseDestinationConnector): + write_config: DatabricksVolumesWriteConfig + connector_config: SimpleDatabricksVolumesConfig + _client: t.Optional["WorkspaceClient"] = field(init=False, default=None) + + def to_dict(self, **kwargs): + self_cp = copy.copy(self) + if hasattr(self_cp, "_client"): + setattr(self_cp, "_client", None) + return _asdict(self_cp, **kwargs) + + @requires_dependencies(dependencies=["databricks.sdk"], extras="databricks-volumes") + def generate_client(self) -> "WorkspaceClient": + from databricks.sdk import WorkspaceClient + + return WorkspaceClient( + host=self.connector_config.host, **self.connector_config.access_config.to_dict() + ) + + @property + def client(self) -> "WorkspaceClient": + if self._client is None: + self._client = self.generate_client() + return self._client + + def check_connection(self): + pass + + def initialize(self): + _ = self.client + + def write_dict( + self, + *args, + elements_dict: t.List[t.Dict[str, t.Any]], + filename: t.Optional[str] = None, + indent: int = 4, + encoding: str = "utf-8", + **kwargs, + ) -> None: + output_folder = self.write_config.path + output_folder = os.path.join(output_folder) # Make sure folder ends with file separator + filename = ( + filename.strip(os.sep) if filename else filename + ) # Make sure filename doesn't begin with file separator + output_path = str(PurePath(output_folder, filename)) if filename else output_folder + logger.debug(f"uploading content to {output_path}") + self.client.files.upload( + file_path=output_path, + contents=BytesIO(json.dumps(elements_dict).encode(encoding=self.write_config.encoding)), + overwrite=self.write_config.overwrite, + ) + + def get_elements_dict(self, docs: t.List[BaseSingleIngestDoc]) -> t.List[t.Dict[str, t.Any]]: + pass + + def write(self, docs: t.List[BaseSingleIngestDoc]) -> None: + for doc in docs: + file_path = doc.base_output_filename + filename = file_path if file_path else None + with open(doc._output_filename) as json_file: + logger.debug(f"uploading content from {doc._output_filename}") + json_list = json.load(json_file) + self.write_dict(elements_dict=json_list, filename=filename) diff --git a/unstructured/ingest/runner/writers/__init__.py b/unstructured/ingest/runner/writers/__init__.py index 6e7dbcfad..b2160c0dd 100644 --- a/unstructured/ingest/runner/writers/__init__.py +++ b/unstructured/ingest/runner/writers/__init__.py @@ -3,6 +3,7 @@ import typing as t from .azure_cognitive_search import AzureCognitiveSearchWriter from .base_writer import Writer from .chroma import ChromaWriter +from .databricks_volumes import DatabricksVolumesWriter from .delta_table import DeltaTableWriter from .elasticsearch import ElasticsearchWriter from .fsspec.azure import AzureWriter @@ -22,6 +23,7 @@ writer_map: t.Dict[str, t.Type[Writer]] = { "azure_cognitive_search": AzureCognitiveSearchWriter, "box": BoxWriter, "chroma": ChromaWriter, + "databricks_volumes": DatabricksVolumesWriter, "delta_table": DeltaTableWriter, "dropbox": DropboxWriter, "elasticsearch": ElasticsearchWriter, diff --git a/unstructured/ingest/runner/writers/databricks_volumes.py 
b/unstructured/ingest/runner/writers/databricks_volumes.py new file mode 100644 index 000000000..74703f850 --- /dev/null +++ b/unstructured/ingest/runner/writers/databricks_volumes.py @@ -0,0 +1,25 @@ +import typing as t +from dataclasses import dataclass + +from unstructured.ingest.enhanced_dataclass import EnhancedDataClassJsonMixin +from unstructured.ingest.interfaces import BaseDestinationConnector +from unstructured.ingest.runner.writers.base_writer import Writer + +if t.TYPE_CHECKING: + from unstructured.ingest.connector.databricks_volumes import ( + DatabricksVolumesWriteConfig, + SimpleDatabricksVolumesConfig, + ) + + +@dataclass +class DatabricksVolumesWriter(Writer, EnhancedDataClassJsonMixin): + write_config: "DatabricksVolumesWriteConfig" + connector_config: "SimpleDatabricksVolumesConfig" + + def get_connector_cls(self) -> t.Type[BaseDestinationConnector]: + from unstructured.ingest.connector.databricks_volumes import ( + DatabricksVolumesDestinationConnector, + ) + + return DatabricksVolumesDestinationConnector
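For a sense of how the new pieces compose without going through the full ingest runner, here is a minimal sketch that drives the destination connector directly with an already-partitioned payload. It only uses classes added in this patch; the element dict, the `smoke-test` names, and the environment variables are illustrative stand-ins, and auth is the same basic username/password flow shown in the docs example.

```python
import os

from unstructured.ingest.connector.databricks_volumes import (
    DatabricksVolumesAccessConfig,
    DatabricksVolumesDestinationConnector,
    DatabricksVolumesWriteConfig,
    SimpleDatabricksVolumesConfig,
)

connector = DatabricksVolumesDestinationConnector(
    connector_config=SimpleDatabricksVolumesConfig(
        host=os.getenv("DATABRICKS_HOST"),
        access_config=DatabricksVolumesAccessConfig(
            username=os.getenv("DATABRICKS_USERNAME"),
            password=os.getenv("DATABRICKS_PASSWORD"),
        ),
    ),
    write_config=DatabricksVolumesWriteConfig(
        catalog=os.getenv("DATABRICKS_CATALOG"),
        volume=os.getenv("DATABRICKS_VOLUME"),
        volume_path="smoke-test",  # files land under /Volumes/<catalog>/default/<volume>/smoke-test
        overwrite=True,
    ),
)

connector.initialize()  # instantiates the WorkspaceClient via the lazy `client` property

# Illustrative element payload standing in for partitioned output.
elements = [{"type": "NarrativeText", "text": "Hello from the Databricks Volumes destination."}]
connector.write_dict(elements_dict=elements, filename="smoke-test.json")
```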