mirror of
https://github.com/allenai/olmocr.git
synced 2025-10-14 01:32:31 +00:00
Merge branch 'main' into jakep/new_data
This commit is contained in:
commit
1be5cea567
@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## Unreleased
|
||||
|
||||
## [v0.3.3](https://github.com/allenai/olmocr/releases/tag/v0.3.3) - 2025-08-15
|
||||
|
||||
## [v0.3.2](https://github.com/allenai/olmocr/releases/tag/v0.3.2) - 2025-08-14
|
||||
|
||||
## [v0.3.1](https://github.com/allenai/olmocr/releases/tag/v0.3.1) - 2025-08-14
|
||||
|
||||
## [v0.3.0](https://github.com/allenai/olmocr/releases/tag/v0.3.0) - 2025-08-13
|
||||
|
||||
## [v0.2.3](https://github.com/allenai/olmocr/releases/tag/v0.2.3) - 2025-08-04
|
||||
|
3386
dataloader.txt
3386
dataloader.txt
File diff suppressed because one or more lines are too long
@ -14,7 +14,7 @@ import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from concurrent.futures.process import BrokenProcessPool
|
||||
from dataclasses import dataclass
|
||||
from functools import cache
|
||||
@ -84,8 +84,7 @@ pdf_s3 = boto3.client("s3")
|
||||
metrics = MetricsKeeper(window=60 * 5)
|
||||
tracker = WorkerTracker()
|
||||
|
||||
# Process pool for offloading cpu bound work, like calculating anchor texts, max 32 workers, otherwise it can spawn way too many workers on a big machine
|
||||
process_pool = ProcessPoolExecutor(max_workers=min(multiprocessing.cpu_count() // 2 + 1, 32), mp_context=multiprocessing.get_context("spawn"))
|
||||
pdf_render_max_workers = asyncio.BoundedSemaphore(int(float(os.environ.get("BEAKER_ASSIGNED_CPU_COUNT", max(1, multiprocessing.cpu_count() - 2)))))
|
||||
|
||||
# Filter object, cached so it will only get loaded when/if you need it
|
||||
get_pdf_filter = cache(lambda: PdfFilter(languages_to_keep={Language.ENGLISH, None}, apply_download_spam_check=True, apply_form_check=True))
|
||||
@ -109,8 +108,9 @@ async def build_page_query(local_pdf_path: str, page: int, target_longest_image_
|
||||
MAX_TOKENS = 4500
|
||||
assert image_rotation in [0, 90, 180, 270], "Invalid image rotation provided in build_page_query"
|
||||
|
||||
# Allow the page rendering to process in the background while we get the anchor text (which blocks the main thread)
|
||||
image_base64 = await asyncio.to_thread(render_pdf_to_base64png, local_pdf_path, page, target_longest_image_dim=target_longest_image_dim)
|
||||
# Allow the page rendering to process in the background, but limit the number of workers otherwise you can overload the system
|
||||
async with pdf_render_max_workers:
|
||||
image_base64 = await asyncio.to_thread(render_pdf_to_base64png, local_pdf_path, page, target_longest_image_dim=target_longest_image_dim)
|
||||
|
||||
if image_rotation != 0:
|
||||
image_bytes = base64.b64decode(image_base64)
|
||||
@ -611,6 +611,8 @@ async def vllm_server_task(model_name_or_path, args, semaphore, unknown_args=Non
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
# OMP_NUM_THREADS needs to be 1, otherwise you could have contention if you are running multiple copies of olmOCR on a machine with several GPUS
|
||||
env={**os.environ, "OMP_NUM_THREADS": "1"},
|
||||
)
|
||||
|
||||
# Ensure the subprocess is terminated on exit
|
||||
@ -623,13 +625,13 @@ async def vllm_server_task(model_name_or_path, args, semaphore, unknown_args=Non
|
||||
atexit.register(_kill_proc)
|
||||
|
||||
# Shared variables between tasks
|
||||
last_running_req, last_queue_req = 0, 0
|
||||
last_running_req, peak_running_req, last_queue_req = 0, 0, 0
|
||||
running_reqs_decreased = False
|
||||
server_printed_ready_message = False
|
||||
last_semaphore_release = time.time()
|
||||
|
||||
async def process_line(line):
|
||||
nonlocal last_running_req, last_queue_req, running_reqs_decreased, last_semaphore_release, server_printed_ready_message
|
||||
nonlocal last_running_req, last_queue_req, peak_running_req, running_reqs_decreased, last_semaphore_release, server_printed_ready_message
|
||||
server_logger.info(line)
|
||||
|
||||
if "Detected errors during sampling" in line:
|
||||
@ -642,8 +644,12 @@ async def vllm_server_task(model_name_or_path, args, semaphore, unknown_args=Non
|
||||
|
||||
if match := re.search(r"Running: (\d+)", line):
|
||||
current_running = int(match.group(1))
|
||||
# Check for negative derivative (decrease in running requests), to not overload VLLM
|
||||
if current_running < last_running_req:
|
||||
# Track peak running requests
|
||||
if current_running > peak_running_req:
|
||||
peak_running_req = current_running
|
||||
logger.info(f"New peak running requests: {peak_running_req}")
|
||||
# Check for negative derivative (decrease in running requests), to not overload VLLM in times of high contention
|
||||
if current_running < last_running_req and not running_reqs_decreased:
|
||||
running_reqs_decreased = True
|
||||
logger.info(f"Running requests decreased: {last_running_req} -> {current_running}")
|
||||
last_running_req = current_running
|
||||
@ -664,7 +670,7 @@ async def vllm_server_task(model_name_or_path, args, semaphore, unknown_args=Non
|
||||
logger.warning(f"Got {ex} when reading log line from inference server, skipping")
|
||||
|
||||
async def timeout_task():
|
||||
nonlocal last_running_req, last_queue_req, last_semaphore_release, running_reqs_decreased
|
||||
nonlocal last_running_req, last_queue_req, peak_running_req, last_semaphore_release, running_reqs_decreased
|
||||
try:
|
||||
while True:
|
||||
await asyncio.sleep(1)
|
||||
@ -672,7 +678,7 @@ async def vllm_server_task(model_name_or_path, args, semaphore, unknown_args=Non
|
||||
# Check if we should release the semaphore
|
||||
should_release = (
|
||||
server_printed_ready_message
|
||||
and last_queue_req == 0
|
||||
and last_queue_req <= int(peak_running_req * 0.1)
|
||||
and time.time() - last_semaphore_release > 30
|
||||
and semaphore.locked()
|
||||
and (last_running_req == 0 or running_reqs_decreased)
|
||||
@ -682,7 +688,7 @@ async def vllm_server_task(model_name_or_path, args, semaphore, unknown_args=Non
|
||||
semaphore.release()
|
||||
running_reqs_decreased = False # Reset flag after release
|
||||
last_semaphore_release = time.time()
|
||||
logger.info(f"Semaphore released, allowing a worker to proceed. Running requests: {last_running_req}")
|
||||
logger.info(f"Semaphore released at {last_running_req} running {last_queue_req} queued, peak: {peak_running_req})")
|
||||
except asyncio.CancelledError:
|
||||
pass # Clean up if the task is cancelled
|
||||
|
||||
@ -1235,9 +1241,6 @@ async def main():
|
||||
# Wait for all worker tasks to finish
|
||||
await asyncio.gather(*worker_tasks)
|
||||
|
||||
# Wait for server to stop
|
||||
process_pool.shutdown(wait=False)
|
||||
|
||||
vllm_server.cancel()
|
||||
metrics_task.cancel()
|
||||
|
||||
|
@ -2,7 +2,7 @@ _MAJOR = "0"
|
||||
_MINOR = "3"
|
||||
# On main and in a nightly release the patch should be one ahead of the last
|
||||
# released build.
|
||||
_PATCH = "0"
|
||||
_PATCH = "3"
|
||||
# This is mainly for nightly builds which have the suffix ".dev$DATE". See
|
||||
# https://semver.org/#is-v123-a-semantic-version for the semantics.
|
||||
_SUFFIX = ""
|
||||
|
29234
output.txt
29234
output.txt
File diff suppressed because one or more lines are too long
Loading…
x
Reference in New Issue
Block a user