mirror of
https://github.com/allenai/olmocr.git
synced 2025-11-02 02:54:53 +00:00
Fixing non async threading
This commit is contained in:
parent
e607b53748
commit
b52ac23073
@ -5,6 +5,7 @@ import glob
|
||||
import importlib
|
||||
import os
|
||||
import tempfile
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from functools import partial
|
||||
|
||||
from pypdf import PdfReader
|
||||
@ -48,13 +49,13 @@ def parse_method_arg(method_arg):
|
||||
|
||||
|
||||
# Wrapper to run synchronous functions in the event loop
|
||||
async def run_sync_in_executor(func, *args, **kwargs):
|
||||
"""Run a synchronous function in the default executor"""
|
||||
async def run_sync_in_executor(func, executor, *args, **kwargs):
|
||||
"""Run a synchronous function in the provided executor"""
|
||||
loop = asyncio.get_running_loop()
|
||||
return await loop.run_in_executor(None, partial(func, *args, **kwargs))
|
||||
return await loop.run_in_executor(executor, partial(func, *args, **kwargs))
|
||||
|
||||
|
||||
async def process_pdf(pdf_path, page_num, method, kwargs, output_path, is_async):
|
||||
async def process_pdf(pdf_path, page_num, method, kwargs, output_path, is_async, executor=None):
|
||||
"""Process a single PDF and save the result to output_path"""
|
||||
try:
|
||||
if is_async:
|
||||
@ -62,7 +63,7 @@ async def process_pdf(pdf_path, page_num, method, kwargs, output_path, is_async)
|
||||
markdown = await method(pdf_path, page_num=page_num, **kwargs)
|
||||
else:
|
||||
# Run synchronous function in the executor
|
||||
markdown = await run_sync_in_executor(method, pdf_path, page_num=page_num, **kwargs)
|
||||
markdown = await run_sync_in_executor(method, executor, pdf_path, page_num=page_num, **kwargs)
|
||||
|
||||
if markdown is None:
|
||||
print(f"Warning, did not get output for {os.path.basename(output_path)}")
|
||||
@ -89,109 +90,118 @@ async def process_pdfs(config, pdf_directory, data_directory, repeats, remove_te
|
||||
Process PDFs using asyncio for both sync and async methods,
|
||||
limiting the number of concurrent tasks to max_parallel.
|
||||
"""
|
||||
for candidate in config.keys():
|
||||
print(f"Starting conversion using {candidate} with kwargs: {config[candidate]['kwargs']}")
|
||||
folder_name = config[candidate]["folder_name"]
|
||||
candidate_output_dir = os.path.join(data_directory, folder_name)
|
||||
os.makedirs(candidate_output_dir, exist_ok=True)
|
||||
# Create a thread pool executor with limited threads when max_parallel is 1
|
||||
# This prevents unwanted multithreading for synchronous methods
|
||||
executor = ThreadPoolExecutor(max_workers=max_parallel or 1) if max_parallel else None
|
||||
|
||||
try:
|
||||
for candidate in config.keys():
|
||||
print(f"Starting conversion using {candidate} with kwargs: {config[candidate]['kwargs']}")
|
||||
folder_name = config[candidate]["folder_name"]
|
||||
candidate_output_dir = os.path.join(data_directory, folder_name)
|
||||
os.makedirs(candidate_output_dir, exist_ok=True)
|
||||
|
||||
method = config[candidate]["method"]
|
||||
kwargs = config[candidate]["kwargs"]
|
||||
is_async = asyncio.iscoroutinefunction(method)
|
||||
method = config[candidate]["method"]
|
||||
kwargs = config[candidate]["kwargs"]
|
||||
is_async = asyncio.iscoroutinefunction(method)
|
||||
|
||||
# Use recursive glob to support nested PDFs
|
||||
all_pdfs = glob.glob(os.path.join(pdf_directory, "**/*.pdf"), recursive=True)
|
||||
all_pdfs.sort()
|
||||
# Use recursive glob to support nested PDFs
|
||||
all_pdfs = glob.glob(os.path.join(pdf_directory, "**/*.pdf"), recursive=True)
|
||||
all_pdfs.sort()
|
||||
|
||||
# Prepare all tasks
|
||||
tasks = []
|
||||
task_descriptions = {}
|
||||
# Prepare all tasks
|
||||
tasks = []
|
||||
task_descriptions = {}
|
||||
|
||||
for pdf_path in all_pdfs:
|
||||
pdf = PdfReader(pdf_path)
|
||||
num_pages = len(pdf.pages)
|
||||
base_name = os.path.basename(pdf_path).replace(".pdf", "")
|
||||
# Determine the PDF's relative folder path (e.g. "arxiv_data") relative to pdf_directory
|
||||
relative_pdf_path = os.path.relpath(pdf_path, pdf_directory)
|
||||
pdf_relative_dir = os.path.dirname(relative_pdf_path)
|
||||
for pdf_path in all_pdfs:
|
||||
pdf = PdfReader(pdf_path)
|
||||
num_pages = len(pdf.pages)
|
||||
base_name = os.path.basename(pdf_path).replace(".pdf", "")
|
||||
# Determine the PDF's relative folder path (e.g. "arxiv_data") relative to pdf_directory
|
||||
relative_pdf_path = os.path.relpath(pdf_path, pdf_directory)
|
||||
pdf_relative_dir = os.path.dirname(relative_pdf_path)
|
||||
|
||||
if remove_text:
|
||||
print(f"Converting {pdf_path} into images to remove text-content...")
|
||||
if remove_text:
|
||||
print(f"Converting {pdf_path} into images to remove text-content...")
|
||||
|
||||
# Generate image files from each page
|
||||
temp_image_files = []
|
||||
try:
|
||||
for page_num in range(1, num_pages + 1):
|
||||
# Get base64 PNG data for the current page
|
||||
base64_png = render_pdf_to_base64png(pdf_path, page_num, target_longest_image_dim=2048)
|
||||
|
||||
# Decode base64 and save to temporary file
|
||||
temp_img = tempfile.NamedTemporaryFile("wb", suffix=".png", delete=False)
|
||||
temp_img.write(base64.b64decode(base64_png))
|
||||
temp_img.close()
|
||||
temp_image_files.append(temp_img.name)
|
||||
|
||||
# Convert all images to a single PDF using our enhanced function
|
||||
pdf_bytes = convert_image_to_pdf_bytes(temp_image_files)
|
||||
|
||||
# Write the PDF bytes to a temporary file
|
||||
temp_pdf = tempfile.NamedTemporaryFile("wb", suffix=".pdf", delete=False)
|
||||
temp_pdf.write(pdf_bytes)
|
||||
temp_pdf.close()
|
||||
|
||||
# Update pdf_path to the new file
|
||||
pdf_path = temp_pdf.name
|
||||
|
||||
finally:
|
||||
# Clean up temporary image files
|
||||
for temp_file in temp_image_files:
|
||||
try:
|
||||
os.remove(temp_file)
|
||||
except Exception as e:
|
||||
print(f"Warning: Failed to remove temporary file {temp_file}: {e}")
|
||||
|
||||
for repeat in range(1, repeats + 1):
|
||||
for page_num in range(1, num_pages + 1):
|
||||
output_filename = f"{base_name}_pg{page_num}_repeat{repeat}.md"
|
||||
# Preserve the relative folder structure in the output directory
|
||||
candidate_pdf_dir = os.path.join(candidate_output_dir, pdf_relative_dir)
|
||||
os.makedirs(candidate_pdf_dir, exist_ok=True)
|
||||
output_path = os.path.join(candidate_pdf_dir, output_filename)
|
||||
|
||||
if os.path.exists(output_path) and not force:
|
||||
print(f"Skipping {base_name}_pg{page_num}_repeat{repeat} for {candidate}, file already exists")
|
||||
print("Rerun with --force flag to force regeneration")
|
||||
continue
|
||||
|
||||
task = process_pdf(pdf_path, page_num, method, kwargs, output_path, is_async)
|
||||
tasks.append(task)
|
||||
task_descriptions[id(task)] = f"{base_name}_pg{page_num}_repeat{repeat} ({candidate})"
|
||||
|
||||
# Process tasks with semaphore to limit concurrency
|
||||
semaphore = asyncio.Semaphore(max_parallel or 1) # Default to 1 if not specified
|
||||
|
||||
async def process_with_semaphore(task):
|
||||
async with semaphore:
|
||||
return await task
|
||||
|
||||
# Wrap each task with the semaphore
|
||||
limited_tasks = [process_with_semaphore(task) for task in tasks]
|
||||
|
||||
# Process tasks with progress bar
|
||||
if limited_tasks:
|
||||
completed = 0
|
||||
with tqdm(total=len(limited_tasks), desc=f"Processing {candidate}") as pbar:
|
||||
for task in asyncio.as_completed(limited_tasks):
|
||||
# Generate image files from each page
|
||||
temp_image_files = []
|
||||
try:
|
||||
result = await task
|
||||
if result:
|
||||
completed += 1
|
||||
except Exception as e:
|
||||
print(f"Task failed: {e}")
|
||||
finally:
|
||||
pbar.update(1)
|
||||
for page_num in range(1, num_pages + 1):
|
||||
# Get base64 PNG data for the current page
|
||||
base64_png = render_pdf_to_base64png(pdf_path, page_num, target_longest_image_dim=2048)
|
||||
|
||||
print(f"Completed {completed} out of {len(limited_tasks)} tasks for {candidate}")
|
||||
# Decode base64 and save to temporary file
|
||||
temp_img = tempfile.NamedTemporaryFile("wb", suffix=".png", delete=False)
|
||||
temp_img.write(base64.b64decode(base64_png))
|
||||
temp_img.close()
|
||||
temp_image_files.append(temp_img.name)
|
||||
|
||||
# Convert all images to a single PDF using our enhanced function
|
||||
pdf_bytes = convert_image_to_pdf_bytes(temp_image_files)
|
||||
|
||||
# Write the PDF bytes to a temporary file
|
||||
temp_pdf = tempfile.NamedTemporaryFile("wb", suffix=".pdf", delete=False)
|
||||
temp_pdf.write(pdf_bytes)
|
||||
temp_pdf.close()
|
||||
|
||||
# Update pdf_path to the new file
|
||||
pdf_path = temp_pdf.name
|
||||
|
||||
finally:
|
||||
# Clean up temporary image files
|
||||
for temp_file in temp_image_files:
|
||||
try:
|
||||
os.remove(temp_file)
|
||||
except Exception as e:
|
||||
print(f"Warning: Failed to remove temporary file {temp_file}: {e}")
|
||||
|
||||
for repeat in range(1, repeats + 1):
|
||||
for page_num in range(1, num_pages + 1):
|
||||
output_filename = f"{base_name}_pg{page_num}_repeat{repeat}.md"
|
||||
# Preserve the relative folder structure in the output directory
|
||||
candidate_pdf_dir = os.path.join(candidate_output_dir, pdf_relative_dir)
|
||||
os.makedirs(candidate_pdf_dir, exist_ok=True)
|
||||
output_path = os.path.join(candidate_pdf_dir, output_filename)
|
||||
|
||||
if os.path.exists(output_path) and not force:
|
||||
print(f"Skipping {base_name}_pg{page_num}_repeat{repeat} for {candidate}, file already exists")
|
||||
print("Rerun with --force flag to force regeneration")
|
||||
continue
|
||||
|
||||
task = process_pdf(pdf_path, page_num, method, kwargs, output_path, is_async, executor)
|
||||
tasks.append(task)
|
||||
task_descriptions[id(task)] = f"{base_name}_pg{page_num}_repeat{repeat} ({candidate})"
|
||||
|
||||
# Process tasks with semaphore to limit concurrency
|
||||
semaphore = asyncio.Semaphore(max_parallel or 1) # Default to 1 if not specified
|
||||
|
||||
async def process_with_semaphore(task):
|
||||
async with semaphore:
|
||||
return await task
|
||||
|
||||
# Wrap each task with the semaphore
|
||||
limited_tasks = [process_with_semaphore(task) for task in tasks]
|
||||
|
||||
# Process tasks with progress bar
|
||||
if limited_tasks:
|
||||
completed = 0
|
||||
with tqdm(total=len(limited_tasks), desc=f"Processing {candidate}") as pbar:
|
||||
for task in asyncio.as_completed(limited_tasks):
|
||||
try:
|
||||
result = await task
|
||||
if result:
|
||||
completed += 1
|
||||
except Exception as e:
|
||||
print(f"Task failed: {e}")
|
||||
finally:
|
||||
pbar.update(1)
|
||||
|
||||
print(f"Completed {completed} out of {len(limited_tasks)} tasks for {candidate}")
|
||||
finally:
|
||||
# Clean up the executor
|
||||
if executor:
|
||||
executor.shutdown(wait=False)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user