diff --git a/pdelfin/beakerpipeline.py b/pdelfin/beakerpipeline.py
index 5eda7ea..91887fd 100644
--- a/pdelfin/beakerpipeline.py
+++ b/pdelfin/beakerpipeline.py
@@ -26,7 +26,7 @@ from dataclasses import dataclass
 from typing import Optional
 from concurrent.futures import ProcessPoolExecutor
 
-from pdelfin.s3_utils import expand_s3_glob, get_s3_bytes, parse_s3_path, download_zstd_csv, upload_zstd_csv, download_directory
+from pdelfin.s3_utils import expand_s3_glob, get_s3_bytes, get_s3_bytes_with_backoff, parse_s3_path, download_zstd_csv, upload_zstd_csv, download_directory
 from pdelfin.data.renderpdf import render_pdf_to_base64png
 from pdelfin.prompts import build_finetuning_prompt, PageResponse
 from pdelfin.prompts.anchor import get_anchor_text
@@ -69,7 +69,7 @@ tracker = WorkerTracker()
 # Process pool for offloading cpu bound work, like calculating anchor texts
 process_pool = ProcessPoolExecutor()
 
-SGLANG_SERVER_PORT = 30002
+SGLANG_SERVER_PORT = 30003
 
 @dataclass(frozen=True)
 class PageResult:
@@ -321,27 +321,26 @@ async def process_page(args, session: aiohttp.ClientSession, worker_id: int, pdf
             exponential_backoffs += 1
             logger.info(f"Sleeping for {sleep_delay} seconds on {pdf_s3_path}-{page_num} to allow server restart")
             await asyncio.sleep(sleep_delay)
-        except json.JSONDecodeError as e:
-            logger.warning(f"JSON decode error on attempt {attempt} for {pdf_s3_path}-{page_num}: {e}")
-            attempt += 1
         except asyncio.CancelledError:
             logger.info(f"Process page {pdf_s3_path}-{page_num} cancelled")
             await tracker.track_work(worker_id, f"{pdf_s3_path}-{page_num}", "cancelled")
             raise
+        except json.JSONDecodeError as e:
+            logger.warning(f"JSON decode error on attempt {attempt} for {pdf_s3_path}-{page_num}: {e}")
+            attempt += 1
         except Exception as e:
            logger.warning(f"Unexpected error on attempt {attempt} for {pdf_s3_path}-{page_num}: {type(e)} - {e}")
            attempt += 1
 
-    if attempt >= MAX_RETRIES:
-        logger.error(f"Failed to process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts.")
-        await tracker.track_work(worker_id, f"{pdf_s3_path}-{page_num}", "errored")
-        raise ValueError(f"Could not process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts")
+    logger.error(f"Failed to process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts.")
+    await tracker.track_work(worker_id, f"{pdf_s3_path}-{page_num}", "errored")
+    raise ValueError(f"Could not process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts")
 
 
 async def process_pdf(args, session: aiohttp.ClientSession, worker_id: int, pdf_s3_path: str):
     with tempfile.NamedTemporaryFile("wb+", suffix=".pdf") as tf:
         # TODO Switch to aioboto3 or something
-        data = await asyncio.to_thread(lambda: get_s3_bytes(pdf_s3, pdf_s3_path))
+        data = await asyncio.to_thread(lambda: get_s3_bytes_with_backoff(pdf_s3, pdf_s3_path))
         tf.write(data)
         tf.flush()
diff --git a/pdelfin/s3_utils.py b/pdelfin/s3_utils.py
index 431a0bc..c02a548 100644
--- a/pdelfin/s3_utils.py
+++ b/pdelfin/s3_utils.py
@@ -79,6 +79,18 @@ def get_s3_bytes(s3_client, s3_path: str, start_index: Optional[int] = None, end
     return obj['Body'].read()
 
 
+def get_s3_bytes_with_backoff(s3_client, pdf_s3_path, max_retries: int=8, backoff_factor: int=2):
+    attempt = 0
+    while attempt < max_retries:
+        try:
+            return get_s3_bytes(s3_client, pdf_s3_path)
+        except Exception as e:
+            wait_time = backoff_factor ** attempt
+            logger.warning(f"Attempt {attempt+1} failed to get_s3_bytes for {pdf_s3_path}: {e}. Retrying in {wait_time} seconds...")
+            time.sleep(wait_time)
+            attempt += 1
+    logger.error(f"Failed to get_s3_bytes for {pdf_s3_path} after {max_retries} retries.")
+    raise Exception("Failed to get_s3_bytes after retries")
 def put_s3_bytes(s3_client, s3_path: str, data: bytes):
     bucket, key = parse_s3_path(s3_path)
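
For reference, the new `get_s3_bytes_with_backoff` helper wraps `get_s3_bytes` in an exponential backoff loop: each failed attempt sleeps `backoff_factor ** attempt` seconds (1, 2, 4, ... up to 128 with the defaults `max_retries=8`, `backoff_factor=2`) before retrying, and only raises once all retries are exhausted. A minimal usage sketch, assuming `pdelfin` is importable and the client has read access; the bucket/key below is hypothetical:

```python
import boto3

from pdelfin.s3_utils import get_s3_bytes_with_backoff

# Plain boto3 client, as used for pdf_s3 in the pipeline.
s3_client = boto3.client("s3")

# Hypothetical object path for illustration; transient S3/network errors are
# retried with exponential backoff before the final exception propagates.
data = get_s3_bytes_with_backoff(s3_client, "s3://example-bucket/documents/sample.pdf")

# In process_pdf (see the diff above) the same call is pushed off the event loop:
#     data = await asyncio.to_thread(lambda: get_s3_bytes_with_backoff(pdf_s3, pdf_s3_path))
```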