Semaphore timeout

This commit is contained in:
Jake Poznanski 2024-11-12 11:53:29 -08:00
parent 102c0e4cfc
commit 193e5214d1

View File

@ -444,6 +444,7 @@ async def sglang_server_task(args, semaphore):
can_release_automatically = False
last_semaphore_release = time.time()
async def process_line(line):
nonlocal last_running_req, last_queue_req, can_release_automatically, last_semaphore_release
sglang_logger.info(line)
match = re.search(r'#running-req: (\d+)', line)
@ -459,7 +460,7 @@ async def sglang_server_task(args, semaphore):
queue_req = int(match.group(1))
logger.info(f"sglang running req: {last_running_req} queue req: {queue_req}")
nonlocal last_queue_req
if last_queue_req != 0 and queue_req == 0:
# Release the semaphore when queue_req transitions from non-zero to zero
if semaphore.locked():
@ -470,9 +471,10 @@ async def sglang_server_task(args, semaphore):
last_queue_req = queue_req
# And have a semaphore release automatically if there are no running requests for > 30 seconds
if last_running_req == 0 and time.time() - last_semaphore_release > 30 and semaphore.locked():
if last_running_req == 0 and can_release_automatically and time.time() - last_semaphore_release > 30 and semaphore.locked():
semaphore.release()
last_semaphore_release = time.time()
can_release_automatically = False
logger.info("Semaphore released due to timeout, allowing a worker to proceed.")
async def read_stream(stream):