bugfix/mapping source connectors in destination cli commands (#1788)

### Description
Due to the dynamic nature of how the source connector is called when a
destination command is invoked, the configs need to be mapped and the
fsspec config needs to be dynamically added based on the type of runner
being used. This code was added to all currently supported destination
commands.
This commit is contained in:
Roman Isecke 2023-10-20 13:21:06 -04:00 committed by GitHub
parent 1b90028501
commit 15b696925a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 77 additions and 51 deletions

View File

@ -1,4 +1,4 @@
## 0.10.25-dev5
## 0.10.25-dev6
### Enhancements
@ -15,6 +15,7 @@
* **Fix language detection of elements with empty strings** This resolves a warning message that was raised by `langdetect` if the language was attempted to be detected on an empty string. Language detection is now skipped for empty strings.
* **Fix chunks breaking on regex-metadata matches.** Fixes "over-chunking" when `regex_metadata` was used, where every element that contained a regex-match would start a new chunk.
* **Fix regex-metadata match offsets not adjusted within chunk.** Fixes incorrect regex-metadata match start/stop offset in chunks where multiple elements are combined.
* **Map source cli command configs when destination set** Due to how the source connector is dynamically called when the destination connector is set via the CLI, the configs were being set incorrectly, causing the source connector to break. The configs were fixed and updated to take into account Fsspec-specific connectors.
## 0.10.24

View File

@ -1 +1 @@
__version__ = "0.10.25-dev5" # pragma: no cover
__version__ = "0.10.25-dev6" # pragma: no cover

View File

@ -10,10 +10,11 @@ from unstructured.ingest.cli.common import (
from unstructured.ingest.cli.interfaces import (
CliMixin,
)
from unstructured.ingest.cli.utils import conform_click_options, extract_configs
from unstructured.ingest.cli.utils import conform_click_options, orchestrate_runner
from unstructured.ingest.interfaces import BaseConfig
from unstructured.ingest.logger import ingest_log_streaming_init, logger
from unstructured.ingest.runner import runner_map
pass
@dataclass
@ -68,15 +69,12 @@ def azure_cognitive_search_dest(ctx: click.Context, **options):
log_options(parent_options, verbose=verbose)
log_options(options, verbose=verbose)
try:
configs = extract_configs(options, validate=[AzureCognitiveSearchCliWriteConfig])
runner_cls = runner_map[source_cmd]
runner = runner_cls(
**configs, # type: ignore
orchestrate_runner(
source_cmd=source_cmd,
writer_type="azure_cognitive_search",
writer_kwargs=options,
)
runner.run(
**parent_options,
parent_options=parent_options,
options=options,
validate=[AzureCognitiveSearchCliWriteConfig],
)
except Exception as e:
logger.error(e, exc_info=True)

View File

@ -10,10 +10,16 @@ from unstructured.ingest.cli.common import (
from unstructured.ingest.cli.interfaces import (
CliMixin,
)
from unstructured.ingest.cli.utils import Group, add_options, conform_click_options, extract_configs
from unstructured.ingest.cli.utils import (
Group,
add_options,
conform_click_options,
extract_configs,
orchestrate_runner,
)
from unstructured.ingest.interfaces import BaseConfig
from unstructured.ingest.logger import ingest_log_streaming_init, logger
from unstructured.ingest.runner import DeltaTableRunner, runner_map
from unstructured.ingest.runner import DeltaTableRunner
@dataclass
@ -118,17 +124,12 @@ def delta_table_dest(ctx: click.Context, **options):
log_options(parent_options, verbose=verbose)
log_options(options, verbose=verbose)
try:
configs = extract_configs(parent_options, validate=[DeltaTableCliConfig])
# Validate write configs
DeltaTableCliWriteConfig.from_dict(options)
runner_cls = runner_map[source_cmd]
runner = runner_cls(
**configs, # type: ignore
orchestrate_runner(
source_cmd=source_cmd,
writer_type="delta_table",
writer_kwargs=options,
)
runner.run(
**parent_options,
parent_options=parent_options,
options=options,
validate=[DeltaTableCliWriteConfig],
)
except Exception as e:
logger.error(e, exc_info=True)

View File

@ -11,10 +11,16 @@ from unstructured.ingest.cli.interfaces import (
CliFilesStorageConfig,
CliMixin,
)
from unstructured.ingest.cli.utils import Group, add_options, conform_click_options, extract_configs
from unstructured.ingest.cli.utils import (
Group,
add_options,
conform_click_options,
extract_configs,
orchestrate_runner,
)
from unstructured.ingest.interfaces import BaseConfig, FsspecConfig
from unstructured.ingest.logger import ingest_log_streaming_init, logger
from unstructured.ingest.runner import FsspecBaseRunner, S3Runner, runner_map
from unstructured.ingest.runner import S3Runner
@dataclass
@ -85,22 +91,12 @@ def s3_dest(ctx: click.Context, **options):
log_options(parent_options, verbose=verbose)
log_options(options, verbose=verbose)
try:
configs = extract_configs(options, validate=[S3CliConfig])
runner_cls = runner_map[source_cmd]
configs = extract_configs(
options,
validate=[S3CliConfig],
extras={"fsspec_config": FsspecConfig}
if issubclass(runner_cls, FsspecBaseRunner)
else None,
)
runner = runner_cls(
**configs, # type: ignore
orchestrate_runner(
source_cmd=source_cmd,
writer_type="s3",
writer_kwargs=options,
)
runner.run(
**parent_options,
parent_options=parent_options,
options=options,
validate=[S3CliConfig, CliFilesStorageConfig],
)
except Exception as e:
logger.error(e, exc_info=True)

View File

@ -13,9 +13,35 @@ from unstructured.ingest.cli.interfaces import (
CliReadConfig,
CliRetryStrategyConfig,
)
from unstructured.ingest.interfaces import (
BaseConfig,
)
from unstructured.ingest.interfaces import BaseConfig, FsspecConfig
from unstructured.ingest.runner import FsspecBaseRunner, runner_map
def orchestrate_runner(
    source_cmd: str,
    writer_type: str,
    parent_options: dict,
    options: dict,
    validate: t.Optional[t.List[t.Type[BaseConfig]]] = None,
):
    """Instantiate and run the source runner behind a destination CLI command.

    Args:
        source_cmd: Key used to look up the runner class in ``runner_map``.
        writer_type: Forwarded to the runner constructor as ``writer_type``.
        parent_options: Options of the parent (source) command. They are
            passed to ``extract_configs`` to build the runner's configs and
            are also unpacked into ``runner.run``.
        options: Options of the destination command. They are forwarded to
            the runner constructor as ``writer_kwargs``.
        validate: Optional config classes; ``from_dict(options)`` is called on
            each one and the result is discarded (presumably to fail early on
            invalid destination options -- confirm against ``BaseConfig``).
            ``None`` or an empty list skips this step.

    Raises:
        KeyError: If ``source_cmd`` is not a key of ``runner_map``.
    """
    runner_cls = runner_map[source_cmd]
    # Only runners derived from FsspecBaseRunner get the extra fsspec config.
    extras = {"fsspec_config": FsspecConfig} if issubclass(runner_cls, FsspecBaseRunner) else None
    configs = extract_configs(parent_options, extras=extras)
    # `validate` defaults to None, so guard before iterating.
    for config_cls in validate or []:
        config_cls.from_dict(options)
    runner = runner_cls(
        **configs,  # type: ignore
        writer_type=writer_type,
        writer_kwargs=options,
    )
    runner.run(**parent_options)
def conform_click_options(options: dict):

View File

@ -249,16 +249,20 @@ class FsspecDestinationConnector(BaseDestinationConnector):
logger.info(f"Writing content using filesystem: {type(fs).__name__}")
s3_folder = self.connector_config.path_without_protocol
s3_output_path = str(PurePath(s3_folder, filename)) if filename else s3_folder
full_s3_path = f"s3://{s3_output_path}"
logger.debug(f"uploading content to {full_s3_path}")
fs.write_text(full_s3_path, json.dumps(json_list, indent=indent), encoding=encoding)
output_folder = self.connector_config.path_without_protocol
output_folder = os.path.join(output_folder) # Make sure folder ends with file seperator
filename = (
filename.strip(os.sep) if filename else filename
) # Make sure filename doesn't begin with file seperator
output_path = str(PurePath(output_folder, filename)) if filename else output_folder
full_output_path = f"s3://{output_path}"
logger.debug(f"uploading content to {full_output_path}")
fs.write_text(full_output_path, json.dumps(json_list, indent=indent), encoding=encoding)
def write(self, docs: t.List[BaseIngestDoc]) -> None:
for doc in docs:
s3_file_path = doc.base_filename
filename = s3_file_path if s3_file_path else None
file_path = doc.base_filename
filename = file_path if file_path else None
with open(doc._output_filename) as json_file:
logger.debug(f"uploading content from {doc._output_filename}")
json_list = json.load(json_file)