diff --git a/olmocr/pipeline.py b/olmocr/pipeline.py index e4d5d14..f0bf6ee 100644 --- a/olmocr/pipeline.py +++ b/olmocr/pipeline.py @@ -84,8 +84,7 @@ pdf_s3 = boto3.client("s3") metrics = MetricsKeeper(window=60 * 5) tracker = WorkerTracker() -# Process pool for offloading cpu bound work, like calculating anchor texts, max 32 workers, otherwise it can spawn way too many workers on a big machine -process_pool = ProcessPoolExecutor(max_workers=min(multiprocessing.cpu_count() // 2 + 1, 32), mp_context=multiprocessing.get_context("spawn")) +pdf_render_max_workers = asyncio.BoundedSemaphore(int(os.environ.get("BEAKER_ASSIGNED_CPU_COUNT", max(1, multiprocessing.cpu_count() - 2)))) # Filter object, cached so it will only get loaded when/if you need it get_pdf_filter = cache(lambda: PdfFilter(languages_to_keep={Language.ENGLISH, None}, apply_download_spam_check=True, apply_form_check=True)) @@ -109,8 +108,9 @@ async def build_page_query(local_pdf_path: str, page: int, target_longest_image_ MAX_TOKENS = 4500 assert image_rotation in [0, 90, 180, 270], "Invalid image rotation provided in build_page_query" - # Allow the page rendering to process in the background while we get the anchor text (which blocks the main thread) - image_base64 = await asyncio.to_thread(render_pdf_to_base64png, local_pdf_path, page, target_longest_image_dim=target_longest_image_dim) + # Allow the page rendering to process in the background, but limit the number of workers otherwise you can overload the system + async with pdf_render_max_workers: + image_base64 = await asyncio.to_thread(render_pdf_to_base64png, local_pdf_path, page, target_longest_image_dim=target_longest_image_dim) if image_rotation != 0: image_bytes = base64.b64decode(image_base64) @@ -1235,9 +1235,6 @@ async def main(): # Wait for all worker tasks to finish await asyncio.gather(*worker_tasks) - # Wait for server to stop - process_pool.shutdown(wait=False) - vllm_server.cancel() metrics_task.cancel()