Creds and other things

This commit is contained in:
Jake Poznanski 2024-11-13 13:14:33 -08:00
parent a3b6962d21
commit fe3c9a2709

View File

@ -230,6 +230,20 @@ async def load_pdf_work_queue(args) -> asyncio.Queue:
return queue
async def work_item_completed(args, work_hash: str) -> bool:
# Check if work item has already been completed
output_s3_path = os.path.join(args.workspace, 'dolma_documents', f'output_{work_hash}.jsonl')
bucket, key = parse_s3_path(output_s3_path)
try:
# Check if the output file already exists
await asyncio.to_thread(workspace_s3.head_object, Bucket=bucket, Key=key)
return True
except workspace_s3.exceptions.ClientError as e:
pass
return False
async def process_page(args, session: aiohttp.ClientSession, worker_id: int, pdf_s3_path: str, pdf_local_path: str, page_num: int) -> PageResult:
COMPLETION_URL = "http://localhost:30000/v1/chat/completions"
@ -368,7 +382,9 @@ async def worker(args, queue, semaphore, worker_id):
# Wait until allowed to proceed
await semaphore.acquire()
# TODO: Double check that the work item has not been done already by looking at the s3 workspace
if await work_item_completed(args, work_hash):
logger.info(f"Work {work_hash} was already completed, skipping")
continue
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3600),
connector=aiohttp.TCPConnector(limit=1000)) as session:
@ -629,6 +645,9 @@ async def main():
os.makedirs(os.path.dirname(cred_path), exist_ok=True)
with open(cred_path, "w") as f:
f.write(os.environ.get("AWS_CREDENTIALS_FILE"))
global workspace_s3, pdf_s3
workspace_s3 = workspace_session.client("s3")
pdf_s3 = pdf_session.client("s3")
if args.workspace_profile:
global workspace_s3