From 9c7ee8921a47d3d10e9abcf98b12f08df8a2db6c Mon Sep 17 00:00:00 2001 From: Roman Isecke <136338424+rbiseck3@users.noreply.github.com> Date: Mon, 16 Oct 2023 10:26:30 -0400 Subject: [PATCH] roman/fsspec compression support (#1730) ### Description Opened to replace original PR: [1443](https://github.com/Unstructured-IO/unstructured/pull/1443) --- CHANGELOG.md | 12 +- .../test-ingest-s3-compression.sh | 37 ++++++ unstructured/__version__.py | 2 +- unstructured/ingest/cli/cmds/azure.py | 13 ++- .../ingest/cli/cmds/azure_cognitive_search.py | 2 +- unstructured/ingest/cli/cmds/box.py | 13 ++- unstructured/ingest/cli/cmds/delta_table.py | 2 +- unstructured/ingest/cli/cmds/dropbox.py | 13 ++- unstructured/ingest/cli/cmds/fsspec.py | 8 +- unstructured/ingest/cli/cmds/gcs.py | 13 ++- unstructured/ingest/cli/cmds/s3.py | 27 +++-- unstructured/ingest/cli/interfaces.py | 20 +++- unstructured/ingest/cli/utils.py | 4 + unstructured/ingest/compression_support.py | 105 ++++++++++++++++++ unstructured/ingest/connector/dropbox.py | 2 +- unstructured/ingest/connector/fsspec.py | 105 +++++++++--------- unstructured/ingest/connector/git.py | 1 - unstructured/ingest/connector/local.py | 4 +- unstructured/ingest/interfaces.py | 63 +++++++++++ .../ingest/pipeline/reformat/embedding.py | 2 +- unstructured/ingest/runner/__init__.py | 6 +- unstructured/ingest/runner/azure.py | 18 ++- unstructured/ingest/runner/base_runner.py | 13 +++ unstructured/ingest/runner/box.py | 16 +-- unstructured/ingest/runner/dropbox.py | 18 ++- unstructured/ingest/runner/fsspec.py | 18 ++- unstructured/ingest/runner/gcs.py | 17 ++- unstructured/ingest/runner/s3.py | 17 ++- unstructured/ingest/runner/writers.py | 2 +- 29 files changed, 413 insertions(+), 160 deletions(-) create mode 100755 test_unstructured_ingest/test-ingest-s3-compression.sh create mode 100644 unstructured/ingest/compression_support.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 00dc97d3d..69bf0a9bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +## 0.10.24-dev0 + +### Enhancements + +* **Ingest compression utilities and fsspec connector support** Generic utility code added to handle files that get pulled from a source connector that are either tar or zip compressed and uncompress them locally. This is then processed using a local source connector. Currently this functionality has been incorporated into the fsspec connector and all those inheriting from it (currently: Azure Blob Storage, Google Cloud Storage, S3, Box, and Dropbox). + +### Features + +### Fixes + ## 0.10.23 ### Enhancements @@ -1505,4 +1515,4 @@ of an email. ## 0.2.0 -* Initial release of unstructured +* Initial release of unstructured \ No newline at end of file diff --git a/test_unstructured_ingest/test-ingest-s3-compression.sh b/test_unstructured_ingest/test-ingest-s3-compression.sh new file mode 100755 index 000000000..aee21c1dc --- /dev/null +++ b/test_unstructured_ingest/test-ingest-s3-compression.sh @@ -0,0 +1,37 @@ +#!/usr/bin/env bash + +set -e + + +SCRIPT_DIR=$(dirname "$(realpath "$0")") +cd "$SCRIPT_DIR"/.. || exit 1 +OUTPUT_FOLDER_NAME=s3-compression +OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME +WORK_DIR=$SCRIPT_DIR/workdir/$OUTPUT_FOLDER_NAME +DOWNLOAD_DIR=$SCRIPT_DIR/download/$OUTPUT_FOLDER_NAME +max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")} + +# shellcheck disable=SC1091 +source "$SCRIPT_DIR"/cleanup.sh +function cleanup() { + cleanup_dir "$OUTPUT_DIR" + cleanup_dir "$WORK_DIR" +} +trap cleanup EXIT + +PYTHONPATH=. ./unstructured/ingest/main.py \ + s3 \ + --num-processes "$max_processes" \ + --download-dir "$DOWNLOAD_DIR" \ + --metadata-exclude coordinates,filename,file_directory,metadata.data_source.date_processed,metadata.last_modified,metadata.detection_class_prob,metadata.parent_id,metadata.category_depth \ + --strategy fast \ + --preserve-downloads \ + --reprocess \ + --output-dir "$OUTPUT_DIR" \ + --verbose \ + --remote-url s3://utic-dev-tech-fixtures/small-pdf-set-w-compression/ \ + --anonymous \ + --work-dir "$WORK_DIR" \ + --uncompress + +"$SCRIPT_DIR"/check-num-files-output.sh 12 $OUTPUT_FOLDER_NAME diff --git a/unstructured/__version__.py b/unstructured/__version__.py index 9b9dbfa4c..5f1ae2897 100644 --- a/unstructured/__version__.py +++ b/unstructured/__version__.py @@ -1 +1 @@ -__version__ = "0.10.23" # pragma: no cover +__version__ = "0.10.24-dev0" # pragma: no cover diff --git a/unstructured/ingest/cli/cmds/azure.py b/unstructured/ingest/cli/cmds/azure.py index 5a54bc279..93a4c1bd7 100644 --- a/unstructured/ingest/cli/cmds/azure.py +++ b/unstructured/ingest/cli/cmds/azure.py @@ -8,12 +8,11 @@ from unstructured.ingest.cli.common import ( log_options, ) from unstructured.ingest.cli.interfaces import ( + CliFilesStorageConfig, CliMixin, - CliRecursiveConfig, - CliRemoteUrlConfig, ) from unstructured.ingest.cli.utils import Group, add_options, conform_click_options, extract_configs -from unstructured.ingest.interfaces import BaseConfig +from unstructured.ingest.interfaces import BaseConfig, FsspecConfig from unstructured.ingest.logger import ingest_log_streaming_init, logger from unstructured.ingest.runner import AzureRunner @@ -58,7 +57,11 @@ def azure_source(ctx: click.Context, **options): ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) log_options(options, verbose=verbose) try: - configs = extract_configs(options, validate=[AzureCliConfig]) + configs = extract_configs( + options, + validate=[AzureCliConfig], + extras={"fsspec_config": FsspecConfig}, + ) runner = AzureRunner( **configs, # type: ignore ) @@ -70,5 +73,5 @@ def azure_source(ctx: click.Context, **options): def get_source_cmd() -> click.Group: cmd = azure_source - add_options(cmd, extras=[AzureCliConfig, CliRemoteUrlConfig, CliRecursiveConfig]) + add_options(cmd, extras=[AzureCliConfig, CliFilesStorageConfig]) return cmd diff --git a/unstructured/ingest/cli/cmds/azure_cognitive_search.py b/unstructured/ingest/cli/cmds/azure_cognitive_search.py index ebbcfb263..9e6398ae0 100644 --- a/unstructured/ingest/cli/cmds/azure_cognitive_search.py +++ b/unstructured/ingest/cli/cmds/azure_cognitive_search.py @@ -70,7 +70,7 @@ def azure_cognitive_search_dest(ctx: click.Context, **options): configs = extract_configs(options, validate=[AzureCognitiveSearchCliWriteConfig]) runner_cls = runner_map[source_cmd] runner = runner_cls( - **configs, + **configs, # type: ignore writer_type="azure_cognitive_search", writer_kwargs=options, ) diff --git a/unstructured/ingest/cli/cmds/box.py b/unstructured/ingest/cli/cmds/box.py index 49b90f30a..66efff575 100644 --- a/unstructured/ingest/cli/cmds/box.py +++ b/unstructured/ingest/cli/cmds/box.py @@ -8,12 +8,11 @@ from unstructured.ingest.cli.common import ( log_options, ) from unstructured.ingest.cli.interfaces import ( + CliFilesStorageConfig, CliMixin, - CliRecursiveConfig, - CliRemoteUrlConfig, ) from unstructured.ingest.cli.utils import Group, add_options, conform_click_options, extract_configs -from unstructured.ingest.interfaces import BaseConfig +from unstructured.ingest.interfaces import BaseConfig, FsspecConfig from unstructured.ingest.logger import ingest_log_streaming_init, logger from unstructured.ingest.runner import BoxRunner @@ -45,7 +44,11 @@ def box_source(ctx: click.Context, **options): ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) log_options(options, verbose=verbose) try: - configs = extract_configs(options, validate=[BoxCliConfig]) + configs = extract_configs( + options, + validate=[BoxCliConfig], + extras={"fsspec_config": FsspecConfig}, + ) runner = BoxRunner( **configs, # type: ignore ) @@ -57,5 +60,5 @@ def box_source(ctx: click.Context, **options): def get_source_cmd() -> click.Group: cmd = box_source - add_options(cmd, extras=[BoxCliConfig, CliRemoteUrlConfig, CliRecursiveConfig]) + add_options(cmd, extras=[BoxCliConfig, CliFilesStorageConfig]) return cmd diff --git a/unstructured/ingest/cli/cmds/delta_table.py b/unstructured/ingest/cli/cmds/delta_table.py index 1335cbe21..061e85336 100644 --- a/unstructured/ingest/cli/cmds/delta_table.py +++ b/unstructured/ingest/cli/cmds/delta_table.py @@ -123,7 +123,7 @@ def delta_table_dest(ctx: click.Context, **options): DeltaTableCliWriteConfig.from_dict(options) runner_cls = runner_map[source_cmd] runner = runner_cls( - **configs, + **configs, # type: ignore writer_type="delta_table", writer_kwargs=options, ) diff --git a/unstructured/ingest/cli/cmds/dropbox.py b/unstructured/ingest/cli/cmds/dropbox.py index f06c263de..67f4fc5ef 100644 --- a/unstructured/ingest/cli/cmds/dropbox.py +++ b/unstructured/ingest/cli/cmds/dropbox.py @@ -7,12 +7,11 @@ from unstructured.ingest.cli.common import ( log_options, ) from unstructured.ingest.cli.interfaces import ( + CliFilesStorageConfig, CliMixin, - CliRecursiveConfig, - CliRemoteUrlConfig, ) from unstructured.ingest.cli.utils import Group, add_options, conform_click_options, extract_configs -from unstructured.ingest.interfaces import BaseConfig +from unstructured.ingest.interfaces import BaseConfig, FsspecConfig from unstructured.ingest.logger import ingest_log_streaming_init, logger from unstructured.ingest.runner import DropboxRunner @@ -44,7 +43,11 @@ def dropbox_source(ctx: click.Context, **options): ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) log_options(options, verbose=verbose) try: - configs = extract_configs(options, validate=[DropboxCliConfig]) + configs = extract_configs( + options, + validate=[DropboxCliConfig], + extras={"fsspec_config": FsspecConfig}, + ) runner = DropboxRunner( **configs, # type: ignore ) @@ -56,5 +59,5 @@ def dropbox_source(ctx: click.Context, **options): def get_source_cmd() -> click.Group: cmd = dropbox_source - add_options(cmd, extras=[DropboxCliConfig, CliRemoteUrlConfig, CliRecursiveConfig]) + add_options(cmd, extras=[DropboxCliConfig, CliFilesStorageConfig]) return cmd diff --git a/unstructured/ingest/cli/cmds/fsspec.py b/unstructured/ingest/cli/cmds/fsspec.py index f4ad0b131..80d71f0ee 100644 --- a/unstructured/ingest/cli/cmds/fsspec.py +++ b/unstructured/ingest/cli/cmds/fsspec.py @@ -6,10 +6,10 @@ from unstructured.ingest.cli.common import ( log_options, ) from unstructured.ingest.cli.interfaces import ( - CliRecursiveConfig, - CliRemoteUrlConfig, + CliFilesStorageConfig, ) from unstructured.ingest.cli.utils import Group, add_options, conform_click_options, extract_configs +from unstructured.ingest.interfaces import FsspecConfig from unstructured.ingest.logger import ingest_log_streaming_init, logger from unstructured.ingest.runner import FsspecRunner @@ -25,7 +25,7 @@ def fsspec_source(ctx: click.Context, **options): ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) log_options(options, verbose=verbose) try: - configs = extract_configs(options) + configs = extract_configs(options, extras={"fsspec_config": FsspecConfig}) runner = FsspecRunner( **configs, # type: ignore ) @@ -37,5 +37,5 @@ def fsspec_source(ctx: click.Context, **options): def get_source_cmd() -> click.Group: cmd = fsspec_source - add_options(cmd, extras=[CliRemoteUrlConfig, CliRecursiveConfig]) + add_options(cmd, extras=[CliFilesStorageConfig]) return cmd diff --git a/unstructured/ingest/cli/cmds/gcs.py b/unstructured/ingest/cli/cmds/gcs.py index 0b549004e..5c39b2975 100644 --- a/unstructured/ingest/cli/cmds/gcs.py +++ b/unstructured/ingest/cli/cmds/gcs.py @@ -8,12 +8,11 @@ from unstructured.ingest.cli.common import ( log_options, ) from unstructured.ingest.cli.interfaces import ( + CliFilesStorageConfig, CliMixin, - CliRecursiveConfig, - CliRemoteUrlConfig, ) from unstructured.ingest.cli.utils import Group, add_options, conform_click_options, extract_configs -from unstructured.ingest.interfaces import BaseConfig +from unstructured.ingest.interfaces import BaseConfig, FsspecConfig from unstructured.ingest.logger import ingest_log_streaming_init, logger from unstructured.ingest.runner import GCSRunner @@ -47,7 +46,11 @@ def gcs_source(ctx: click.Context, **options): ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) log_options(options, verbose=verbose) try: - configs = extract_configs(options, validate=([GcsCliConfig])) + configs = extract_configs( + options, + validate=([GcsCliConfig]), + extras={"fsspec_config": FsspecConfig}, + ) runner = GCSRunner( **configs, # type: ignore ) @@ -59,5 +62,5 @@ def gcs_source(ctx: click.Context, **options): def get_source_cmd() -> click.Group: cmd = gcs_source - add_options(cmd, extras=[GcsCliConfig, CliRemoteUrlConfig, CliRecursiveConfig]) + add_options(cmd, extras=[GcsCliConfig, CliFilesStorageConfig]) return cmd diff --git a/unstructured/ingest/cli/cmds/s3.py b/unstructured/ingest/cli/cmds/s3.py index 572578e39..675418da8 100644 --- a/unstructured/ingest/cli/cmds/s3.py +++ b/unstructured/ingest/cli/cmds/s3.py @@ -8,14 +8,13 @@ from unstructured.ingest.cli.common import ( log_options, ) from unstructured.ingest.cli.interfaces import ( + CliFilesStorageConfig, CliMixin, - CliRecursiveConfig, - CliRemoteUrlConfig, ) from unstructured.ingest.cli.utils import Group, add_options, conform_click_options, extract_configs -from unstructured.ingest.interfaces import BaseConfig +from unstructured.ingest.interfaces import BaseConfig, FsspecConfig from unstructured.ingest.logger import ingest_log_streaming_init, logger -from unstructured.ingest.runner import S3Runner, runner_map +from unstructured.ingest.runner import FsspecBaseRunner, S3Runner, runner_map @dataclass @@ -57,7 +56,11 @@ def s3_source(ctx: click.Context, **options): ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) log_options(options, verbose=verbose) try: - configs = extract_configs(options, validate=[S3CliConfig]) + configs = extract_configs( + options, + validate=[S3CliConfig], + extras={"fsspec_config": FsspecConfig}, + ) s3_runner = S3Runner( **configs, # type: ignore ) @@ -82,10 +85,16 @@ def s3_dest(ctx: click.Context, **options): log_options(parent_options, verbose=verbose) log_options(options, verbose=verbose) try: - configs = extract_configs(options, validate=[S3CliConfig]) runner_cls = runner_map[source_cmd] + configs = extract_configs( + options, + validate=[S3CliConfig], + extras={"fsspec_config": FsspecConfig} + if issubclass(runner_cls, FsspecBaseRunner) + else None, + ) runner = runner_cls( - **configs, + **configs, # type: ignore writer_type="s3", writer_kwargs=options, ) @@ -100,11 +109,11 @@ def s3_dest(ctx: click.Context, **options): def get_dest_cmd() -> click.Command: cmd = s3_dest S3CliConfig.add_cli_options(cmd) - CliRemoteUrlConfig.add_cli_options(cmd) + CliFilesStorageConfig.add_cli_options(cmd) return cmd def get_source_cmd() -> click.Group: cmd = s3_source - add_options(cmd, extras=[S3CliConfig, CliRemoteUrlConfig, CliRecursiveConfig]) + add_options(cmd, extras=[S3CliConfig, CliFilesStorageConfig]) return cmd diff --git a/unstructured/ingest/cli/interfaces.py b/unstructured/ingest/cli/interfaces.py index d5d5e04d2..cbae66ec9 100644 --- a/unstructured/ingest/cli/interfaces.py +++ b/unstructured/ingest/cli/interfaces.py @@ -10,6 +10,7 @@ from unstructured.ingest.interfaces import ( BaseConfig, ChunkingConfig, EmbeddingConfig, + FileStorageConfig, PartitionConfig, PermissionsConfig, ProcessorConfig, @@ -248,9 +249,7 @@ class CliRecursiveConfig(BaseConfig, CliMixin): cmd.params.extend(options) -class CliRemoteUrlConfig(BaseConfig, CliMixin): - remote_url: str - +class CliFilesStorageConfig(FileStorageConfig, CliMixin): @staticmethod def add_cli_options(cmd: click.Command) -> None: options = [ @@ -259,6 +258,21 @@ class CliRemoteUrlConfig(BaseConfig, CliMixin): required=True, help="Remote fsspec URL formatted as `protocol://dir/path`", ), + click.Option( + ["--uncompress"], + type=bool, + default=False, + is_flag=True, + help="Uncompress any archived files. Currently supporting zip and tar " + "files based on file extension.", + ), + click.Option( + ["--recursive"], + is_flag=True, + default=False, + help="Recursively download files in their respective folders " + "otherwise stop at the files in provided folder level.", + ), ] cmd.params.extend(options) diff --git a/unstructured/ingest/cli/utils.py b/unstructured/ingest/cli/utils.py index 61df72f40..0a749da8d 100644 --- a/unstructured/ingest/cli/utils.py +++ b/unstructured/ingest/cli/utils.py @@ -26,6 +26,7 @@ def conform_click_options(options: dict): def extract_configs( data: dict, + extras: t.Optional[t.Dict[str, t.Type[BaseConfig]]] = None, validate: t.Optional[t.List[t.Type[BaseConfig]]] = None, ) -> t.Dict[str, BaseConfig]: """ @@ -42,6 +43,9 @@ def extract_configs( "processor_config": CliProcessorConfig.from_dict(data), "permissions_config": CliPermissionsConfig.from_dict(data), } + if extras: + for k, conf in extras.items(): + res[k] = conf.from_dict(data) for v in validate: v.from_dict(data) return res diff --git a/unstructured/ingest/compression_support.py b/unstructured/ingest/compression_support.py new file mode 100644 index 000000000..1f97a06c7 --- /dev/null +++ b/unstructured/ingest/compression_support.py @@ -0,0 +1,105 @@ +import copy +import os +import tarfile +import zipfile +from dataclasses import dataclass +from pathlib import Path +from typing import List, Optional + +from unstructured.ingest.connector.local import LocalSourceConnector, SimpleLocalConfig +from unstructured.ingest.interfaces import ( + BaseConnectorConfig, + BaseIngestDoc, + ProcessorConfig, + ReadConfig, +) +from unstructured.ingest.logger import logger + +ZIP_FILE_EXT = [".zip"] +TAR_FILE_EXT = [".tar", ".tar.gz", ".tgz"] + + +def uncompress_file(filename: str, path: Optional[str] = None) -> str: + """ + Takes in a compressed zip or tar file and uncompresses it + """ + # Create path if it doesn't already exist + if path: + Path(path).mkdir(parents=True, exist_ok=True) + + if any(filename.endswith(ext) for ext in ZIP_FILE_EXT): + return uncompress_zip_file(zip_filename=filename, path=path) + elif any(filename.endswith(ext) for ext in TAR_FILE_EXT): + return uncompress_tar_file(tar_filename=filename, path=path) + else: + raise ValueError( + "filename {} not a recognized compressed extension: {}".format( + filename, + ", ".join(ZIP_FILE_EXT + TAR_FILE_EXT), + ), + ) + + +def uncompress_zip_file(zip_filename: str, path: Optional[str] = None) -> str: + head, tail = os.path.split(zip_filename) + for ext in ZIP_FILE_EXT: + if tail.endswith(ext): + tail = tail[: -(len(ext))] + break + path = path if path else os.path.join(head, f"{tail}-zip-uncompressed") + logger.info(f"extracting zip {zip_filename} -> {path}") + with zipfile.ZipFile(zip_filename) as zfile: + zfile.extractall(path=path) + return path + + +def uncompress_tar_file(tar_filename: str, path: Optional[str] = None) -> str: + head, tail = os.path.split(tar_filename) + for ext in TAR_FILE_EXT: + if tail.endswith(ext): + tail = tail[: -(len(ext))] + break + + path = path if path else os.path.join(head, f"{tail}-tar-uncompressed") + logger.info(f"extracting tar {tar_filename} -> {path}") + with tarfile.open(tar_filename, "r:gz") as tfile: + tfile.extractall(path=path) + return path + + +@dataclass +class CompressionSourceConnectorMixin: + processor_config: ProcessorConfig + read_config: ReadConfig + connector_config: BaseConnectorConfig + + def process_compressed_doc(self, doc: BaseIngestDoc) -> List[BaseIngestDoc]: + """ + Utility function which helps process compressed files. Extracts the contents and returns + generated ingest docs via local source connector + """ + # Download the raw file to local + doc.get_file() + path = uncompress_file(filename=str(doc.filename)) + new_read_configs = copy.copy(self.read_config) + new_process_configs = copy.copy(self.processor_config) + relative_path = path.replace(self.read_config.download_dir, "") + + if self.processor_config.output_dir.endswith(os.sep): + new_process_configs.output_dir = f"{self.processor_config.output_dir}{relative_path}" + else: + new_process_configs.output_dir = ( + f"{self.processor_config.output_dir}{os.sep}{relative_path}" + ) + + local_connector = LocalSourceConnector( + connector_config=SimpleLocalConfig( + input_path=path, + recursive=True, + ), + read_config=new_read_configs, + processor_config=new_process_configs, + ) + logger.info(f"Created local source connector: {local_connector.to_json()}") + local_connector.initialize() + return local_connector.get_ingest_docs() diff --git a/unstructured/ingest/connector/dropbox.py b/unstructured/ingest/connector/dropbox.py index f27376f87..000e0709d 100644 --- a/unstructured/ingest/connector/dropbox.py +++ b/unstructured/ingest/connector/dropbox.py @@ -100,7 +100,7 @@ class DropboxSourceConnector(FsspecSourceConnector): return elif ls_output: raise ValueError( - f"No objects found in {self.connector_config.path}.", + f"No objects found in {self.connector_config.remote_url}.", ) else: raise MissingFolderError( diff --git a/unstructured/ingest/connector/fsspec.py b/unstructured/ingest/connector/fsspec.py index 3b63de7df..7c19354bb 100644 --- a/unstructured/ingest/connector/fsspec.py +++ b/unstructured/ingest/connector/fsspec.py @@ -1,16 +1,21 @@ import os -import re import typing as t from contextlib import suppress -from dataclasses import dataclass, field +from dataclasses import dataclass from pathlib import Path, PurePath +from unstructured.ingest.compression_support import ( + TAR_FILE_EXT, + ZIP_FILE_EXT, + CompressionSourceConnectorMixin, +) from unstructured.ingest.error import SourceConnectionError from unstructured.ingest.interfaces import ( BaseConnectorConfig, BaseDestinationConnector, BaseIngestDoc, BaseSourceConnector, + FsspecConfig, IngestDocCleanupMixin, SourceConnectorCleanupMixin, SourceMetadata, @@ -33,49 +38,8 @@ SUPPORTED_REMOTE_FSSPEC_PROTOCOLS = [ @dataclass -class SimpleFsspecConfig(BaseConnectorConfig): - # fsspec specific options - path: str - recursive: bool = False - access_kwargs: dict = field(default_factory=dict) - protocol: str = field(init=False) - path_without_protocol: str = field(init=False) - dir_path: str = field(init=False) - file_path: str = field(init=False) - - def get_access_kwargs(self) -> dict: - return self.access_kwargs - - def __post_init__(self): - self.protocol, self.path_without_protocol = self.path.split("://") - if self.protocol not in SUPPORTED_REMOTE_FSSPEC_PROTOCOLS: - raise ValueError( - f"Protocol {self.protocol} not supported yet, only " - f"{SUPPORTED_REMOTE_FSSPEC_PROTOCOLS} are supported.", - ) - - # dropbox root is an empty string - match = re.match(rf"{self.protocol}://([\s])/", self.path) - if match and self.protocol == "dropbox": - self.dir_path = " " - self.file_path = "" - return - - # just a path with no trailing prefix - match = re.match(rf"{self.protocol}://([^/\s]+?)(/*)$", self.path) - if match: - self.dir_path = match.group(1) - self.file_path = "" - return - - # valid path with a dir and/or file - match = re.match(rf"{self.protocol}://([^/\s]+?)/([^\s]*)", self.path) - if not match: - raise ValueError( - f"Invalid path {self.path}. Expected :///.", - ) - self.dir_path = match.group(1) - self.file_path = match.group(2) or "" +class SimpleFsspecConfig(FsspecConfig, BaseConnectorConfig): + pass @dataclass @@ -167,7 +131,11 @@ class FsspecIngestDoc(IngestDocCleanupMixin, BaseIngestDoc): @dataclass -class FsspecSourceConnector(SourceConnectorCleanupMixin, BaseSourceConnector): +class FsspecSourceConnector( + SourceConnectorCleanupMixin, + CompressionSourceConnectorMixin, + BaseSourceConnector, +): """Objects of this class support fetching document(s) from""" connector_config: SimpleFsspecConfig @@ -186,7 +154,7 @@ class FsspecSourceConnector(SourceConnectorCleanupMixin, BaseSourceConnector): ls_output = self.fs.ls(self.connector_config.path_without_protocol) if len(ls_output) < 1: raise ValueError( - f"No objects found in {self.connector_config.path}.", + f"No objects found in {self.connector_config.remote_url}.", ) def _list_files(self): @@ -212,15 +180,44 @@ class FsspecSourceConnector(SourceConnectorCleanupMixin, BaseSourceConnector): ] def get_ingest_docs(self): - return [ - self.ingest_doc_cls( - processor_config=self.processor_config, + files = self._list_files() + # remove compressed files + compressed_file_ext = TAR_FILE_EXT + ZIP_FILE_EXT + compressed_files = [] + uncompressed_files = [] + docs: t.List[BaseIngestDoc] = [] + for file in files: + if any(file.endswith(ext) for ext in compressed_file_ext): + compressed_files.append(file) + else: + uncompressed_files.append(file) + docs.extend( + [ + self.ingest_doc_cls( + read_config=self.read_config, + connector_config=self.connector_config, + processor_config=self.processor_config, + remote_file_path=file, + ) + for file in uncompressed_files + ], + ) + if not self.connector_config.uncompress: + return docs + for compressed_file in compressed_files: + compressed_doc = self.ingest_doc_cls( read_config=self.read_config, + processor_config=self.processor_config, connector_config=self.connector_config, - remote_file_path=file, + remote_file_path=compressed_file, ) - for file in self._list_files() - ] + try: + local_ingest_docs = self.process_compressed_doc(doc=compressed_doc) + logger.info(f"adding {len(local_ingest_docs)} from {compressed_file}") + docs.extend(local_ingest_docs) + finally: + compressed_doc.cleanup_file() + return docs @dataclass @@ -245,7 +242,7 @@ class FsspecDestinationConnector(BaseDestinationConnector): for doc in docs: s3_file_path = doc.base_filename - s3_folder = self.connector_config.path + s3_folder = self.connector_config.remote_url s3_output_path = str(PurePath(s3_folder, s3_file_path)) if s3_file_path else s3_folder logger.debug(f"Uploading {doc._output_filename} -> {s3_output_path}") diff --git a/unstructured/ingest/connector/git.py b/unstructured/ingest/connector/git.py index 31e044b0f..1b05e19d3 100644 --- a/unstructured/ingest/connector/git.py +++ b/unstructured/ingest/connector/git.py @@ -57,7 +57,6 @@ class GitIngestDoc(IngestDocCleanupMixin, BaseIngestDoc): @SourceConnectionError.wrap @BaseIngestDoc.skip_if_file_exists def get_file(self): - print(self) """Fetches the "remote" doc and stores it locally on the filesystem.""" self._create_full_tmp_dir_path() logger.debug(f"Fetching {self} - PID: {os.getpid()}") diff --git a/unstructured/ingest/connector/local.py b/unstructured/ingest/connector/local.py index c4339b811..f3fa86995 100644 --- a/unstructured/ingest/connector/local.py +++ b/unstructured/ingest/connector/local.py @@ -70,7 +70,9 @@ class LocalSourceConnector(BaseSourceConnector): """Objects of this class support fetching document(s) from local file system""" connector_config: SimpleLocalConfig - ingest_doc_cls: t.Type[LocalIngestDoc] = LocalIngestDoc + + def __post_init__(self): + self.ingest_doc_cls: t.Type[LocalIngestDoc] = LocalIngestDoc def cleanup(self, cur_dir=None): """Not applicable to local file system""" diff --git a/unstructured/ingest/interfaces.py b/unstructured/ingest/interfaces.py index f3deefbb0..c0d2b1679 100644 --- a/unstructured/ingest/interfaces.py +++ b/unstructured/ingest/interfaces.py @@ -4,6 +4,7 @@ through Unstructured.""" import functools import json import os +import re import typing as t from abc import ABC, abstractmethod from dataclasses import dataclass, field @@ -23,6 +24,17 @@ from unstructured.ingest.logger import logger from unstructured.partition.auto import partition from unstructured.staging.base import convert_to_dict, elements_from_json +SUPPORTED_REMOTE_FSSPEC_PROTOCOLS = [ + "s3", + "s3a", + "abfs", + "az", + "gs", + "gcs", + "box", + "dropbox", +] + @dataclass class BaseSessionHandle(ABC): @@ -63,6 +75,57 @@ class ProcessorConfig(BaseConfig): raise_on_error: bool = False +@dataclass +class FileStorageConfig(BaseConfig): + remote_url: str + uncompress: bool = False + recursive: bool = False + + +@dataclass +class FsspecConfig(FileStorageConfig): + access_kwargs: dict = field(default_factory=dict) + protocol: str = field(init=False) + path_without_protocol: str = field(init=False) + dir_path: str = field(init=False) + file_path: str = field(init=False) + + def get_access_kwargs(self) -> dict: + return self.access_kwargs + + def __post_init__(self): + self.protocol, self.path_without_protocol = self.remote_url.split("://") + if self.protocol not in SUPPORTED_REMOTE_FSSPEC_PROTOCOLS: + raise ValueError( + f"Protocol {self.protocol} not supported yet, only " + f"{SUPPORTED_REMOTE_FSSPEC_PROTOCOLS} are supported.", + ) + + # dropbox root is an empty string + match = re.match(rf"{self.protocol}://([\s])/", self.remote_url) + if match and self.protocol == "dropbox": + self.dir_path = " " + self.file_path = "" + return + + # just a path with no trailing prefix + match = re.match(rf"{self.protocol}://([^/\s]+?)(/*)$", self.remote_url) + if match: + self.dir_path = match.group(1) + self.file_path = "" + return + + # valid path with a dir and/or file + match = re.match(rf"{self.protocol}://([^/\s]+?)/([^\s]*)", self.remote_url) + if not match: + raise ValueError( + f"Invalid path {self.remote_url}. " + f"Expected :///.", + ) + self.dir_path = match.group(1) + self.file_path = match.group(2) or "" + + @dataclass class ReadConfig(BaseConfig): # where raw documents are stored for processing, and then removed if not preserve_downloads diff --git a/unstructured/ingest/pipeline/reformat/embedding.py b/unstructured/ingest/pipeline/reformat/embedding.py index 0b1724b08..04b3682dc 100644 --- a/unstructured/ingest/pipeline/reformat/embedding.py +++ b/unstructured/ingest/pipeline/reformat/embedding.py @@ -33,7 +33,7 @@ class Embedder(ReformatNode): filename_ext = os.path.basename(elements_json_filename) filename = os.path.splitext(filename_ext)[0] hashed_filename = hashlib.sha256( - f"{self.create_hash()}{filename}".encode() + f"{self.create_hash()}{filename}".encode(), ).hexdigest()[:32] json_filename = f"{hashed_filename}.json" json_path = (Path(self.get_path()) / json_filename).resolve() diff --git a/unstructured/ingest/runner/__init__.py b/unstructured/ingest/runner/__init__.py index f68a8dbdb..5bf2dd703 100644 --- a/unstructured/ingest/runner/__init__.py +++ b/unstructured/ingest/runner/__init__.py @@ -1,7 +1,9 @@ import typing as t +from typing import Type from .airtable import AirtableRunner from .azure import AzureRunner +from .base_runner import FsspecBaseRunner, Runner from .biomed import BiomedRunner from .box import BoxRunner from .confluence import ConfluenceRunner @@ -26,7 +28,7 @@ from .sharepoint import SharePointRunner from .slack import SlackRunner from .wikipedia import WikipediaRunner -runner_map: t.Dict[str, t.Callable] = { +runner_map: t.Dict[str, Type[Runner]] = { "airtable": AirtableRunner, "azure": AzureRunner, "biomed": BiomedRunner, @@ -82,4 +84,6 @@ __all__ = [ "SlackRunner", "WikipediaRunner", "runner_map", + "Runner", + "FsspecBaseRunner", ] diff --git a/unstructured/ingest/runner/azure.py b/unstructured/ingest/runner/azure.py index c797cbc88..86fde1546 100644 --- a/unstructured/ingest/runner/azure.py +++ b/unstructured/ingest/runner/azure.py @@ -2,18 +2,16 @@ import logging import typing as t from unstructured.ingest.logger import ingest_log_streaming_init, logger -from unstructured.ingest.runner.base_runner import Runner +from unstructured.ingest.runner.base_runner import FsspecBaseRunner from unstructured.ingest.runner.utils import update_download_dir_remote_url -class AzureRunner(Runner): +class AzureRunner(FsspecBaseRunner): def run( self, account_name: t.Optional[str], account_key: t.Optional[str], connection_string: t.Optional[str], - remote_url: str, - recursive: bool = False, **kwargs, ): ingest_log_streaming_init(logging.DEBUG if self.processor_config.verbose else logging.INFO) @@ -26,7 +24,7 @@ class AzureRunner(Runner): self.read_config.download_dir = update_download_dir_remote_url( connector_name="azure", read_config=self.read_config, - remote_url=remote_url, + remote_url=self.fsspec_config.remote_url, # type: ignore logger=logger, ) @@ -44,13 +42,13 @@ class AzureRunner(Runner): access_kwargs = {"connection_string": connection_string} else: access_kwargs = {} + connector_config = SimpleAzureBlobStorageConfig.from_dict( + self.fsspec_config.to_dict(), # type: ignore + ) + connector_config.access_kwargs = access_kwargs source_doc_connector = AzureBlobStorageSourceConnector( # type: ignore processor_config=self.processor_config, - connector_config=SimpleAzureBlobStorageConfig( - path=remote_url, - recursive=recursive, - access_kwargs=access_kwargs, - ), + connector_config=connector_config, read_config=self.read_config, ) diff --git a/unstructured/ingest/runner/base_runner.py b/unstructured/ingest/runner/base_runner.py index dc4a6db94..ed9c85606 100644 --- a/unstructured/ingest/runner/base_runner.py +++ b/unstructured/ingest/runner/base_runner.py @@ -7,6 +7,7 @@ from unstructured.ingest.interfaces import ( BaseSourceConnector, ChunkingConfig, EmbeddingConfig, + FsspecConfig, PartitionConfig, PermissionsConfig, ProcessorConfig, @@ -60,3 +61,15 @@ class Runner(ABC): chunking_config=self.chunking_config, permissions_config=self.get_permissions_config(), ) + + +@dataclass +class FsspecBaseRunner(Runner): + # TODO make this field required when python3.8 no longer supported + # python3.8 dataclass doesn't support default values in child classes, but this + # fsspec_config should be required in this class. + fsspec_config: t.Optional[FsspecConfig] = None + + def __post_init__(self): + if self.fsspec_config is None: + raise ValueError("fsspec_config must exist") diff --git a/unstructured/ingest/runner/box.py b/unstructured/ingest/runner/box.py index b7a1eeb88..13a60fd2f 100644 --- a/unstructured/ingest/runner/box.py +++ b/unstructured/ingest/runner/box.py @@ -2,15 +2,13 @@ import logging import typing as t from unstructured.ingest.logger import ingest_log_streaming_init, logger -from unstructured.ingest.runner.base_runner import Runner +from unstructured.ingest.runner.base_runner import FsspecBaseRunner from unstructured.ingest.runner.utils import update_download_dir_remote_url -class BoxRunner(Runner): +class BoxRunner(FsspecBaseRunner): def run( self, - remote_url: str, - recursive: bool = False, box_app_config: t.Optional[str] = None, **kwargs, ): @@ -19,19 +17,17 @@ class BoxRunner(Runner): self.read_config.download_dir = update_download_dir_remote_url( connector_name="box", read_config=self.read_config, - remote_url=remote_url, + remote_url=self.fsspec_config.remote_url, # type: ignore logger=logger, ) from unstructured.ingest.connector.box import BoxSourceConnector, SimpleBoxConfig + connector_config = SimpleBoxConfig.from_dict(self.fsspec_config.to_dict()) # type: ignore + connector_config.access_kwargs = {"box_app_config": box_app_config} source_doc_connector = BoxSourceConnector( # type: ignore read_config=self.read_config, - connector_config=SimpleBoxConfig( - path=remote_url, - recursive=recursive, - access_kwargs={"box_app_config": box_app_config}, - ), + connector_config=connector_config, processor_config=self.processor_config, ) diff --git a/unstructured/ingest/runner/dropbox.py b/unstructured/ingest/runner/dropbox.py index 7dcfa314e..55acf3d66 100644 --- a/unstructured/ingest/runner/dropbox.py +++ b/unstructured/ingest/runner/dropbox.py @@ -2,15 +2,13 @@ import logging import typing as t from unstructured.ingest.logger import ingest_log_streaming_init, logger -from unstructured.ingest.runner.base_runner import Runner +from unstructured.ingest.runner.base_runner import FsspecBaseRunner from unstructured.ingest.runner.utils import update_download_dir_remote_url -class DropboxRunner(Runner): +class DropboxRunner(FsspecBaseRunner): def run( self, - remote_url: str, - recursive: bool = False, token: t.Optional[str] = None, **kwargs, ): @@ -19,7 +17,7 @@ class DropboxRunner(Runner): self.read_config.download_dir = update_download_dir_remote_url( connector_name="dropbox", read_config=self.read_config, - remote_url=remote_url, + remote_url=self.fsspec_config.remote_url, # type: ignore logger=logger, ) @@ -28,13 +26,13 @@ class DropboxRunner(Runner): SimpleDropboxConfig, ) + connector_config = SimpleDropboxConfig.from_dict( + self.fsspec_config.to_dict(), # type: ignore + ) + connector_config.access_kwargs = {"token": token} source_doc_connector = DropboxSourceConnector( # type: ignore read_config=self.read_config, - connector_config=SimpleDropboxConfig( - path=remote_url, - recursive=recursive, - access_kwargs={"token": token}, - ), + connector_config=connector_config, processor_config=self.processor_config, ) diff --git a/unstructured/ingest/runner/fsspec.py b/unstructured/ingest/runner/fsspec.py index 0d4d6ea6b..9881d6d43 100644 --- a/unstructured/ingest/runner/fsspec.py +++ b/unstructured/ingest/runner/fsspec.py @@ -3,15 +3,13 @@ import warnings from urllib.parse import urlparse from unstructured.ingest.logger import ingest_log_streaming_init, logger -from unstructured.ingest.runner.base_runner import Runner +from unstructured.ingest.runner.base_runner import FsspecBaseRunner from unstructured.ingest.runner.utils import update_download_dir_remote_url -class FsspecRunner(Runner): +class FsspecRunner(FsspecBaseRunner): def run( self, - remote_url: str, - recursive: bool = False, **kwargs, ): ingest_log_streaming_init(logging.DEBUG if self.processor_config.verbose else logging.INFO) @@ -19,11 +17,11 @@ class FsspecRunner(Runner): self.read_config.download_dir = update_download_dir_remote_url( connector_name="fsspec", read_config=self.read_config, - remote_url=remote_url, + remote_url=self.fsspec_config.remote_url, # type: ignore logger=logger, ) - protocol = urlparse(remote_url).scheme + protocol = urlparse(self.fsspec_config.remote_url).scheme # type: ignore warnings.warn( f"`fsspec` protocol {protocol} is not directly supported by `unstructured`," " so use it at your own risk. Supported protocols are `gcs`, `gs`, `s3`, `s3a`," @@ -36,11 +34,11 @@ class FsspecRunner(Runner): SimpleFsspecConfig, ) + connector_config = SimpleFsspecConfig.from_dict( + self.fsspec_config.to_dict(), # type: ignore + ) source_doc_connector = FsspecSourceConnector( # type: ignore - connector_config=SimpleFsspecConfig( - path=remote_url, - recursive=recursive, - ), + connector_config=connector_config, read_config=self.read_config, processor_config=self.processor_config, ) diff --git a/unstructured/ingest/runner/gcs.py b/unstructured/ingest/runner/gcs.py index d370f4f43..a4570c713 100644 --- a/unstructured/ingest/runner/gcs.py +++ b/unstructured/ingest/runner/gcs.py @@ -2,15 +2,13 @@ import logging import typing as t from unstructured.ingest.logger import ingest_log_streaming_init, logger -from unstructured.ingest.runner.base_runner import Runner +from unstructured.ingest.runner.base_runner import FsspecBaseRunner from unstructured.ingest.runner.utils import update_download_dir_remote_url -class GCSRunner(Runner): +class GCSRunner(FsspecBaseRunner): def run( self, - remote_url: str, - recursive: bool = False, token: t.Optional[str] = None, **kwargs, ): @@ -19,18 +17,17 @@ class GCSRunner(Runner): self.read_config.download_dir = update_download_dir_remote_url( connector_name="gcs", read_config=self.read_config, - remote_url=remote_url, + remote_url=self.fsspec_config.remote_url, # type: ignore logger=logger, ) from unstructured.ingest.connector.gcs import GcsSourceConnector, SimpleGcsConfig + connector_config = SimpleGcsConfig.from_dict(self.fsspec_config.to_dict()) # type: ignore + connector_config.access_kwargs = {"token": token} + source_doc_connector = GcsSourceConnector( # type: ignore - connector_config=SimpleGcsConfig( - path=remote_url, - recursive=recursive, - access_kwargs={"token": token}, - ), + connector_config=connector_config, read_config=self.read_config, processor_config=self.processor_config, ) diff --git a/unstructured/ingest/runner/s3.py b/unstructured/ingest/runner/s3.py index ad510de08..e8520eee6 100644 --- a/unstructured/ingest/runner/s3.py +++ b/unstructured/ingest/runner/s3.py @@ -2,15 +2,13 @@ import logging import typing as t from unstructured.ingest.logger import ingest_log_streaming_init, logger -from unstructured.ingest.runner.base_runner import Runner +from unstructured.ingest.runner.base_runner import FsspecBaseRunner from unstructured.ingest.runner.utils import update_download_dir_remote_url -class S3Runner(Runner): +class S3Runner(FsspecBaseRunner): def run( self, - remote_url: str, - recursive: bool = False, anonymous: bool = False, endpoint_url: t.Optional[str] = None, **kwargs, @@ -20,7 +18,7 @@ class S3Runner(Runner): self.read_config.download_dir = update_download_dir_remote_url( connector_name="s3", read_config=self.read_config, - remote_url=remote_url, + remote_url=self.fsspec_config.remote_url, # type: ignore logger=logger, ) @@ -29,12 +27,11 @@ class S3Runner(Runner): access_kwargs: t.Dict[str, t.Any] = {"anon": anonymous} if endpoint_url: access_kwargs["endpoint_url"] = endpoint_url + + connector_config = SimpleS3Config.from_dict(self.fsspec_config.to_dict()) # type: ignore + connector_config.access_kwargs = access_kwargs source_doc_connector = S3SourceConnector( # type: ignore - connector_config=SimpleS3Config( - path=remote_url, - recursive=recursive, - access_kwargs=access_kwargs, - ), + connector_config=connector_config, read_config=self.read_config, processor_config=self.processor_config, ) diff --git a/unstructured/ingest/runner/writers.py b/unstructured/ingest/runner/writers.py index b47a806b2..791bbfeef 100644 --- a/unstructured/ingest/runner/writers.py +++ b/unstructured/ingest/runner/writers.py @@ -25,7 +25,7 @@ def s3_writer( return S3DestinationConnector( write_config=WriteConfig(), connector_config=SimpleS3Config( - path=remote_url, + remote_url=remote_url, access_kwargs=access_kwargs, ), )