From fe3c9a2709bab08e9169cfabdea3edfc9d4b0ad4 Mon Sep 17 00:00:00 2001
From: Jake Poznanski
Date: Wed, 13 Nov 2024 13:14:33 -0800
Subject: [PATCH] Creds and other things

---
 pdelfin/beakerpipeline.py | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/pdelfin/beakerpipeline.py b/pdelfin/beakerpipeline.py
index 65b89fe..e67cd5d 100644
--- a/pdelfin/beakerpipeline.py
+++ b/pdelfin/beakerpipeline.py
@@ -230,6 +230,20 @@ async def load_pdf_work_queue(args) -> asyncio.Queue:
 
     return queue
 
+async def work_item_completed(args, work_hash: str) -> bool:
+    # Check if work item has already been completed
+    output_s3_path = os.path.join(args.workspace, 'dolma_documents', f'output_{work_hash}.jsonl')
+    bucket, key = parse_s3_path(output_s3_path)
+
+    try:
+        # Check if the output file already exists
+        await asyncio.to_thread(workspace_s3.head_object, Bucket=bucket, Key=key)
+        return True
+    except workspace_s3.exceptions.ClientError as e:
+        pass
+
+    return False
+
 async def process_page(args, session: aiohttp.ClientSession, worker_id: int, pdf_s3_path: str, pdf_local_path: str, page_num: int) -> PageResult:
     COMPLETION_URL = "http://localhost:30000/v1/chat/completions"
 
@@ -368,7 +382,9 @@ async def worker(args, queue, semaphore, worker_id):
             # Wait until allowed to proceed
             await semaphore.acquire()
 
-            # TODO: Double check that the work item has not been done already by looking at the s3 workspace
+            if await work_item_completed(args, work_hash):
+                logger.info(f"Work {work_hash} was already completed, skipping")
+                continue
 
             async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3600),
                                              connector=aiohttp.TCPConnector(limit=1000)) as session:
@@ -629,6 +645,9 @@ async def main():
         os.makedirs(os.path.dirname(cred_path), exist_ok=True)
         with open(cred_path, "w") as f:
             f.write(os.environ.get("AWS_CREDENTIALS_FILE"))
+        global workspace_s3, pdf_s3
+        workspace_s3 = workspace_session.client("s3")
+        pdf_s3 = pdf_session.client("s3")
 
     if args.workspace_profile:
         global workspace_s3