mirror of https://github.com/allenai/olmocr.git (synced 2025-08-19 06:12:23 +00:00)

commit fe0574c725 (parent 2c7686f8ff)
Cleanup code, s3 retries
@@ -26,7 +26,7 @@ from dataclasses import dataclass
 from typing import Optional
 from concurrent.futures import ProcessPoolExecutor
 
-from pdelfin.s3_utils import expand_s3_glob, get_s3_bytes, parse_s3_path, download_zstd_csv, upload_zstd_csv, download_directory
+from pdelfin.s3_utils import expand_s3_glob, get_s3_bytes, get_s3_bytes_with_backoff, parse_s3_path, download_zstd_csv, upload_zstd_csv, download_directory
 from pdelfin.data.renderpdf import render_pdf_to_base64png
 from pdelfin.prompts import build_finetuning_prompt, PageResponse
 from pdelfin.prompts.anchor import get_anchor_text
@@ -69,7 +69,7 @@ tracker = WorkerTracker()
 # Process pool for offloading cpu bound work, like calculating anchor texts
 process_pool = ProcessPoolExecutor()
 
-SGLANG_SERVER_PORT = 30002
+SGLANG_SERVER_PORT = 30003
 
 @dataclass(frozen=True)
 class PageResult:
@@ -321,27 +321,26 @@ async def process_page(args, session: aiohttp.ClientSession, worker_id: int, pdf
                 exponential_backoffs += 1
                 logger.info(f"Sleeping for {sleep_delay} seconds on {pdf_s3_path}-{page_num} to allow server restart")
                 await asyncio.sleep(sleep_delay)
-        except json.JSONDecodeError as e:
-            logger.warning(f"JSON decode error on attempt {attempt} for {pdf_s3_path}-{page_num}: {e}")
-            attempt += 1
         except asyncio.CancelledError:
             logger.info(f"Process page {pdf_s3_path}-{page_num} cancelled")
             await tracker.track_work(worker_id, f"{pdf_s3_path}-{page_num}", "cancelled")
             raise
+        except json.JSONDecodeError as e:
+            logger.warning(f"JSON decode error on attempt {attempt} for {pdf_s3_path}-{page_num}: {e}")
+            attempt += 1
         except Exception as e:
             logger.warning(f"Unexpected error on attempt {attempt} for {pdf_s3_path}-{page_num}: {type(e)} - {e}")
             attempt += 1
 
-    if attempt >= MAX_RETRIES:
-        logger.error(f"Failed to process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts.")
-        await tracker.track_work(worker_id, f"{pdf_s3_path}-{page_num}", "errored")
-        raise ValueError(f"Could not process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts")
+    logger.error(f"Failed to process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts.")
+    await tracker.track_work(worker_id, f"{pdf_s3_path}-{page_num}", "errored")
+    raise ValueError(f"Could not process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts")
 
 
 async def process_pdf(args, session: aiohttp.ClientSession, worker_id: int, pdf_s3_path: str):
     with tempfile.NamedTemporaryFile("wb+", suffix=".pdf") as tf:
         # TODO Switch to aioboto3 or something
-        data = await asyncio.to_thread(lambda: get_s3_bytes(pdf_s3, pdf_s3_path))
+        data = await asyncio.to_thread(lambda: get_s3_bytes_with_backoff(pdf_s3, pdf_s3_path))
         tf.write(data)
         tf.flush()
 
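The process_page hunk above makes two edits: the json.JSONDecodeError handler moves below the asyncio.CancelledError handler, and the trailing `if attempt >= MAX_RETRIES:` guard is dropped, since falling out of the retry loop already implies the attempts are exhausted. Below is a minimal sketch of the loop shape this assumes, not the actual file contents; query_page is a hypothetical stand-in for the real sglang request, and the MAX_RETRIES value is a guess since its definition is outside this diff.

import asyncio
import json
import logging

logger = logging.getLogger(__name__)
MAX_RETRIES = 8  # assumed value; the real constant is defined elsewhere in the file


async def process_page_sketch(query_page, pdf_s3_path: str, page_num: int):
    # query_page is a hypothetical coroutine standing in for the sglang HTTP call.
    attempt = 0
    while attempt < MAX_RETRIES:
        try:
            return await query_page(pdf_s3_path, page_num)
        except asyncio.CancelledError:
            # Cancellation is re-raised immediately rather than counted as a retry.
            raise
        except json.JSONDecodeError as e:
            logger.warning(f"JSON decode error on attempt {attempt} for {pdf_s3_path}-{page_num}: {e}")
            attempt += 1
        except Exception as e:
            logger.warning(f"Unexpected error on attempt {attempt} for {pdf_s3_path}-{page_num}: {type(e)} - {e}")
            attempt += 1
    # Reaching this point already means attempt >= MAX_RETRIES, so the old guard was redundant.
    raise ValueError(f"Could not process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts")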
@@ -79,6 +79,18 @@ def get_s3_bytes(s3_client, s3_path: str, start_index: Optional[int] = None, end
 
     return obj['Body'].read()
 
+def get_s3_bytes_with_backoff(s3_client, pdf_s3_path, max_retries: int=8, backoff_factor: int=2):
+    attempt = 0
+    while attempt < max_retries:
+        try:
+            return get_s3_bytes(s3_client, pdf_s3_path)
+        except Exception as e:
+            wait_time = backoff_factor ** attempt
+            logger.warning(f"Attempt {attempt+1} failed to get_s3_bytes for {pdf_s3_path}: {e}. Retrying in {wait_time} seconds...")
+            time.sleep(wait_time)
+            attempt += 1
+    logger.error(f"Failed to get_s3_bytes for {pdf_s3_path} after {max_retries} retries.")
+    raise Exception("Failed to get_s3_bytes after retries")
 
 def put_s3_bytes(s3_client, s3_path: str, data: bytes):
     bucket, key = parse_s3_path(s3_path)
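The new get_s3_bytes_with_backoff helper makes up to max_retries attempts at a plain get_s3_bytes call, waiting backoff_factor ** attempt seconds between attempts (1, 2, 4, ... seconds with the defaults), and it blocks its thread with time.sleep, which is why the process_pdf hunk keeps the call wrapped in asyncio.to_thread. A small usage sketch under those assumptions, with a placeholder bucket and key:

import asyncio

import boto3

from pdelfin.s3_utils import get_s3_bytes_with_backoff

# Plain boto3 client; the pipeline builds its own pdf_s3 client, this one is just for illustration.
s3_client = boto3.client("s3")


async def fetch_pdf_bytes(pdf_s3_path: str) -> bytes:
    # Run the blocking, retrying download in a worker thread so the event loop stays free.
    return await asyncio.to_thread(get_s3_bytes_with_backoff, s3_client, pdf_s3_path)


if __name__ == "__main__":
    # Placeholder S3 path; point this at a real object to try it out.
    data = asyncio.run(fetch_pdf_bytes("s3://example-bucket/example.pdf"))
    print(f"downloaded {len(data)} bytes")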
|
Loading…
x
Reference in New Issue
Block a user