roman/delta lake dest connector (#1385)

### Description
Add delta table downstream destination connector

Closes https://github.com/Unstructured-IO/unstructured/issues/1415
Roman Isecke 2023-09-15 18:13:39 -04:00 committed by GitHub
parent 98d3541909
commit 333558494e
38 changed files with 307 additions and 42 deletions

View File

@ -21,9 +21,10 @@
* **Better debug output related to sentence counting internals.** Clarifies the message when a sentence is not counted toward the sentence count because there aren't enough words; relevant for developers focused on `unstructured`'s NLP internals.
* **Faster ocr_only speed for partitioning PDF and images.** Uses the `unstructured_pytesseract.run_and_get_multiple_output` function to halve the number of calls to `tesseract` when partitioning a PDF or image with `tesseract`.
* **Adds data source properties to fsspec connectors.** These properties (date_created, date_modified, version, source_url, record_locator) are written to element metadata during ingest, mapping elements to information about the document source from which they derive.
* **Add delta table destination connector** New delta table destination connector added to ingest CLI.
* **Rename to Source and Destination Connectors in the Documentation.** Maintain naming consistency between Connectors codebase and documentation with the first addition to a destination connector.
* **Non-HTML text files now return unstructured-elements as opposed to HTML-elements.** Previously, text-based files that went through `partition_html` would return HTML elements; now we preserve the format of the input using the `source_format` argument in the partition call.
### Features
* **Adds element metadata via `category_depth` with default value None**.

View File

@ -0,0 +1,11 @@
Destination Connectors
======================
Connect to your favorite data storage platforms for effortless batch processing of your files.
We are constantly adding new data connectors, and if you don't see your favorite platform, let us know
in our community `Slack <https://join.slack.com/t/unstructuredw-kbe4326/shared_invite/zt-20kd2e9ti-q5yz7RCa2nlyqmAba9vqRw>`_.
.. toctree::
:maxdepth: 1
destination_connectors/delta_table

View File

@ -0,0 +1,67 @@
Delta Table
===========
Batch process all your records using ``unstructured-ingest`` to store structured outputs locally on your filesystem and upload those local files to a Delta Table.
First you'll need to install the delta table dependencies as shown here.
.. code:: shell
pip install "unstructured[delta-table]"
Run Locally
-----------
The source connector can be any of the ones supported, but for convenience the sample command below uses the
delta-table source connector. This will create a new table on your local filesystem and will raise an error if that
table already exists (see the destination connector's ``--mode`` option to change that behavior).
.. tabs::
.. tab:: Shell
.. code:: shell
unstructured-ingest \
delta-table \
--table-uri s3://utic-dev-tech-fixtures/sample-delta-lake-data/deltatable/ \
--output-dir delta-table-example \
--storage_options "AWS_REGION=us-east-2,AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY" \
--verbose \
delta-table \
--write-column json_data \
--table-uri delta-table-dest
.. tab:: Python
.. code:: python
import os
import subprocess

command = [
    "unstructured-ingest",
    "delta-table",
    "--table-uri", "s3://utic-dev-tech-fixtures/sample-delta-lake-data/deltatable/",
    "--download-dir", "delta-table-ingest-download",
    "--output-dir", "delta-table-example",
    "--preserve-downloads",
    # Credentials are read from the environment; subprocess does not
    # expand shell-style $VAR references inside Python strings.
    "--storage_options",
    f"AWS_REGION=us-east-2,AWS_ACCESS_KEY_ID={os.environ['AWS_ACCESS_KEY_ID']},"
    f"AWS_SECRET_ACCESS_KEY={os.environ['AWS_SECRET_ACCESS_KEY']}",
    "--verbose",
    "delta-table",
    "--write-column", "json_data",
    "--table-uri", "delta-table-dest",
]

# Run the command, capturing stdout and stderr separately
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()

# Print output
if process.returncode == 0:
    print('Command executed successfully. Output:')
    print(output.decode())
else:
    print('Command failed. Error:')
    print(error.decode())
For a full list of the options the CLI accepts, check ``unstructured-ingest <source connector> delta-table --help``.
NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide <https://unstructured-io.github.io/unstructured/installing.html>`_.
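Once the ingest run completes, you can read the destination table back with the ``deltalake`` Python package to confirm the elements landed. A minimal sketch, assuming the local ``delta-table-dest`` table URI and the ``json_data`` write column from the example above:

.. code:: python

    from deltalake import DeltaTable

    # Read the destination table back and inspect the written column
    delta_table = DeltaTable("delta-table-dest")
    df = delta_table.to_pandas()
    print(f"{len(df)} rows written")
    print(df["json_data"].head())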

View File

@ -0,0 +1,11 @@
Downstream Connectors
=====================
Connect to your favorite data storage platforms for effortless batch processing of your files.
We are constantly adding new data connectors, and if you don't see your favorite platform, let us know
in our community `Slack <https://join.slack.com/t/unstructuredw-kbe4326/shared_invite/zt-20kd2e9ti-q5yz7RCa2nlyqmAba9vqRw>`_.
.. toctree::
:maxdepth: 1
downstream_connectors/delta_table

View File

@ -17,9 +17,12 @@ Library Documentation
:doc:`bricks`
Learn more about partitioning, cleaning, and staging bricks, including advanced usage patterns.
:doc:`upstream_connectors`
:doc:`source_connectors`
Connect to your favorite data storage platforms for effortless batch processing of your files.
:doc:`destination_connectors`
Connect to your favorite data storage platforms to write your ingest results to.
:doc:`metadata`
Learn more about how metadata is tracked in the ``unstructured`` library.
@ -43,8 +46,9 @@ Library Documentation
installing
api
bricks
upstream_connectors
source_connectors
destination_connectors
metadata
examples
integrations
best_practices

View File

@ -0,0 +1,34 @@
Source Connectors
=================
Connect to your favorite data storage platforms for effortless batch processing of your files.
We are constantly adding new data connectors, and if you don't see your favorite platform, let us know
in our community `Slack <https://join.slack.com/t/unstructuredw-kbe4326/shared_invite/zt-20kd2e9ti-q5yz7RCa2nlyqmAba9vqRw>`_.
.. toctree::
:maxdepth: 1
source_connectors/airtable
source_connectors/azure
source_connectors/biomed
source_connectors/box
source_connectors/confluence
source_connectors/delta_table
source_connectors/discord
source_connectors/dropbox
source_connectors/elasticsearch
source_connectors/github
source_connectors/gitlab
source_connectors/google_cloud_storage
source_connectors/google_drive
source_connectors/jira
source_connectors/local_connector
source_connectors/notion
source_connectors/onedrive
source_connectors/outlook
source_connectors/reddit
source_connectors/s3
source_connectors/salesforce
source_connectors/sharepoint
source_connectors/slack
source_connectors/wikipedia

View File

@ -53,6 +53,7 @@ Run Locally
print('Command failed. Error:')
print(error.decode())
Run via the API
---------------

View File

@ -1,34 +0,0 @@
Upstream Connectors
===================
Connect to your favorite data storage platforms for effortless batch processing of your files.
We are constantly adding new data connectors, and if you don't see your favorite platform, let us know
in our community `Slack <https://join.slack.com/t/unstructuredw-kbe4326/shared_invite/zt-20kd2e9ti-q5yz7RCa2nlyqmAba9vqRw>`_.
.. toctree::
:maxdepth: 1
upstream_connectors/airtable
upstream_connectors/azure
upstream_connectors/biomed
upstream_connectors/box
upstream_connectors/confluence
upstream_connectors/delta_table
upstream_connectors/discord
upstream_connectors/dropbox
upstream_connectors/elastic_search
upstream_connectors/github
upstream_connectors/gitlab
upstream_connectors/google_cloud_storage
upstream_connectors/google_drive
upstream_connectors/jira
upstream_connectors/local_connector
upstream_connectors/notion
upstream_connectors/onedrive
upstream_connectors/outlook
upstream_connectors/reddit
upstream_connectors/s3
upstream_connectors/salesforce
upstream_connectors/sharepoint
upstream_connectors/slack
upstream_connectors/wikipedia

View File

@ -19,4 +19,7 @@ PYTHONPATH=. ./unstructured/ingest/main.py \
--output-dir delta-table-output \
--num-processes 2 \
--storage_options "AWS_REGION=us-east-2,AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY" \
--verbose
--verbose \
delta-table \
--write-column json_data \
--table-uri delta-table-dest

View File

@ -0,0 +1,13 @@
from deltalake import DeltaTable


def run_check():
    delta_table = DeltaTable(
        table_uri="/tmp/delta-table-dest",
    )
    assert len(delta_table.to_pandas()) == 10


if __name__ == "__main__":
    run_check()

View File

@ -7,12 +7,22 @@ cd "$SCRIPT_DIR"/.. || exit 1
OUTPUT_FOLDER_NAME=delta-table
OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME
DOWNLOAD_DIR=$SCRIPT_DIR/download/$OUTPUT_FOLDER_NAME
DESTINATION_TABLE=/tmp/delta-table-dest
if [ -z "$AWS_ACCESS_KEY_ID" ] || [ -z "$AWS_SECRET_ACCESS_KEY" ]; then
    echo "Skipping Delta Table ingest test because either AWS_ACCESS_KEY_ID or AWS_SECRET_ACCESS_KEY env var was not set."
    exit 0
fi

function cleanup() {
    if [ -d "$DESTINATION_TABLE" ]; then
        echo "cleaning up tmp directory: $DESTINATION_TABLE"
        rm -rf "$DESTINATION_TABLE"
    fi
}
trap cleanup EXIT
PYTHONPATH=. ./unstructured/ingest/main.py \
delta-table \
--metadata-exclude coordinates,filename,file_directory,metadata.data_source.date_processed,metadata.last_modified,metadata.detection_class_prob,metadata.parent_id,metadata.category_depth \
@ -21,6 +31,11 @@ PYTHONPATH=. ./unstructured/ingest/main.py \
--output-dir "$OUTPUT_DIR" \
--storage_options "AWS_REGION=us-east-2,AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY" \
--preserve-downloads \
--verbose
--verbose \
delta-table \
--write-column json_data \
--table-uri "$DESTINATION_TABLE"
sh "$SCRIPT_DIR"/check-diff-expected-output.sh $OUTPUT_FOLDER_NAME
python "$SCRIPT_DIR"/python/test-ingest-delta-table-output.py

View File

@ -7,6 +7,7 @@ from .azure import get_source_cmd as azure_src
from .biomed import get_source_cmd as biomed_src
from .box import get_source_cmd as box_src
from .confluence import get_source_cmd as confluence_src
from .delta_table import get_dest_cmd as delta_table_dest
from .delta_table import get_source_cmd as delta_table_src
from .discord import get_source_cmd as discord_src
from .dropbox import get_source_cmd as dropbox_src
@ -57,7 +58,7 @@ src: t.List[click.Group] = [
wikipedia_src(),
]
dest: t.List[click.Command] = [s3_dest()]
dest: t.List[click.Command] = [s3_dest(), delta_table_dest()]
__all__ = [
"src",

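Registering ``delta_table_dest()`` alongside ``s3_dest()`` is what exposes the chained ``<source> ... delta-table ...`` invocation used throughout the docs above: each destination command gets attached to every source command group. A minimal sketch of that wiring (hypothetical, since the loop that consumes ``src`` and ``dest`` lives outside this hunk):

.. code:: python

    # Hypothetical wiring: attach each destination command as a
    # subcommand of every source group, enabling `<source> ... <dest> ...`
    for source_group in src:
        for dest_cmd in dest:
            source_group.add_command(dest_cmd)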
View File

@ -78,6 +78,74 @@ def delta_table_source(ctx: click.Context, **options):
        raise click.ClickException(str(e)) from e


@dataclass
class DeltaTableCliWriteConfig(BaseConfig, CliMixin):
    write_column: str
    mode: t.Literal["error", "append", "overwrite", "ignore"] = "error"

    @staticmethod
    def add_cli_options(cmd: click.Command) -> None:
        options = [
            click.Option(
                ["--write-column"],
                required=True,
                type=str,
                help="column in delta table to write json content",
            ),
            click.Option(
                ["--mode"],
                default="error",
                type=click.Choice(["error", "append", "overwrite", "ignore"]),
                help="How to handle existing data. Default is to error if table already exists. "
                "If 'append', will add new data. "
                "If 'overwrite', will replace table with new data. "
                "If 'ignore', will not write anything if table already exists.",
            ),
        ]
        cmd.params.extend(options)


@click.command(name="delta-table")
@click.pass_context
def delta_table_dest(ctx: click.Context, **options):
    parent_options: dict = ctx.parent.params if ctx.parent else {}
    # Click sets all multiple fields as tuple, this needs to be updated to list
    for k, v in options.items():
        if isinstance(v, tuple):
            options[k] = list(v)
    for k, v in parent_options.items():
        if isinstance(v, tuple):
            parent_options[k] = list(v)
    verbose = parent_options.get("verbose", False)
    ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO)
    log_options(parent_options, verbose=verbose)
    log_options(options, verbose=verbose)
    try:
        # run_init_checks(**options)
        read_config = CliReadConfig.from_dict(parent_options)
        partition_config = CliPartitionConfig.from_dict(parent_options)
        # Run for schema validation
        DeltaTableCliConfig.from_dict(options)
        DeltaTableCliWriteConfig.from_dict(options)
        delta_table_fn(
            read_config=read_config,
            partition_config=partition_config,
            writer_type="delta_table",
            writer_kwargs=options,
            **parent_options,
        )
    except Exception as e:
        logger.error(e, exc_info=True)
        raise click.ClickException(str(e)) from e


def get_dest_cmd() -> click.Command:
    cmd = delta_table_dest
    DeltaTableCliConfig.add_cli_options(cmd)
    DeltaTableCliWriteConfig.add_cli_options(cmd)
    return cmd


def get_source_cmd() -> click.Group:
    cmd = delta_table_source
    DeltaTableCliConfig.add_cli_options(cmd)
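The four ``--mode`` choices map directly onto ``deltalake``'s write modes, so re-running the same ingest with ``--mode append`` adds rows instead of erroring. A small illustration using the ``deltalake`` writer directly (the ``/tmp/mode-demo`` path is illustrative):

.. code:: python

    import pandas as pd
    from deltalake.writer import write_deltalake

    df = pd.DataFrame({"json_data": ['{"text": "hello"}']})

    # First write creates the table; with mode="error" (the default),
    # a later write to the same URI raises.
    write_deltalake("/tmp/mode-demo", df, mode="error")

    # "append" adds rows, "overwrite" replaces the table's contents,
    # and "ignore" silently skips the write if the table exists.
    write_deltalake("/tmp/mode-demo", df, mode="append")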

View File

@ -1,3 +1,4 @@
import json
import os
import typing as t
from dataclasses import dataclass
@ -9,10 +10,12 @@ import pandas as pd
from unstructured.ingest.error import SourceConnectionError
from unstructured.ingest.interfaces import (
    BaseConnectorConfig,
    BaseDestinationConnector,
    BaseIngestDoc,
    BaseSourceConnector,
    IngestDocCleanupMixin,
    SourceConnectorCleanupMixin,
    WriteConfig,
)
from unstructured.ingest.logger import logger
from unstructured.utils import requires_dependencies
@ -141,3 +144,41 @@ class DeltaTableSourceConnector(SourceConnectorCleanupMixin, BaseSourceConnector
            )
            for uri in self.delta_table.file_uris()
        ]


@dataclass
class DeltaTableWriteConfig(WriteConfig):
    write_column: str
    mode: t.Literal["error", "append", "overwrite", "ignore"] = "error"


@dataclass
class DeltaTableDestinationConnector(BaseDestinationConnector):
    write_config: DeltaTableWriteConfig
    connector_config: SimpleDeltaTableConfig

    @requires_dependencies(["deltalake"], extras="delta-table")
    def initialize(self):
        pass

    @requires_dependencies(["deltalake"], extras="delta-table")
    def write(self, docs: t.List[BaseIngestDoc]) -> None:
        from deltalake.writer import write_deltalake

        json_list = []
        for doc in docs:
            local_path = doc._output_filename
            with open(local_path) as json_file:
                json_content = json.load(json_file)
                json_items = [json.dumps(j) for j in json_content]
                logger.info(f"converting {len(json_items)} rows from content in {local_path}")
                json_list.extend(json_items)
        logger.info(
            f"writing {len(json_list)} rows to destination "
            f"table at {self.connector_config.table_uri}",
        )
        write_deltalake(
            table_or_uri=self.connector_config.table_uri,
            data=pd.DataFrame(data={self.write_config.write_column: json_list}),
            mode=self.write_config.mode,
        )
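``write`` flattens each partitioned document's element list into one JSON string per element and lands them as a single-column DataFrame, so every row in the destination table holds one serialized element. The same idea as a standalone sketch (the output file path is illustrative; assumes each ingest output file holds a JSON array of elements, as above):

.. code:: python

    import json

    import pandas as pd
    from deltalake.writer import write_deltalake

    rows = []
    for path in ["delta-table-example/example-doc.json"]:  # illustrative output files
        with open(path) as f:
            # one table row per partitioned element
            rows.extend(json.dumps(element) for element in json.load(f))

    write_deltalake(
        table_or_uri="/tmp/delta-table-dest",
        data=pd.DataFrame({"json_data": rows}),
        mode="error",
    )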

View File

@ -1,6 +1,11 @@
import typing as t
from pathlib import Path
from unstructured.ingest.interfaces import WriteConfig
from unstructured.utils import requires_dependencies
@requires_dependencies(["s3fs", "fsspec"], extras="s3")
def s3_writer(
    remote_url: str,
    anonymous: bool,
@ -20,6 +25,30 @@ def s3_writer(
    )


@requires_dependencies(["deltalake"], extras="delta-table")
def delta_table_writer(
    table_uri: t.Union[str, Path],
    write_column: str,
    mode: t.Literal["error", "append", "overwrite", "ignore"] = "error",
    verbose: bool = False,
    **kwargs,
):
    from unstructured.ingest.connector.delta_table import (
        DeltaTableDestinationConnector,
        DeltaTableWriteConfig,
        SimpleDeltaTableConfig,
    )

    return DeltaTableDestinationConnector(
        write_config=DeltaTableWriteConfig(write_column=write_column, mode=mode),
        connector_config=SimpleDeltaTableConfig(
            table_uri=table_uri,
            verbose=verbose,
        ),
    )


writer_map = {
writer_map: t.Dict[str, t.Callable] = {
    "s3": s3_writer,
    "delta_table": delta_table_writer,
}
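Downstream, the ingest runner resolves the destination from ``writer_map`` using the ``writer_type`` passed by the CLI (``"delta_table"`` above) and calls it with the CLI's writer kwargs. A minimal sketch of that dispatch (the runner itself is outside this diff, so treat the call shape as illustrative):

.. code:: python

    # Illustrative dispatch: resolve and construct the destination connector
    writer = writer_map["delta_table"]
    destination = writer(
        table_uri="/tmp/delta-table-dest",
        write_column="json_data",
        mode="error",
    )
    destination.initialize()
    # destination.write(docs) is then invoked with the partitioned ingest docs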