Gergő Móricz 2025-06-06 13:31:32 +02:00
parent 435cb1608b
commit 8e4285a0f1
6 changed files with 374 additions and 261 deletions

View File

@@ -0,0 +1,30 @@
import { concurrencyCheck, crawlWithConcurrencyTracking } from "./lib";
let accountConcurrencyLimit = 2;
beforeAll(async () => {
const { maxConcurrency } = await concurrencyCheck();
accountConcurrencyLimit = maxConcurrency;
console.log("Account concurrency limit:", accountConcurrencyLimit);
}, 10000);
describe("Concurrency queue and limit", () => {
it("crawl utilizes full concurrency limit and doesn't go over", async () => {
const { crawl, concurrencies } = await crawlWithConcurrencyTracking({
url: "https://firecrawl.dev",
limit: accountConcurrencyLimit * 2,
});
expect(Math.max(...concurrencies)).toBe(accountConcurrencyLimit);
}, 600000);
it("crawl handles maxConcurrency properly", async () => {
const { crawl, concurrencies } = await crawlWithConcurrencyTracking({
url: "https://firecrawl.dev",
limit: 15,
maxConcurrency: 5,
});
expect(Math.max(...concurrencies)).toBe(5);
}, 600000);
});
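For context, the first test requests twice as many pages as the account allows concurrently, so the limiter is guaranteed to be exercised; the assertion then checks that the peak of the sampled concurrency values reaches the account limit without ever exceeding it. A minimal illustration with made-up samples:

// Illustration only – sample values are made up.
const limit = 5;                                // accountConcurrencyLimit
const samples = [1, 3, 5, 5, 4, 2];             // concurrency polled during the crawl
console.assert(Math.max(...samples) === limit); // peak hits the limit and never goes over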

View File

@@ -325,6 +325,44 @@ export async function tokenUsage(): Promise<{ remaining_tokens: number }> {
.set("Content-Type", "application/json")).body.data;
}
// =========================================
// Concurrency API
// =========================================
export async function concurrencyCheck(): Promise<{ concurrency: number, maxConcurrency: number }> {
const x = (await request(TEST_URL)
.get("/v1/concurrency-check")
.set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
.set("Content-Type", "application/json"));
expect(x.statusCode).toBe(200);
expect(x.body.success).toBe(true);
return x.body;
}
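Based on the declared return type and the assertions above, the /v1/concurrency-check endpoint is assumed to respond roughly as follows (values are illustrative):

// Assumed response body of GET /v1/concurrency-check (shape inferred from the helper above):
// {
//   "success": true,
//   "concurrency": 3,       // scrapes currently running for the team
//   "maxConcurrency": 10    // the team's concurrency limit
// }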
export async function crawlWithConcurrencyTracking(body: CrawlRequestInput): Promise<{
crawl: Exclude<CrawlStatusResponse, ErrorResponse>;
concurrencies: number[];
}> {
const cs = await crawlStart(body);
expectCrawlStartToSucceed(cs);
let x, concurrencies: number[] = [];
do {
x = await crawlStatus(cs.body.id);
expect(x.statusCode).toBe(200);
expect(typeof x.body.status).toBe("string");
concurrencies.push((await concurrencyCheck()).concurrency);
} while (x.body.status === "scraping");
expectCrawlToSucceed(x);
return {
crawl: x.body,
concurrencies,
};
}
// =========================================
// =========================================

View File

@@ -22,7 +22,7 @@ import { configDotenv } from "dotenv";
import type { Job, JobState, Queue } from "bullmq";
import { logger } from "../../lib/logger";
import { supabase_rr_service, supabase_service } from "../../services/supabase";
import { getConcurrencyLimitedJobs, getCrawlConcurrencyLimitedJobs } from "../../lib/concurrency-limit";
import { getConcurrencyLimitedJobs } from "../../lib/concurrency-limit";
import { getJobFromGCS } from "../../lib/gcs-jobs";
configDotenv();
@@ -162,9 +162,7 @@ export async function crawlStatusController(
),
);
const teamThrottledJobsSet = await getConcurrencyLimitedJobs(req.auth.team_id);
const crawlThrottledJobsSet = sc.crawlerOptions?.delay ? await getCrawlConcurrencyLimitedJobs(req.params.jobId) : new Set();
const throttledJobsSet = new Set([...teamThrottledJobsSet, ...crawlThrottledJobsSet]);
const throttledJobsSet = new Set(await getConcurrencyLimitedJobs(req.auth.team_id));
const validJobStatuses: [string, JobState | "unknown"][] = [];
const validJobIDs: string[] = [];

View File

@@ -1,12 +1,15 @@
import { RateLimiterMode } from "../types";
import { redisEvictConnection } from "../services/redis";
import type { JobsOptions } from "bullmq";
import type { Job, JobsOptions } from "bullmq";
import { getACUCTeam } from "../controllers/auth";
import { getCrawl } from "./crawl-redis";
import { getScrapeQueue } from "../services/queue-service";
const constructKey = (team_id: string) => "concurrency-limiter:" + team_id;
const constructQueueKey = (team_id: string) =>
"concurrency-limit-queue:" + team_id;
const constructCrawlKey = (crawl_id: string) => "crawl-concurrency-limiter:" + crawl_id;
const constructCrawlQueueKey = (crawl_id: string) => "crawl-concurrency-limit-queue:" + crawl_id;
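For orientation, the four key builders above just namespace Redis keys per team and per crawl. With illustrative IDs they produce the keys below; the "-limiter" sets track jobs that are currently running, while the "-limit-queue" sets hold jobs waiting for a slot (an assumption based on how the functions in this file use them):

// Illustrative keys (IDs are made up):
constructKey("team-123");            // "concurrency-limiter:team-123"
constructQueueKey("team-123");       // "concurrency-limit-queue:team-123"
constructCrawlKey("crawl-abc");      // "crawl-concurrency-limiter:crawl-abc"
constructCrawlQueueKey("crawl-abc"); // "crawl-concurrency-limit-queue:crawl-abc"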
export async function cleanOldConcurrencyLimitEntries(
team_id: string,
@@ -65,14 +68,33 @@ export async function takeConcurrencyLimitedJob(
return JSON.parse(res[1][0][0]);
}
async function takeConcurrencyLimitedJobAndTimeout(
team_id: string,
): Promise<{
job: ConcurrencyLimitedJob;
timeout: number;
} | null> {
await redisEvictConnection.zremrangebyscore(constructQueueKey(team_id), -Infinity, Date.now());
const res = await redisEvictConnection.zmpop(1, constructQueueKey(team_id), "MIN");
if (res === null || res === undefined) {
return null;
}
return {
job: JSON.parse(res[1][0][0]),
timeout: parseFloat(res[1][0][1]),
};
}
export async function pushConcurrencyLimitedJob(
team_id: string,
job: ConcurrencyLimitedJob,
timeout: number,
now: number = Date.now(),
) {
await redisEvictConnection.zadd(
constructQueueKey(team_id),
Date.now() + timeout,
now + timeout,
JSON.stringify(job),
);
}
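Note on the score bookkeeping above: the team queue is a sorted set whose member is the serialized job and whose score is an absolute expiry timestamp (now + timeout). takeConcurrencyLimitedJobAndTimeout first drops entries whose expiry has already passed, then pops the lowest-scored entry, so the timeout it returns is that absolute expiry rather than a duration. A minimal sketch of the round trip, assuming this module's exports and a connected redisEvictConnection:

// Sketch only – `job` stands in for a real ConcurrencyLimitedJob.
declare const job: ConcurrencyLimitedJob;
const now = Date.now();
await pushConcurrencyLimitedJob("team-123", job, 60_000, now); // stored with score = now + 60_000
// Later, within this module:
//   const res = await takeConcurrencyLimitedJobAndTimeout("team-123");
//   res.timeout === now + 60_000   // the zadd score, i.e. the absolute expiry
// getNextConcurrentJob below re-pushes skipped jobs with `res.timeout - Date.now()`,
// which preserves their original expiry.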
@@ -126,34 +148,114 @@ export async function removeCrawlConcurrencyLimitActiveJob(
await redisEvictConnection.zrem(constructCrawlKey(crawl_id), id);
}
export async function takeCrawlConcurrencyLimitedJob(
crawl_id: string,
): Promise<ConcurrencyLimitedJob | null> {
const res = await redisEvictConnection.zmpop(1, constructCrawlQueueKey(crawl_id), "MIN");
if (res === null || res === undefined) {
return null;
/**
* Grabs the next job from the team's concurrency limit queue. Handles crawl concurrency limits.
*
* This function may only be called once the outer code has verified that the team has not reached its concurrency limit.
*
* @param teamId
* @returns A job that can be run, or null if there are no more jobs to run.
*/
async function getNextConcurrentJob(teamId: string): Promise<{
job: ConcurrencyLimitedJob;
timeout: number;
} | null> {
let ignoredJobs: {
job: ConcurrencyLimitedJob;
timeout: number;
}[] = [];
let finalJob: {
job: ConcurrencyLimitedJob;
timeout: number;
} | null = null;
while (finalJob === null) {
const res = await takeConcurrencyLimitedJobAndTimeout(teamId);
if (res === null) {
break;
}
// If the job is associated with a crawl ID, we need to check if the crawl has a max concurrency limit
if (res.job.data.crawl_id) {
const sc = await getCrawl(res.job.data.crawl_id);
const maxCrawlConcurrency = sc === null
? null
: sc.crawlerOptions.delay !== undefined
? 1
: sc.maxConcurrency ?? null;
if (maxCrawlConcurrency !== null) {
// If the crawl has a max concurrency limit, we need to check if the crawl has reached the limit
const currentActiveConcurrency = (await getCrawlConcurrencyLimitActiveJobs(res.job.data.crawl_id)).length;
if (currentActiveConcurrency < maxCrawlConcurrency) {
// If we're under the max concurrency limit, we can run the job
finalJob = res;
} else {
// If we're at the max concurrency limit, we need to ignore the job
ignoredJobs.push({
job: res.job,
timeout: res.timeout,
});
}
} else {
// If the crawl has no max concurrency limit, we can run the job
finalJob = res;
}
} else {
// If the job is not associated with a crawl ID, we can run the job
finalJob = res;
}
}
return JSON.parse(res[1][0][0]);
for (const ignoredJob of ignoredJobs) {
const timeout = ignoredJob.timeout - Date.now();
await pushConcurrencyLimitedJob(teamId, ignoredJob.job, timeout);
}
return finalJob;
}
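In short, getNextConcurrentJob keeps popping the team queue until it finds a job it may run: one with no crawl, one whose crawl has no concurrency cap, or one whose crawl still has a free slot. Jobs skipped because their crawl is saturated are re-queued afterwards with their remaining lifetime so they keep their original expiry. The per-crawl cap computed inline above can be restated as a small helper (illustrative only, not part of the codebase):

// Restatement of the per-crawl cap used above.
function crawlConcurrencyCap(sc: { crawlerOptions?: { delay?: number }; maxConcurrency?: number } | null): number | null {
  if (sc === null) return null;                         // crawl not found: no per-crawl cap
  if (sc.crawlerOptions?.delay !== undefined) return 1; // crawls with a delay run one page at a time
  return sc.maxConcurrency ?? null;                     // otherwise honor maxConcurrency, if set
}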
export async function pushCrawlConcurrencyLimitedJob(
crawl_id: string,
job: ConcurrencyLimitedJob,
) {
await redisEvictConnection.zadd(
constructCrawlQueueKey(crawl_id),
job.priority ?? 1,
JSON.stringify(job),
);
}
/**
* Called when a job associated with a concurrency queue is done.
*
* @param job The BullMQ job that is done.
*/
export async function concurrentJobDone(job: Job) {
if (job.id && job.data && job.data.team_id) {
await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
await cleanOldConcurrencyLimitEntries(job.data.team_id);
export async function getCrawlConcurrencyLimitedJobs(
crawl_id: string,
) {
return new Set((await redisEvictConnection.zrange(constructCrawlQueueKey(crawl_id), 0, -1)).map(x => JSON.parse(x).id));
}
if (job.data.crawl_id) {
await removeCrawlConcurrencyLimitActiveJob(job.data.crawl_id, job.id);
await cleanOldCrawlConcurrencyLimitEntries(job.data.crawl_id);
}
export async function getCrawlConcurrencyQueueJobsCount(crawl_id: string): Promise<number> {
const count = await redisEvictConnection.zcard(constructCrawlQueueKey(crawl_id));
return count;
const maxTeamConcurrency = (await getACUCTeam(job.data.team_id, false, true, job.data.is_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
const currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(job.data.team_id)).length;
if (currentActiveConcurrency < maxTeamConcurrency) {
const nextJob = await getNextConcurrentJob(job.data.team_id);
if (nextJob !== null) {
await pushConcurrencyLimitActiveJob(job.data.team_id, nextJob.job.id, 60 * 1000);
if (nextJob.job.data.crawl_id) {
await pushCrawlConcurrencyLimitActiveJob(nextJob.job.data.crawl_id, nextJob.job.id, 60 * 1000);
}
(await getScrapeQueue()).add(
nextJob.job.id,
{
...nextJob.job.data,
concurrencyLimitHit: true,
},
{
...nextJob.job.opts,
jobId: nextJob.job.id,
priority: nextJob.job.priority,
}
);
}
}
}
}
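concurrentJobDone centralizes the completion logic that this commit removes from queue-worker.ts (see the last file): it releases the finished job's team slot and, if present, its crawl slot, and then, if the team is still under its limit, promotes at most one queued job straight into BullMQ with concurrencyLimitHit: true. A minimal sketch of a call site, assuming a plain BullMQ worker (the queue name and processJob are placeholders, not the project's actual worker):

import { Worker } from "bullmq";
import { concurrentJobDone } from "../lib/concurrency-limit";

declare function processJob(job: unknown): Promise<unknown>; // placeholder for the real scrape logic

const worker = new Worker("scrapeQueue", async (job) => {
  try {
    return await processJob(job);
  } finally {
    await concurrentJobDone(job); // release slots and promote the next queued job, if any
  }
});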

View File

@@ -4,14 +4,12 @@ import { NotificationType, RateLimiterMode, WebScraperOptions } from "../types";
import * as Sentry from "@sentry/node";
import {
cleanOldConcurrencyLimitEntries,
cleanOldCrawlConcurrencyLimitEntries,
getConcurrencyLimitActiveJobs,
getConcurrencyQueueJobsCount,
getCrawlConcurrencyQueueJobsCount,
getCrawlConcurrencyLimitActiveJobs,
pushConcurrencyLimitActiveJob,
pushConcurrencyLimitedJob,
pushCrawlConcurrencyLimitActiveJob,
pushCrawlConcurrencyLimitedJob,
} from "../lib/concurrency-limit";
import { logger } from "../lib/logger";
import { sendNotificationWithCustomDays } from './notification/email_notification';
@@ -19,7 +17,7 @@ import { shouldSendConcurrencyLimitNotification } from './notification/notificat
import { getACUC, getACUCTeam } from "../controllers/auth";
import { getJobFromGCS } from "../lib/gcs-jobs";
import { Document } from "../controllers/v1/types";
import type { Logger } from "winston";
import { getCrawl } from "../lib/crawl-redis";
/**
* Checks if a job is a crawl or batch scrape based on its options
@@ -50,25 +48,6 @@ async function _addScrapeJobToConcurrencyQueue(
}, webScraperOptions.crawl_id ? Infinity :(webScraperOptions.scrapeOptions?.timeout ?? (60 * 1000)));
}
async function _addCrawlScrapeJobToConcurrencyQueue(
webScraperOptions: any,
options: any,
jobId: string,
jobPriority: number,
) {
await pushCrawlConcurrencyLimitedJob(webScraperOptions.crawl_id, {
id: jobId,
data: webScraperOptions,
opts: {
...options,
priority: jobPriority,
jobId: jobId,
},
priority: jobPriority,
});
// NEVER ADD THESE TO BULLMQ!!! THEY ARE ADDED IN QUEUE-WORKER!!! SHOOOOO!!! - mogery
}
export async function _addScrapeJobToBullMQ(
webScraperOptions: any,
options: any,
@@ -100,18 +79,6 @@ async function addScrapeJobRaw(
jobPriority: number,
directToBullMQ: boolean = false,
) {
const hasCrawlDelay = webScraperOptions.crawl_id && webScraperOptions.crawlerOptions?.delay;
if (hasCrawlDelay) {
await _addCrawlScrapeJobToConcurrencyQueue(
webScraperOptions,
options,
jobId,
jobPriority
);
return;
}
let concurrencyLimited = false;
let currentActiveConcurrency = 0;
let maxConcurrency = 0;
@@ -210,140 +177,189 @@ export async function addScrapeJobs(
) {
if (jobs.length === 0) return true;
const addToCCQ = jobs.filter(job => job.data.crawlerOptions?.delay);
const dontAddToCCQ = jobs.filter(job => !job.data.crawlerOptions?.delay);
const jobsByTeam = new Map<string, {
data: WebScraperOptions;
opts: {
jobId: string;
priority: number;
};
}[]>();
let countCanBeDirectlyAdded = Infinity;
let currentActiveConcurrency = 0;
let maxConcurrency = 0;
for (const job of jobs) {
if (!jobsByTeam.has(job.data.team_id)) {
jobsByTeam.set(job.data.team_id, []);
}
jobsByTeam.get(job.data.team_id)!.push(job);
}
for (const [teamId, teamJobs] of jobsByTeam) {
// == Buckets for jobs ==
let jobsForcedToCQ: {
data: WebScraperOptions;
opts: {
jobId: string;
priority: number;
};
}[] = [];
let jobsPotentiallyInCQ: {
data: WebScraperOptions;
opts: {
jobId: string;
priority: number;
};
}[] = [];
// == Select jobs by crawl ID ==
const jobsByCrawlID = new Map<string, {
data: WebScraperOptions;
opts: {
jobId: string;
priority: number;
};
}[]>();
const jobsWithoutCrawlID: {
data: WebScraperOptions;
opts: {
jobId: string;
priority: number;
};
}[] = [];
for (const job of teamJobs) {
if (job.data.crawl_id) {
if (!jobsByCrawlID.has(job.data.crawl_id)) {
jobsByCrawlID.set(job.data.crawl_id, []);
}
jobsByCrawlID.get(job.data.crawl_id)!.push(job);
} else {
jobsWithoutCrawlID.push(job);
}
}
// == Select jobs by crawl ID ==
for (const [crawlID, crawlJobs] of jobsByCrawlID) {
const crawl = await getCrawl(crawlID);
const concurrencyLimit = !crawl
? null
: crawl.crawlerOptions.delay === undefined && crawl.maxConcurrency === undefined
? null
: crawl.maxConcurrency ?? 1;
if (concurrencyLimit === null) {
// All jobs may be in the CQ depending on the global team concurrency limit
jobsPotentiallyInCQ.push(...crawlJobs);
} else {
const crawlConcurrency = (await getCrawlConcurrencyLimitActiveJobs(crawlID)).length;
const freeSlots = Math.max(concurrencyLimit - crawlConcurrency, 0);
// The first n jobs may be in the CQ depending on the global team concurrency limit
jobsPotentiallyInCQ.push(...crawlJobs.slice(0, freeSlots));
// Every job after that must be in the CQ, as the crawl concurrency limit has been reached
jobsForcedToCQ.push(...crawlJobs.slice(freeSlots));
}
}
// All jobs without a crawl ID may be in the CQ depending on the global team concurrency limit
jobsPotentiallyInCQ.push(...jobsWithoutCrawlID);
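A worked example of the slicing above may help: if a crawl has maxConcurrency 5 and 3 of its scrapes are already active, there are 2 free slots, so the first 2 of its new jobs only compete against the team-level limit while the rest must wait in the concurrency queue (numbers below are made up):

// Worked example of the per-crawl bucketing (illustrative values).
const concurrencyLimit = 5;                          // crawl.maxConcurrency
const crawlConcurrency = 3;                          // currently active scrapes for this crawl
const freeSlots = Math.max(concurrencyLimit - crawlConcurrency, 0); // 2
const crawlJobs = ["j1", "j2", "j3", "j4"];
crawlJobs.slice(0, freeSlots); // ["j1", "j2"] -> jobsPotentiallyInCQ (may still hit the team limit)
crawlJobs.slice(freeSlots);    // ["j3", "j4"] -> jobsForcedToCQ (crawl limit already reached)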
if (dontAddToCCQ[0] && dontAddToCCQ[0].data && dontAddToCCQ[0].data.team_id) {
const now = Date.now();
maxConcurrency = (await getACUCTeam(dontAddToCCQ[0].data.team_id, false, true, dontAddToCCQ[0].data.from_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
cleanOldConcurrencyLimitEntries(dontAddToCCQ[0].data.team_id, now);
const maxConcurrency = (await getACUCTeam(teamId, false, true, jobs[0].data.from_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
await cleanOldConcurrencyLimitEntries(teamId, now);
currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(dontAddToCCQ[0].data.team_id, now)).length;
const currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(teamId, now)).length;
countCanBeDirectlyAdded = Math.max(
const countCanBeDirectlyAdded = Math.max(
maxConcurrency - currentActiveConcurrency,
0,
);
}
const addToBull = dontAddToCCQ.slice(0, countCanBeDirectlyAdded);
const addToCQ = dontAddToCCQ.slice(countCanBeDirectlyAdded);
const addToBull = jobsPotentiallyInCQ.slice(0, countCanBeDirectlyAdded);
const addToCQ = jobsPotentiallyInCQ.slice(countCanBeDirectlyAdded).concat(jobsForcedToCQ);
// equals 2x the max concurrency
if(addToCQ.length > maxConcurrency) {
// logger.info(`Concurrency limited 2x (multiple) - Concurrency queue jobs: ${addToCQ.length} Max concurrency: ${maxConcurrency} Team ID: ${jobs[0].data.team_id}`);
// Only send notification if it's not a crawl or batch scrape
if (!isCrawlOrBatchScrape(dontAddToCCQ[0].data)) {
const shouldSendNotification = await shouldSendConcurrencyLimitNotification(dontAddToCCQ[0].data.team_id);
if (shouldSendNotification) {
sendNotificationWithCustomDays(dontAddToCCQ[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
logger.error("Error sending notification (concurrency limit reached)", { error });
});
// equals 2x the max concurrency
if((jobsPotentiallyInCQ.length - countCanBeDirectlyAdded) > maxConcurrency) {
// logger.info(`Concurrency limited 2x (multiple) - Concurrency queue jobs: ${addToCQ.length} Max concurrency: ${maxConcurrency} Team ID: ${jobs[0].data.team_id}`);
// Only send notification if it's not a crawl or batch scrape
if (!isCrawlOrBatchScrape(jobs[0].data)) {
const shouldSendNotification = await shouldSendConcurrencyLimitNotification(jobs[0].data.team_id);
if (shouldSendNotification) {
sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
logger.error("Error sending notification (concurrency limit reached)", { error });
});
}
}
}
await Promise.all(
addToCQ.map(async (job) => {
const size = JSON.stringify(job.data).length;
return await Sentry.startSpan(
{
name: "Add scrape job",
op: "queue.publish",
attributes: {
"messaging.message.id": job.opts.jobId,
"messaging.destination.name": getScrapeQueue().name,
"messaging.message.body.size": size,
},
},
async (span) => {
const jobData = {
...job.data,
sentry: {
trace: Sentry.spanToTraceHeader(span),
baggage: Sentry.spanToBaggageHeader(span),
size,
},
};
await _addScrapeJobToConcurrencyQueue(
jobData,
job.opts,
job.opts.jobId,
job.opts.priority,
);
},
);
}),
);
await Promise.all(
addToBull.map(async (job) => {
const size = JSON.stringify(job.data).length;
return await Sentry.startSpan(
{
name: "Add scrape job",
op: "queue.publish",
attributes: {
"messaging.message.id": job.opts.jobId,
"messaging.destination.name": getScrapeQueue().name,
"messaging.message.body.size": size,
},
},
async (span) => {
await _addScrapeJobToBullMQ(
{
...job.data,
sentry: {
trace: Sentry.spanToTraceHeader(span),
baggage: Sentry.spanToBaggageHeader(span),
size,
},
},
job.opts,
job.opts.jobId,
job.opts.priority,
);
},
);
}),
);
}
await Promise.all(
addToCCQ.map(async (job) => {
const size = JSON.stringify(job.data).length;
return await Sentry.startSpan(
{
name: "Add scrape job",
op: "queue.publish",
attributes: {
"messaging.message.id": job.opts.jobId,
"messaging.destination.name": getScrapeQueue().name,
"messaging.message.body.size": size,
},
},
async (span) => {
await _addCrawlScrapeJobToConcurrencyQueue(
{
...job.data,
sentry: {
trace: Sentry.spanToTraceHeader(span),
baggage: Sentry.spanToBaggageHeader(span),
size,
},
},
job.opts,
job.opts.jobId,
job.opts.priority,
);
},
);
}),
);
await Promise.all(
addToCQ.map(async (job) => {
const size = JSON.stringify(job.data).length;
return await Sentry.startSpan(
{
name: "Add scrape job",
op: "queue.publish",
attributes: {
"messaging.message.id": job.opts.jobId,
"messaging.destination.name": getScrapeQueue().name,
"messaging.message.body.size": size,
},
},
async (span) => {
const jobData = {
...job.data,
sentry: {
trace: Sentry.spanToTraceHeader(span),
baggage: Sentry.spanToBaggageHeader(span),
size,
},
};
await _addScrapeJobToConcurrencyQueue(
jobData,
job.opts,
job.opts.jobId,
job.opts.priority,
);
},
);
}),
);
await Promise.all(
addToBull.map(async (job) => {
const size = JSON.stringify(job.data).length;
return await Sentry.startSpan(
{
name: "Add scrape job",
op: "queue.publish",
attributes: {
"messaging.message.id": job.opts.jobId,
"messaging.destination.name": getScrapeQueue().name,
"messaging.message.body.size": size,
},
},
async (span) => {
await _addScrapeJobToBullMQ(
{
...job.data,
sentry: {
trace: Sentry.spanToTraceHeader(span),
baggage: Sentry.spanToBaggageHeader(span),
size,
},
},
job.opts,
job.opts.jobId,
job.opts.priority,
);
},
);
}),
);
}
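With this change, callers no longer route delayed crawls to a separate crawl queue; they hand all jobs to addScrapeJobs, which groups them per team and per crawl and decides what goes straight to BullMQ versus the concurrency queue. A minimal sketch of the expected input shape, with assumed field values (the exact WebScraperOptions fields are not shown in this diff):

// Sketch of the input shape addScrapeJobs works with (values are illustrative).
await addScrapeJobs([
  {
    data: {
      url: "https://firecrawl.dev",
      team_id: "team-123",
      crawl_id: "crawl-abc",
      crawlerOptions: { delay: 2 }, // a delayed crawl is capped at one concurrent scrape
    } as any,
    opts: { jobId: "job-1", priority: 10 },
  },
]);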
export function waitForJob(

View File

@@ -50,15 +50,8 @@ import { getJobs } from "..//controllers/v1/crawl-status";
import { configDotenv } from "dotenv";
import { scrapeOptions } from "../controllers/v1/types";
import {
cleanOldConcurrencyLimitEntries,
cleanOldCrawlConcurrencyLimitEntries,
getConcurrencyLimitActiveJobs,
concurrentJobDone,
pushConcurrencyLimitActiveJob,
pushCrawlConcurrencyLimitActiveJob,
removeConcurrencyLimitActiveJob,
removeCrawlConcurrencyLimitActiveJob,
takeConcurrencyLimitedJob,
takeCrawlConcurrencyLimitedJob,
} from "../lib/concurrency-limit";
import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist";
import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings";
@@ -803,71 +796,7 @@
runningJobs.delete(job.id);
}
const sc = job.data.crawl_id ? await getCrawl(job.data.crawl_id) : null;
if (job.id && job.data.crawl_id && sc?.maxConcurrency) {
await removeCrawlConcurrencyLimitActiveJob(job.data.crawl_id, job.id);
cleanOldCrawlConcurrencyLimitEntries(job.data.crawl_id);
if (job.data.crawlerOptions?.delay) {
const delayInSeconds = job.data.crawlerOptions.delay;
const delayInMs = delayInSeconds * 1000;
await new Promise(resolve => setTimeout(resolve, delayInMs));
}
const nextCrawlJob = await takeCrawlConcurrencyLimitedJob(job.data.crawl_id);
if (nextCrawlJob !== null) {
await pushCrawlConcurrencyLimitActiveJob(job.data.crawl_id, nextCrawlJob.id, 60 * 1000);
await queue.add(
nextCrawlJob.id,
{
...nextCrawlJob.data,
},
{
...nextCrawlJob.opts,
jobId: nextCrawlJob.id,
priority: nextCrawlJob.priority,
},
);
}
}
if (job.id && job.data && job.data.team_id) {
const maxConcurrency = (await getACUCTeam(job.data.team_id, false, true, job.data.is_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
await cleanOldConcurrencyLimitEntries(job.data.team_id);
// Check if we're under the concurrency limit before adding a new job
const currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(job.data.team_id)).length;
const concurrencyLimited = currentActiveConcurrency >= maxConcurrency;
if (!concurrencyLimited) {
const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);
if (nextJob !== null) {
await pushConcurrencyLimitActiveJob(
job.data.team_id,
nextJob.id,
60 * 1000,
); // 60s initial timeout
await queue.add(
nextJob.id,
{
...nextJob.data,
concurrencyLimitHit: true,
},
{
...nextJob.opts,
jobId: nextJob.id,
priority: nextJob.priority,
},
);
}
}
}
await concurrentJobDone(job);
}
if (job.data && job.data.sentry && Sentry.isInitialized()) {