From cd8c6a2e0941af09426de749c0720a6e1af1e2e3 Mon Sep 17 00:00:00 2001 From: Yao You Date: Thu, 28 Sep 2023 21:41:18 -0500 Subject: [PATCH] fix: occasional SIGABRT with deltalake writer on Linux (#1567) - resolves an issue where occasionally deltalake writer results in SIGABRT event though the writer finished writing table properly on linux - this is first observed in ingest test - Putting the writer into a process mitigates this problem by forcing python to finish the deltalake rust backend to finish its tasks ## test To test this it is best to setup an instance on a Linux system since the problem has only been observed on Linux so far. Run ```bash PYTHONPATH=. ./unstructured/ingest/main.py delta-table --num-processes 2 --metadata-exclude coordinates,filename,file_directory,metadata.data_source.date_processed,metadata.last_modified,metadata.date_created,metadata.detection_class_prob,metadata.parent_id,metadata.category_depth --table-uri ../tables/delta/ --preserve-downloads --verbose delta-table --write-column json_data --mode overwrite --table-uri file:///tmp/delta ``` Without this fix occasionally we'd encounter `SIGABTR`. --------- Co-authored-by: ryannikolaidis <1208590+ryannikolaidis@users.noreply.github.com> --- CHANGELOG.md | 1 + test_unstructured_ingest/test-ingest.sh | 2 +- unstructured/ingest/connector/delta_table.py | 18 ++++++++++++++---- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 300c3e230..fe3c444ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ allowing the document to be loaded. Fix: Change parent class for Formula to Text * **Fixes Sphinx errors.** Fixes errors when running Sphinx `make html` and installs library to suppress warnings. * **Fixes a metadata backwards compatibility error** Problem: When calling `partition_via_api`, the hosted api may return an element schema that's newer than the current `unstructured`. In this case, metadata fields were added which did not exist in the local `ElementMetadata` dataclass, and `__init__()` threw an error. Fix: remove nonexistent fields before instantiating in `ElementMetadata.from_json()`. Importance: Crucial to avoid breaking changes when adding fields. * **Fixes issue with Discord connector when a channel returns `None`** Problem: Getting the `jump_url` from a nonexistent Discord `channel` fails. Fix: property `jump_url` is now retrieved within the same context as the messages from the channel. Importance: Avoids cascading issues when the connector fails to fetch information about a Discord channel. +* **Fixes occasionally SIGABTR when writing table with `deltalake` on Linux** Problem: occasionally on Linux ingest can throw a `SIGABTR` when writing `deltalake` table even though the table was written correctly. Fix: put the writing function into a `Process` to ensure its execution to the fullest extent before returning to the main process. Importance: Improves stability of connectors using `deltalake` ## 0.10.16 diff --git a/test_unstructured_ingest/test-ingest.sh b/test_unstructured_ingest/test-ingest.sh index 97a5917e5..926821943 100755 --- a/test_unstructured_ingest/test-ingest.sh +++ b/test_unstructured_ingest/test-ingest.sh @@ -56,7 +56,7 @@ trap print_last_run EXIT for script in "${scripts[@]}"; do CURRENT_SCRIPT=$script - if [[ "$CURRENT_SCRIPT" == "test-ingest-notion.sh" ]] || [[ "$CURRENT_SCRIPT" == "test-ingest-delta-table.sh" ]]; then + if [[ "$CURRENT_SCRIPT" == "test-ingest-notion.sh" ]]; then echo "--------- RUNNING SCRIPT $script --- IGNORING FAILURES" set +e echo "Running ./test_unstructured_ingest/$script" diff --git a/unstructured/ingest/connector/delta_table.py b/unstructured/ingest/connector/delta_table.py index f471b2b06..976e5fbdd 100644 --- a/unstructured/ingest/connector/delta_table.py +++ b/unstructured/ingest/connector/delta_table.py @@ -3,6 +3,7 @@ import os import typing as t from dataclasses import dataclass from datetime import datetime as dt +from multiprocessing import Process from pathlib import Path import pandas as pd @@ -182,8 +183,17 @@ class DeltaTableDestinationConnector(BaseDestinationConnector): f"writing {len(json_list)} rows to destination " f"table at {self.connector_config.table_uri}", ) - write_deltalake( - table_or_uri=self.connector_config.table_uri, - data=pd.DataFrame(data={self.write_config.write_column: json_list}), - mode=self.write_config.mode, + # NOTE: deltalake writer on Linux sometimes can finish but still trigger a SIGABRT and cause + # ingest to fail, even though all tasks are completed normally. Putting the writer into a + # process mitigates this issue by ensuring python interpreter waits properly for deltalake's + # rust backend to finish + writer = Process( + target=write_deltalake, + kwargs={ + "table_or_uri": self.connector_config.table_uri, + "data": pd.DataFrame(data={self.write_config.write_column: json_list}), + "mode": self.write_config.mode, + }, ) + writer.start() + writer.join()