last threading fix

Jake Poznanski 2025-09-19 17:23:12 +00:00
parent 0a74746da9
commit b1242db8e2
2 changed files with 12 additions and 10 deletions

View File

@@ -55,15 +55,18 @@ async def run_sync_in_executor(func, executor, *args, **kwargs):
    return await loop.run_in_executor(executor, partial(func, *args, **kwargs))


-async def process_pdf(pdf_path, page_num, method, kwargs, output_path, is_async, executor=None):
+async def process_pdf(pdf_path, page_num, method, kwargs, output_path, is_async, executor=None, use_executor=True):
    """Process a single PDF and save the result to output_path"""
    try:
        if is_async:
            # Run async function directly
            markdown = await method(pdf_path, page_num=page_num, **kwargs)
-        else:
+        elif use_executor:
            # Run synchronous function in the executor
            markdown = await run_sync_in_executor(method, executor, pdf_path, page_num=page_num, **kwargs)
+        else:
+            # Run synchronous function directly without executor (when parallel=0)
+            markdown = method(pdf_path, page_num=page_num, **kwargs)

        if markdown is None:
            print(f"Warning, did not get output for {os.path.basename(output_path)}")
@@ -90,9 +93,10 @@ async def process_pdfs(config, pdf_directory, data_directory, repeats, remove_te
    Process PDFs using asyncio for both sync and async methods,
    limiting the number of concurrent tasks to max_parallel.
    """
-    # Create a thread pool executor with limited threads when max_parallel is 1
-    # This prevents unwanted multithreading for synchronous methods
-    executor = ThreadPoolExecutor(max_workers=max_parallel or 1) if max_parallel else None
+    # When max_parallel is 0, run synchronously without any executor
+    # When max_parallel is > 0, create a thread pool executor
+    use_executor = max_parallel != 0
+    executor = ThreadPoolExecutor(max_workers=max_parallel) if use_executor else None

    try:
        for candidate in config.keys():
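The new setup can be exercised on its own; in the sketch below only the two rules in the comments come from the diff, while the wrapper function name is made up for illustration.

from concurrent.futures import ThreadPoolExecutor


def make_executor(max_parallel):
    # Hypothetical helper mirroring the setup in process_pdfs:
    #   max_parallel == 0 -> run synchronously, no executor at all
    #   max_parallel  > 0 -> thread pool with max_parallel workers
    use_executor = max_parallel != 0
    executor = ThreadPoolExecutor(max_workers=max_parallel) if use_executor else None
    return use_executor, executor


assert make_executor(0) == (False, None)
use_executor, executor = make_executor(4)
assert use_executor and executor is not None
executor.shutdown()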
@@ -169,12 +173,13 @@ async def process_pdfs(config, pdf_directory, data_directory, repeats, remove_te
                    print("Rerun with --force flag to force regeneration")
                    continue

-                task = process_pdf(pdf_path, page_num, method, kwargs, output_path, is_async, executor)
+                task = process_pdf(pdf_path, page_num, method, kwargs, output_path, is_async, executor, use_executor)
                tasks.append(task)
                task_descriptions[id(task)] = f"{base_name}_pg{page_num}_repeat{repeat} ({candidate})"

        # Process tasks with semaphore to limit concurrency
-        semaphore = asyncio.Semaphore(max_parallel or 1)  # Default to 1 if not specified
+        # When max_parallel is 0, set semaphore to 1 to run sequentially
+        semaphore = asyncio.Semaphore(max_parallel if max_parallel else 1)

        async def process_with_semaphore(task):
            async with semaphore:
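The semaphore gating used here is a standard asyncio pattern; the sketch below reproduces it with a dummy workload standing in for process_pdf, keeping the same fallback of max_parallel to 1.

import asyncio


async def run_all(coros, max_parallel):
    # When max_parallel is 0 (or None), fall back to 1, i.e. strictly sequential awaits.
    semaphore = asyncio.Semaphore(max_parallel if max_parallel else 1)

    async def process_with_semaphore(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(process_with_semaphore(c) for c in coros))


async def fake_process_pdf(name):
    # Dummy stand-in for process_pdf (assumption for this sketch).
    await asyncio.sleep(0.01)
    return name


results = asyncio.run(run_all([fake_process_pdf(f"doc{i}.pdf") for i in range(5)], max_parallel=2))
print(results)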

View File

@@ -33,7 +33,4 @@ def run_paddlepaddle(
        if cur_page_0_indexed == page_num - 1:
            result = res.markdown["markdown_texts"]

-    # Fixing annoying crashes
-    torch.cuda.empty_cache()
    return result