diff --git a/pdelfin/silver_data/runopenaibatch.py b/pdelfin/silver_data/runopenaibatch.py
index 4bbdee4..7aff239 100644
--- a/pdelfin/silver_data/runopenaibatch.py
+++ b/pdelfin/silver_data/runopenaibatch.py
@@ -53,6 +53,8 @@ def download_batch_result(batch_id, output_folder):
         print(f"WARNING: {batch_id} is completed, but not output file was generated")
         return batch_id, False

+    print(f"Downloading batch data for {batch_id}")
+
     file_response = client.files.content(batch_data.output_file_id)

     # Define output file path
@@ -149,12 +151,15 @@ def get_done_total(folder_path):
 def process_folder(folder_path: str, max_gb: int):
     output_folder = f"{folder_path}_done"
     os.makedirs(output_folder, exist_ok=True)
+    last_loop_time = datetime.datetime.now()

     starting_free_space = MAX_OPENAI_DISK_SPACE - get_total_space_usage()

     if starting_free_space < max_gb * 2:
         raise ValueError(f"Insufficient free space in OpenAI's file storage: Only {starting_free_space} GB left, but 2x{max_gb} GB are required (1x for your uploads, 1x for your results).")

+    starting_free_space = 250 * 1024 * 1024
+
     while not all(state["state"] in FINISHED_STATES for state in get_state(folder_path).values()):
         done, total = get_done_total(folder_path)
         print(f"Total items {total}, done {done}, {done/total*100:.1f}%")
@@ -163,7 +168,7 @@ def process_folder(folder_path: str, max_gb: int):
             print(f"Processing {os.path.basename(work_item['filename'])}, cur status = {work_item['state']}")

             # If all work items have been checked on, then you need to sleep a bit
-            if work_item["last_checked"] > datetime.datetime.now() - datetime.timedelta(seconds=1):
+            if last_loop_time > datetime.datetime.now() - datetime.timedelta(seconds=1):
                 time.sleep(1)

             if work_item["state"] == "init":
@@ -176,6 +181,8 @@ def process_folder(folder_path: str, max_gb: int):
                     update_state(folder_path, work_item["filename"], state="init")
                 else:
                     print("waiting for something to finish processing before uploading more")
+                    # Update the time you checked so you can move onto the next time
+                    update_state(folder_path, work_item["filename"])

             elif work_item["state"] == "processing":
                 batch_data = client.batches.retrieve(work_item["batch_id"])
@@ -207,7 +214,9 @@ def process_folder(folder_path: str, max_gb: int):
                         print("Could not delete old file data")
                 else:
                     # Update the time you checked so you can move onto the next time
-                    update_state(folder_path, work_item["filename"], state="processing")
+                    update_state(folder_path, work_item["filename"])
+
+        last_loop_time = datetime.datetime.now()

     print("All work has been completed")

@@ -215,9 +224,17 @@ if __name__ == "__main__":
     # Set up argument parsing for folder input
     parser = argparse.ArgumentParser(description='Upload .jsonl files and process batches in OpenAI API.')
     parser.add_argument("--max_gb", type=int, default=25, help="Max number of GB of batch processing files to upload at one time")
+    parser.add_argument("--clear_all_files", action="store_true", help="Helper to delete ALL files stored in your openai account")
     parser.add_argument('folder', type=str, help='Path to the folder containing .jsonl files')
     args = parser.parse_args()

+    if args.clear_all_files:
+        all_files = list(client.files.list())
+        if input(f"Are you sure you want to delete {len(all_files)} files from your OpenAI account? [y/N]").lower() == "y":
+            for file in tqdm(all_files):
+                client.files.delete(file.id)
+        quit()
+
     # Process the folder and start batches
     process_folder(args.folder, args.max_gb)
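
Note on the throttling change: keying the sleep off a loop-wide timestamp instead of each work item's last_checked field means the poll loop pauses at most once per full pass over the work items, rather than once per recently-checked item. A minimal standalone sketch of the pattern as I read it (the poll_until_done/poll_fn names and the "done" field are illustrative, not from this patch):

import datetime
import time

def poll_until_done(work_items, poll_fn):
    # Timestamp of the previous full pass over the work items.
    last_loop_time = datetime.datetime.now()
    while not all(item["done"] for item in work_items):
        for item in work_items:
            # If the previous pass ended less than a second ago, back off once;
            # after the sleep, now() has moved past the one-second window, so
            # the remaining items in this pass proceed without sleeping again.
            if last_loop_time > datetime.datetime.now() - datetime.timedelta(seconds=1):
                time.sleep(1)
            poll_fn(item)  # check/update this item's state
        last_loop_time = datetime.datetime.now()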