diff --git a/pdelfin/beakerpipeline.py b/pdelfin/beakerpipeline.py index 849e0bc..7cbffb8 100644 --- a/pdelfin/beakerpipeline.py +++ b/pdelfin/beakerpipeline.py @@ -444,6 +444,7 @@ async def sglang_server_task(args, semaphore): can_release_automatically = False last_semaphore_release = time.time() async def process_line(line): + nonlocal last_running_req, last_queue_req, can_release_automatically, last_semaphore_release sglang_logger.info(line) match = re.search(r'#running-req: (\d+)', line) @@ -459,7 +460,7 @@ async def sglang_server_task(args, semaphore): queue_req = int(match.group(1)) logger.info(f"sglang running req: {last_running_req} queue req: {queue_req}") - nonlocal last_queue_req + if last_queue_req != 0 and queue_req == 0: # Release the semaphore when queue_req transitions from non-zero to zero if semaphore.locked(): @@ -470,9 +471,10 @@ async def sglang_server_task(args, semaphore): last_queue_req = queue_req # And have a semaphore release automatically if there are no running requests for > 30 seconds - if last_running_req == 0 and time.time() - last_semaphore_release > 30 and semaphore.locked(): + if last_running_req == 0 and can_release_automatically and time.time() - last_semaphore_release > 30 and semaphore.locked(): semaphore.release() last_semaphore_release = time.time() + can_release_automatically = False logger.info("Semaphore released due to timeout, allowing a worker to proceed.") async def read_stream(stream):