diff --git a/CHANGELOG.md b/CHANGELOG.md index 8487f4490..2808f5657 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.10.25-dev5 +## 0.10.25-dev6 ### Enhancements @@ -15,6 +15,7 @@ * **Fix language detection of elements with empty strings** This resolves a warning message that was raised by `langdetect` if the language was attempted to be detected on an empty string. Language detection is now skipped for empty strings. * **Fix chunks breaking on regex-metadata matches.** Fixes "over-chunking" when `regex_metadata` was used, where every element that contained a regex-match would start a new chunk. * **Fix regex-metadata match offsets not adjusted within chunk.** Fixes incorrect regex-metadata match start/stop offset in chunks where multiple elements are combined. +* **Map source cli command configs when destination set** Due to how the source connector is dynamically called when the destination connector is set via the CLI, the configs were being set incorrectly, causing the source connector to break. The configs were fixed and updated to take into account Fsspec-specific connectors. 
## 0.10.24 diff --git a/unstructured/__version__.py b/unstructured/__version__.py index f6a6b1837..e93cf8b26 100644 --- a/unstructured/__version__.py +++ b/unstructured/__version__.py @@ -1 +1 @@ -__version__ = "0.10.25-dev5" # pragma: no cover +__version__ = "0.10.25-dev6" # pragma: no cover diff --git a/unstructured/ingest/cli/cmds/azure_cognitive_search.py b/unstructured/ingest/cli/cmds/azure_cognitive_search.py index b7ccf4f1d..ac11da932 100644 --- a/unstructured/ingest/cli/cmds/azure_cognitive_search.py +++ b/unstructured/ingest/cli/cmds/azure_cognitive_search.py @@ -10,10 +10,11 @@ from unstructured.ingest.cli.common import ( from unstructured.ingest.cli.interfaces import ( CliMixin, ) -from unstructured.ingest.cli.utils import conform_click_options, extract_configs +from unstructured.ingest.cli.utils import conform_click_options, orchestrate_runner from unstructured.ingest.interfaces import BaseConfig from unstructured.ingest.logger import ingest_log_streaming_init, logger -from unstructured.ingest.runner import runner_map + +pass @dataclass @@ -68,15 +69,12 @@ def azure_cognitive_search_dest(ctx: click.Context, **options): log_options(parent_options, verbose=verbose) log_options(options, verbose=verbose) try: - configs = extract_configs(options, validate=[AzureCognitiveSearchCliWriteConfig]) - runner_cls = runner_map[source_cmd] - runner = runner_cls( - **configs, # type: ignore + orchestrate_runner( + source_cmd=source_cmd, writer_type="azure_cognitive_search", - writer_kwargs=options, - ) - runner.run( - **parent_options, + parent_options=parent_options, + options=options, + validate=[AzureCognitiveSearchCliWriteConfig], ) except Exception as e: logger.error(e, exc_info=True) diff --git a/unstructured/ingest/cli/cmds/delta_table.py b/unstructured/ingest/cli/cmds/delta_table.py index 1ee17be36..bb1beb2db 100644 --- a/unstructured/ingest/cli/cmds/delta_table.py +++ b/unstructured/ingest/cli/cmds/delta_table.py @@ -10,10 +10,16 @@ from 
unstructured.ingest.cli.common import ( from unstructured.ingest.cli.interfaces import ( CliMixin, ) -from unstructured.ingest.cli.utils import Group, add_options, conform_click_options, extract_configs +from unstructured.ingest.cli.utils import ( + Group, + add_options, + conform_click_options, + extract_configs, + orchestrate_runner, +) from unstructured.ingest.interfaces import BaseConfig from unstructured.ingest.logger import ingest_log_streaming_init, logger -from unstructured.ingest.runner import DeltaTableRunner, runner_map +from unstructured.ingest.runner import DeltaTableRunner @dataclass @@ -118,17 +124,12 @@ def delta_table_dest(ctx: click.Context, **options): log_options(parent_options, verbose=verbose) log_options(options, verbose=verbose) try: - configs = extract_configs(parent_options, validate=[DeltaTableCliConfig]) - # Validate write configs - DeltaTableCliWriteConfig.from_dict(options) - runner_cls = runner_map[source_cmd] - runner = runner_cls( - **configs, # type: ignore + orchestrate_runner( + source_cmd=source_cmd, writer_type="delta_table", - writer_kwargs=options, - ) - runner.run( - **parent_options, + parent_options=parent_options, + options=options, + validate=[DeltaTableCliWriteConfig], ) except Exception as e: logger.error(e, exc_info=True) diff --git a/unstructured/ingest/cli/cmds/s3.py b/unstructured/ingest/cli/cmds/s3.py index 2dc17cdf9..d4d701c05 100644 --- a/unstructured/ingest/cli/cmds/s3.py +++ b/unstructured/ingest/cli/cmds/s3.py @@ -11,10 +11,16 @@ from unstructured.ingest.cli.interfaces import ( CliFilesStorageConfig, CliMixin, ) -from unstructured.ingest.cli.utils import Group, add_options, conform_click_options, extract_configs +from unstructured.ingest.cli.utils import ( + Group, + add_options, + conform_click_options, + extract_configs, + orchestrate_runner, +) from unstructured.ingest.interfaces import BaseConfig, FsspecConfig from unstructured.ingest.logger import ingest_log_streaming_init, logger -from 
unstructured.ingest.runner import FsspecBaseRunner, S3Runner, runner_map +from unstructured.ingest.runner import S3Runner @dataclass @@ -85,22 +91,12 @@ def s3_dest(ctx: click.Context, **options): log_options(parent_options, verbose=verbose) log_options(options, verbose=verbose) try: - configs = extract_configs(options, validate=[S3CliConfig]) - runner_cls = runner_map[source_cmd] - configs = extract_configs( - options, - validate=[S3CliConfig], - extras={"fsspec_config": FsspecConfig} - if issubclass(runner_cls, FsspecBaseRunner) - else None, - ) - runner = runner_cls( - **configs, # type: ignore + orchestrate_runner( + source_cmd=source_cmd, writer_type="s3", - writer_kwargs=options, - ) - runner.run( - **parent_options, + parent_options=parent_options, + options=options, + validate=[S3CliConfig, CliFilesStorageConfig], ) except Exception as e: logger.error(e, exc_info=True) diff --git a/unstructured/ingest/cli/utils.py b/unstructured/ingest/cli/utils.py index 6a03a474b..faebdb522 100644 --- a/unstructured/ingest/cli/utils.py +++ b/unstructured/ingest/cli/utils.py @@ -13,9 +13,35 @@ from unstructured.ingest.cli.interfaces import ( CliReadConfig, CliRetryStrategyConfig, ) -from unstructured.ingest.interfaces import ( - BaseConfig, -) +from unstructured.ingest.interfaces import BaseConfig, FsspecConfig +from unstructured.ingest.runner import FsspecBaseRunner, runner_map + + +def orchestrate_runner( + source_cmd: str, + writer_type: str, + parent_options: dict, + options: dict, + validate: t.Optional[t.List[t.Type[BaseConfig]]] = None, +): + runner_cls = runner_map[source_cmd] + configs = extract_configs( + parent_options, + extras={"fsspec_config": FsspecConfig} + if issubclass(runner_cls, FsspecBaseRunner) + else None, + ) + for val in validate: + val.from_dict(options) + runner_cls = runner_map[source_cmd] + runner = runner_cls( + **configs, # type: ignore + writer_type=writer_type, + writer_kwargs=options, + ) + runner.run( + **parent_options, + ) def 
conform_click_options(options: dict): diff --git a/unstructured/ingest/connector/fsspec.py b/unstructured/ingest/connector/fsspec.py index 50faa57c1..1aaf2aeac 100644 --- a/unstructured/ingest/connector/fsspec.py +++ b/unstructured/ingest/connector/fsspec.py @@ -249,16 +249,20 @@ class FsspecDestinationConnector(BaseDestinationConnector): logger.info(f"Writing content using filesystem: {type(fs).__name__}") - s3_folder = self.connector_config.path_without_protocol - s3_output_path = str(PurePath(s3_folder, filename)) if filename else s3_folder - full_s3_path = f"s3://{s3_output_path}" - logger.debug(f"uploading content to {full_s3_path}") - fs.write_text(full_s3_path, json.dumps(json_list, indent=indent), encoding=encoding) + output_folder = self.connector_config.path_without_protocol + output_folder = os.path.join(output_folder) # Make sure folder ends with file separator + filename = ( + filename.strip(os.sep) if filename else filename + ) # Make sure filename doesn't begin with file separator + output_path = str(PurePath(output_folder, filename)) if filename else output_folder + full_output_path = f"s3://{output_path}" + logger.debug(f"uploading content to {full_output_path}") + fs.write_text(full_output_path, json.dumps(json_list, indent=indent), encoding=encoding) def write(self, docs: t.List[BaseIngestDoc]) -> None: for doc in docs: - s3_file_path = doc.base_filename - filename = s3_file_path if s3_file_path else None + file_path = doc.base_filename + filename = file_path if file_path else None with open(doc._output_filename) as json_file: logger.debug(f"uploading content from {doc._output_filename}") json_list = json.load(json_file)