olmocr/scripts/data/runopenaibatch.py

# Sends a list of batch files to OpenAI for processing.
# It also waits for the batches to finish, downloads the results, saves its state,
# and lets you submit more than the 100 GB of file storage that the OpenAI API allows.
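#
# Example invocation (assuming a folder of prepared .jsonl batch request files):
#
#   python runopenaibatch.py --max_gb 25 path/to/batch_files/
#
# Completed results are written alongside the input as path/to/batch_files_done/.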
import argparse
import datetime
import json
import os
import time

from openai import OpenAI
from tqdm import tqdm

# Set up OpenAI client (API key should be set in the environment)
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

MAX_OPENAI_DISK_SPACE = 100 * 1024 * 1024 * 1024  # Max is 100 GB on OpenAI

UPLOAD_STATE_FILENAME = "SENDSILVER_DATA"


# Function to upload a file to OpenAI and start batch processing
def upload_and_start_batch(file_path):
    # Upload the file to OpenAI
    with open(file_path, "rb") as file:
        print(f"Uploading {file_path} to OpenAI Batch API...")
        upload_response = client.files.create(file=file, purpose="batch")
        file_id = upload_response.id
        print(f"File uploaded successfully: {file_id}")

    # Create a batch job
    print(f"Creating batch job for {file_path}...")
    batch_response = client.batches.create(
        input_file_id=file_id, endpoint="/v1/chat/completions", completion_window="24h", metadata={"description": "pdf gold/silver data"}
    )

    batch_id = batch_response.id
    print(f"Batch created successfully: {batch_id}")
    return batch_id


def download_batch_result(batch_id, output_folder):
    # Retrieve the batch result from the OpenAI API
    batch_data = client.batches.retrieve(batch_id)

    if batch_data.status != "completed":
        print(f"WARNING: {batch_id} is not completed, status: {batch_data.status}")
        return batch_id, False

    if batch_data.output_file_id is None:
        print(f"WARNING: {batch_id} is completed, but no output file was generated")
        return batch_id, False

    print(f"Downloading batch data for {batch_id}")
    file_response = client.files.content(batch_data.output_file_id)

    # Define output file path
    output_file = os.path.join(output_folder, f"{batch_id}.json")

    # Save the result to a file
    with open(output_file, "w") as f:
        f.write(str(file_response.text))

    return batch_id, True


ALL_STATES = ["init", "processing", "completed", "errored_out", "could_not_upload"]
FINISHED_STATES = ["completed", "errored_out"]
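

# The on-disk state file stores "last_checked" timestamps as ISO strings; the two
# helpers below convert them to and from datetime objects during json load/dump.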


def _json_datetime_decoder(obj):
    if "last_checked" in obj:
        try:
            obj["last_checked"] = datetime.datetime.fromisoformat(obj["last_checked"])
        except (TypeError, ValueError):
            pass  # If it's not a valid ISO format, leave it as is
    return obj
2024-10-04 15:27:43 +00:00
2025-01-29 15:30:39 -08:00
2024-10-04 16:10:19 +00:00
def _json_datetime_encoder(obj):
if isinstance(obj, datetime.datetime):
return obj.isoformat() # Convert datetime to ISO format string
raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")


def get_state(folder_path: str) -> dict:
    state_file = os.path.join(folder_path, UPLOAD_STATE_FILENAME)
    try:
        with open(state_file, "r") as f:
            return json.load(f, object_hook=_json_datetime_decoder)
    except (json.decoder.JSONDecodeError, FileNotFoundError):
        # No usable state file yet: build fresh state from all .jsonl files in the folder
        jsonl_files = [f for f in os.listdir(folder_path) if f.endswith(".jsonl")]

        if not jsonl_files:
            raise Exception("No JSONL files found to process")

        state = {
            f: {
                "filename": f,
                "batch_id": None,
                "state": "init",
                "size": os.path.getsize(os.path.join(folder_path, f)),
                "last_checked": datetime.datetime.now(),
            }
            for f in jsonl_files
        }

        with open(state_file, "w") as f:
            json.dump(state, f, default=_json_datetime_encoder)

        return state
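

# Each state update is written to a temp file and atomically swapped in with
# os.replace, so a crash mid-write cannot corrupt the saved progress.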


def update_state(folder_path: str, filename: str, **kwargs):
    all_state = get_state(folder_path)

    for kwarg_name, kwarg_value in kwargs.items():
        all_state[filename][kwarg_name] = kwarg_value

    all_state[filename]["last_checked"] = datetime.datetime.now()

    state_file = os.path.join(folder_path, UPLOAD_STATE_FILENAME)
    temp_file = state_file + ".tmp"

    # Write to temporary file first
    with open(temp_file, "w") as f:
        json.dump(all_state, f, default=_json_datetime_encoder)
        f.flush()
        os.fsync(f.fileno())

    # Atomic rename of temporary file to target file
    os.replace(temp_file, state_file)

    return all_state


def get_total_space_usage():
    # Sum the size of every file currently stored in the OpenAI account
    return sum(file.bytes for file in client.files.list())
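

# Note: get_total_space_usage reports actual account-wide usage from the API;
# get_estimated_space_usage below counts only this run's in-flight uploads.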


def get_estimated_space_usage(folder_path):
    all_states = get_state(folder_path)
    return sum(s["size"] for s in all_states.values() if s["state"] == "processing")


def get_next_work_item(folder_path):
    # Pick the unfinished item that was checked least recently
    all_states = list(get_state(folder_path).values())
    all_states = [s for s in all_states if s["state"] not in FINISHED_STATES]
    all_states.sort(key=lambda s: s["last_checked"])
    return all_states[0] if len(all_states) > 0 else None


def get_done_total(folder_path):
    processing, done, total = 0, 0, 0

    for state in get_state(folder_path).values():
        if state["state"] in FINISHED_STATES:
            done += 1
        if state["state"] == "processing":
            processing += 1
        total += 1

    return processing, done, total
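

# The loop in process_folder is a simple polling state machine: each .jsonl file
# moves init -> processing -> completed/errored_out, and every transition is
# persisted with update_state(), so an interrupted run picks up where it left off.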


# Main function to process all .jsonl files in a folder
def process_folder(folder_path: str, max_gb: int):
    output_folder = f"{folder_path.rstrip('/')}_done"
    os.makedirs(output_folder, exist_ok=True)

    last_loop_time = datetime.datetime.now()

    starting_free_space = MAX_OPENAI_DISK_SPACE - get_total_space_usage()
    if starting_free_space < (max_gb * 1024**3) * 2:
        raise ValueError(
            f"Insufficient free space in OpenAI's file storage: only {starting_free_space / 1024**3:.1f} GB left, "
            f"but 2x{max_gb} GB are required (1x for your uploads, 1x for your results)."
        )
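
    # Keep polling until every file in the folder has reached a finished state;
    # state is re-read from disk each pass, so the loop is restart-safe.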
    while not all(state["state"] in FINISHED_STATES for state in get_state(folder_path).values()):
        processing, done, total = get_done_total(folder_path)
        print(f"Total items {total}, processing {processing}, done {done}, {done/total*100:.1f}%")

        work_item = get_next_work_item(folder_path)
        print(f"Processing {os.path.basename(work_item['filename'])}, cur status = {work_item['state']}")

        # If all work items have been checked on recently, sleep a bit so we don't hammer the API
        if last_loop_time > datetime.datetime.now() - datetime.timedelta(seconds=1):
            time.sleep(0.2)

        if work_item["state"] == "init":
            if get_estimated_space_usage(folder_path) < (max_gb * 1024**3):
                try:
                    batch_id = upload_and_start_batch(os.path.join(folder_path, work_item["filename"]))
                    update_state(folder_path, work_item["filename"], state="processing", batch_id=batch_id)
                except Exception as ex:
                    print(ex)
                    update_state(folder_path, work_item["filename"], state="init")
            else:
                print("waiting for something to finish processing before uploading more")
                # Update the time you checked so you can move on to the next item
                update_state(folder_path, work_item["filename"])
        elif work_item["state"] == "processing":
            batch_data = client.batches.retrieve(work_item["batch_id"])
            if batch_data.status == "completed":
                batch_id, success = download_batch_result(work_item["batch_id"], output_folder)
                if success:
                    update_state(folder_path, work_item["filename"], state="completed")
                else:
                    update_state(folder_path, work_item["filename"], state="errored_out")

                # Clean up the input and output files on OpenAI's side to free up space
                try:
                    client.files.delete(batch_data.input_file_id)
                except Exception as ex:
                    print(ex)
                    print("Could not delete old input data")

                try:
                    client.files.delete(batch_data.output_file_id)
                except Exception as ex:
                    print(ex)
                    print("Could not delete old output data")
            elif batch_data.status in ["failed", "expired", "cancelled"]:
                update_state(folder_path, work_item["filename"], state="errored_out")
                try:
                    client.files.delete(batch_data.input_file_id)
                except Exception:
                    print("Could not delete old file data")
            else:
                # Update the time you checked so you can move on to the next item
                update_state(folder_path, work_item["filename"])

        last_loop_time = datetime.datetime.now()

    print("All work has been completed")
if __name__ == "__main__":
    # Set up argument parsing for folder input
    parser = argparse.ArgumentParser(description="Upload .jsonl files and process batches in the OpenAI API.")
    parser.add_argument("--max_gb", type=int, default=25, help="Max number of GB of batch processing files to upload at one time")
    parser.add_argument("--clear_all_files", action="store_true", help="Helper to delete ALL files stored in your OpenAI account")
    parser.add_argument("folder", type=str, help="Path to the folder containing .jsonl files")
    args = parser.parse_args()

    if args.clear_all_files:
        all_files = list(client.files.list())
        if input(f"Are you sure you want to delete {len(all_files)} files from your OpenAI account? [y/N] ").lower() == "y":
            for file in tqdm(all_files):
                client.files.delete(file.id)
        quit()

    # Process the folder and start batches
    process_folder(args.folder, args.max_gb)