From c6717fecaa0156201219b18410d0513495fa84cd Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 1 Oct 2024 16:11:12 -0300 Subject: [PATCH] Nick: got rid of job interval sleep and math.min --- apps/api/src/controllers/v1/concurrency-check.ts | 2 +- apps/api/src/services/queue-worker.ts | 10 +++------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/apps/api/src/controllers/v1/concurrency-check.ts b/apps/api/src/controllers/v1/concurrency-check.ts index 6ed4fa551..8695c6e6d 100644 --- a/apps/api/src/controllers/v1/concurrency-check.ts +++ b/apps/api/src/controllers/v1/concurrency-check.ts @@ -12,7 +12,7 @@ export async function concurrencyCheckController( req: RequestWithAuth, res: Response ) { - const concurrencyLimiterKey = "concurrency-limiter:" + req.params.teamId; + const concurrencyLimiterKey = "concurrency-limiter:" + req.auth.team_id; const now = Date.now(); const activeJobsOfTeam = await redisConnection.zrangebyscore( concurrencyLimiterKey, diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 0aa28cba3..532e8feeb 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -151,12 +151,8 @@ const workerFun = async ( await job.moveToFailed(new Error("Concurrency limit hit"), token, false); // Remove the job from the queue await job.remove(); - // Increment the priority of the job exponentially by 5% - let newJobPriority = Math.round((job.opts.priority ?? 10) * 1.05); - // Max priority is 200k, limit is 2 million - if(newJobPriority > 200000) { - newJobPriority = 200000; - } + // Increment the priority of the job exponentially by 5%, Note: max bull priority is 2 million + const newJobPriority = Math.min(Math.round((job.opts.priority ?? 10) * 1.05), 20000); // Add the job back to the queue with the new priority await queue.add(job.name, { ...job.data, @@ -167,7 +163,7 @@ const workerFun = async ( priority: newJobPriority, // exponential backoff for stuck jobs }); - await sleep(gotJobInterval); + // await sleep(gotJobInterval); continue; } else { // If we are not throttled, add the job back to the queue with the new priority