Error out if you see a broken process pool, might need a better check for this

This commit is contained in:
Jake Poznanski 2024-11-22 22:07:43 +00:00
parent a95487e44c
commit 41accfe867
2 changed files with 9 additions and 1 deletions

View File

@ -26,6 +26,7 @@ from functools import partial, cache
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional, Tuple, List, Dict, Set from typing import Optional, Tuple, List, Dict, Set
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from concurrent.futures.process import BrokenProcessPool
from pdelfin.s3_queue import S3WorkQueue, WorkItem from pdelfin.s3_queue import S3WorkQueue, WorkItem
from pdelfin.s3_utils import expand_s3_glob, get_s3_bytes, get_s3_bytes_with_backoff, parse_s3_path, download_zstd_csv, upload_zstd_csv, download_directory from pdelfin.s3_utils import expand_s3_glob, get_s3_bytes, get_s3_bytes_with_backoff, parse_s3_path, download_zstd_csv, upload_zstd_csv, download_directory
@ -265,6 +266,13 @@ async def process_pdf(args, session: httpx.AsyncClient, worker_id: int, pdf_s3_p
return build_dolma_document(pdf_s3_path, page_results) return build_dolma_document(pdf_s3_path, page_results)
except Exception as e: except Exception as e:
# Check for ExceptionGroup with BrokenProcessPool
if isinstance(e, ExceptionGroup):
broken_pool, other = e.split(BrokenProcessPool)
if broken_pool is not None: # Found at least one BrokenProcessPool
logger.critical("Encountered BrokenProcessPool, exiting process.")
sys.exit(1)
logger.exception(f"Exception in process_pdf for {pdf_s3_path}: {e}") logger.exception(f"Exception in process_pdf for {pdf_s3_path}: {e}")
# You can't build a dolma doc with even 1 failed page, so just get out of here # You can't build a dolma doc with even 1 failed page, so just get out of here
# However, you don't want to propagate an exception higher up and cancel the entire work_group # However, you don't want to propagate an exception higher up and cancel the entire work_group

View File

@ -2,7 +2,7 @@ _MAJOR = "0"
_MINOR = "1" _MINOR = "1"
# On main and in a nightly release the patch should be one ahead of the last # On main and in a nightly release the patch should be one ahead of the last
# released build. # released build.
_PATCH = "44" _PATCH = "45"
# This is mainly for nightly builds which have the suffix ".dev$DATE". See # This is mainly for nightly builds which have the suffix ".dev$DATE". See
# https://semver.org/#is-v123-a-semantic-version for the semantics. # https://semver.org/#is-v123-a-semantic-version for the semantics.
_SUFFIX = "" _SUFFIX = ""