diff --git a/pdelfin/beakerpipeline.py b/pdelfin/beakerpipeline.py index 3b52ae4..ed33620 100644 --- a/pdelfin/beakerpipeline.py +++ b/pdelfin/beakerpipeline.py @@ -26,6 +26,7 @@ from functools import partial, cache from dataclasses import dataclass from typing import Optional, Tuple, List, Dict, Set from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed +from concurrent.futures.process import BrokenProcessPool from pdelfin.s3_queue import S3WorkQueue, WorkItem from pdelfin.s3_utils import expand_s3_glob, get_s3_bytes, get_s3_bytes_with_backoff, parse_s3_path, download_zstd_csv, upload_zstd_csv, download_directory @@ -265,6 +266,13 @@ async def process_pdf(args, session: httpx.AsyncClient, worker_id: int, pdf_s3_p return build_dolma_document(pdf_s3_path, page_results) except Exception as e: + # Check for ExceptionGroup with BrokenProcessPool + if isinstance(e, ExceptionGroup): + broken_pool, other = e.split(BrokenProcessPool) + if broken_pool is not None: # Found at least one BrokenProcessPool + logger.critical("Encountered BrokenProcessPool, exiting process.") + sys.exit(1) + logger.exception(f"Exception in process_pdf for {pdf_s3_path}: {e}") # You can't build a dolma doc with even 1 failed page, so just get out of here # However, you don't want to propagate an exception higher up and cancel the entire work_group diff --git a/pdelfin/version.py b/pdelfin/version.py index 58ee23d..9d24ad4 100644 --- a/pdelfin/version.py +++ b/pdelfin/version.py @@ -2,7 +2,7 @@ _MAJOR = "0" _MINOR = "1" # On main and in a nightly release the patch should be one ahead of the last # released build. -_PATCH = "44" +_PATCH = "45" # This is mainly for nightly builds which have the suffix ".dev$DATE". See # https://semver.org/#is-v123-a-semantic-version for the semantics. _SUFFIX = ""