diff --git a/apps/api/src/__tests__/snips/concurrency.test.ts b/apps/api/src/__tests__/snips/concurrency.test.ts
new file mode 100644
index 000000000..42cdf1716
--- /dev/null
+++ b/apps/api/src/__tests__/snips/concurrency.test.ts
@@ -0,0 +1,30 @@
+import { concurrencyCheck, crawlWithConcurrencyTracking } from "./lib";
+
+let accountConcurrencyLimit = 2;
+
+beforeAll(async () => {
+  const { maxConcurrency } = await concurrencyCheck();
+  accountConcurrencyLimit = maxConcurrency;
+  console.log("Account concurrency limit:", accountConcurrencyLimit);
+}, 10000);
+
+describe("Concurrency queue and limit", () => {
+  it("crawl utilizes full concurrency limit and doesn't go over", async () => {
+    const { crawl, concurrencies } = await crawlWithConcurrencyTracking({
+      url: "https://firecrawl.dev",
+      limit: accountConcurrencyLimit * 2,
+    });
+
+    expect(Math.max(...concurrencies)).toBe(accountConcurrencyLimit);
+  }, 600000);
+
+  it("crawl handles maxConcurrency properly", async () => {
+    const { crawl, concurrencies } = await crawlWithConcurrencyTracking({
+      url: "https://firecrawl.dev",
+      limit: 15,
+      maxConcurrency: 5,
+    });
+
+    expect(Math.max(...concurrencies)).toBe(5);
+  }, 600000);
+});
\ No newline at end of file
diff --git a/apps/api/src/__tests__/snips/lib.ts b/apps/api/src/__tests__/snips/lib.ts
index a4c71f3a9..f34c2db73 100644
--- a/apps/api/src/__tests__/snips/lib.ts
+++ b/apps/api/src/__tests__/snips/lib.ts
@@ -325,6 +325,44 @@ export async function tokenUsage(): Promise<{ remaining_tokens: number }> {
     .set("Content-Type", "application/json")).body.data;
 }
 
+// =========================================
+// Concurrency API
+// =========================================
+
+export async function concurrencyCheck(): Promise<{ concurrency: number, maxConcurrency: number }> {
+  const x = (await request(TEST_URL)
+    .get("/v1/concurrency-check")
+    .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
+    .set("Content-Type", "application/json"));
+
+  expect(x.statusCode).toBe(200);
+  expect(x.body.success).toBe(true);
+  return x.body;
+}
+
+export async function crawlWithConcurrencyTracking(body: CrawlRequestInput): Promise<{
+  crawl: Exclude<CrawlStatusResponse, ErrorResponse>;
+  concurrencies: number[];
+}> {
+  const cs = await crawlStart(body);
+  expectCrawlStartToSucceed(cs);
+
+  let x, concurrencies: number[] = [];
+
+  do {
+    x = await crawlStatus(cs.body.id);
+    expect(x.statusCode).toBe(200);
+    expect(typeof x.body.status).toBe("string");
+    concurrencies.push((await concurrencyCheck()).concurrency);
+  } while (x.body.status === "scraping");
+
+  expectCrawlToSucceed(x);
+  return {
+    crawl: x.body,
+    concurrencies,
+  };
+}
+
 // =========================================
 // =========================================
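A note on what this helper actually observes: `crawlWithConcurrencyTracking` samples `/v1/concurrency-check` once per status poll, so `Math.max(...concurrencies)` is the peak *observed* concurrency, and the test assertions rely on the crawl running long enough for the poll loop to catch the queue at full utilization. The helper returns `x.body` directly and reads only `success`, `concurrency`, and `maxConcurrency`; the response shape implied by that usage is sketched below — any field beyond those three would be an assumption, since the endpoint's controller is not part of this diff:

```ts
// Sketch of the /v1/concurrency-check response as used by the helpers above.
// Only these three fields are evidenced by this diff.
interface ConcurrencyCheckResponse {
  success: true;
  concurrency: number;    // jobs currently active for the team
  maxConcurrency: number; // the team's concurrency limit
}
```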
diff --git a/apps/api/src/controllers/v1/crawl-status.ts b/apps/api/src/controllers/v1/crawl-status.ts
index f8c5fd874..e45f49e96 100644
--- a/apps/api/src/controllers/v1/crawl-status.ts
+++ b/apps/api/src/controllers/v1/crawl-status.ts
@@ -22,7 +22,7 @@ import { configDotenv } from "dotenv";
 import type { Job, JobState, Queue } from "bullmq";
 import { logger } from "../../lib/logger";
 import { supabase_rr_service, supabase_service } from "../../services/supabase";
-import { getConcurrencyLimitedJobs, getCrawlConcurrencyLimitedJobs } from "../../lib/concurrency-limit";
+import { getConcurrencyLimitedJobs } from "../../lib/concurrency-limit";
 import { getJobFromGCS } from "../../lib/gcs-jobs";
 configDotenv();
@@ -162,9 +162,7 @@ export async function crawlStatusController(
     ),
   );
 
-  const teamThrottledJobsSet = await getConcurrencyLimitedJobs(req.auth.team_id);
-  const crawlThrottledJobsSet = sc.crawlerOptions?.delay ? await getCrawlConcurrencyLimitedJobs(req.params.jobId) : new Set();
-  const throttledJobsSet = new Set([...teamThrottledJobsSet, ...crawlThrottledJobsSet]);
+  const throttledJobsSet = new Set(await getConcurrencyLimitedJobs(req.auth.team_id));
 
   const validJobStatuses: [string, JobState | "unknown"][] = [];
   const validJobIDs: string[] = [];
diff --git a/apps/api/src/lib/concurrency-limit.ts b/apps/api/src/lib/concurrency-limit.ts
index 59634e0b3..828ecfd34 100644
--- a/apps/api/src/lib/concurrency-limit.ts
+++ b/apps/api/src/lib/concurrency-limit.ts
@@ -1,12 +1,15 @@
+import { RateLimiterMode } from "../types";
 import { redisEvictConnection } from "../services/redis";
-import type { JobsOptions } from "bullmq";
+import type { Job, JobsOptions } from "bullmq";
+import { getACUCTeam } from "../controllers/auth";
+import { getCrawl } from "./crawl-redis";
+import { getScrapeQueue } from "../services/queue-service";
 
 const constructKey = (team_id: string) => "concurrency-limiter:" + team_id;
 const constructQueueKey = (team_id: string) =>
   "concurrency-limit-queue:" + team_id;
 
 const constructCrawlKey = (crawl_id: string) => "crawl-concurrency-limiter:" + crawl_id;
-const constructCrawlQueueKey = (crawl_id: string) => "crawl-concurrency-limit-queue:" + crawl_id;
 
 export async function cleanOldConcurrencyLimitEntries(
   team_id: string,
@@ -65,14 +68,33 @@ export async function takeConcurrencyLimitedJob(
   return JSON.parse(res[1][0][0]);
 }
 
+async function takeConcurrencyLimitedJobAndTimeout(
+  team_id: string,
+): Promise<{
+  job: ConcurrencyLimitedJob;
+  timeout: number;
+} | null> {
+  await redisEvictConnection.zremrangebyscore(constructQueueKey(team_id), -Infinity, Date.now());
+  const res = await redisEvictConnection.zmpop(1, constructQueueKey(team_id), "MIN");
+  if (res === null || res === undefined) {
+    return null;
+  }
+
+  return {
+    job: JSON.parse(res[1][0][0]),
+    timeout: parseFloat(res[1][0][1]),
+  };
+}
+
 export async function pushConcurrencyLimitedJob(
   team_id: string,
   job: ConcurrencyLimitedJob,
   timeout: number,
+  now: number = Date.now(),
 ) {
   await redisEvictConnection.zadd(
     constructQueueKey(team_id),
-    Date.now() + timeout,
+    now + timeout,
     JSON.stringify(job),
   );
 }
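The concurrency queue is a sorted set scored by an absolute deadline: `pushConcurrencyLimitedJob` writes `now + timeout` as the score, and `takeConcurrencyLimitedJobAndTimeout` first drops entries whose deadline has passed, then pops the earliest entry — so the `timeout` field it returns is really the raw score, i.e. the absolute deadline, not a duration. That is what makes the requeue in `getNextConcurrentJob` (below) deadline-preserving. A worked example with hypothetical values:

```ts
// Deadline round-trip for a job that is skipped and re-queued (hypothetical values).
const queuedAt = 1_000_000;           // Date.now() at first push
const timeout = 60_000;               // caller-supplied timeout
const deadline = queuedAt + timeout;  // ZSET score written by pushConcurrencyLimitedJob

// getNextConcurrentJob later skips the job and re-pushes it at time t:
const t = 1_020_000;                  // Date.now() at requeue
const remaining = deadline - t;       // 40_000 — passed back in as the "timeout" argument
const newScore = t + remaining;       // 1_060_000 — identical to the original deadline
console.assert(newScore === deadline);
```

The new `now` parameter defaults to `Date.now()`, which is all the requeue path needs; no caller in this diff passes it explicitly.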
@@ -126,34 +148,114 @@ export async function removeCrawlConcurrencyLimitActiveJob(
   await redisEvictConnection.zrem(constructCrawlKey(crawl_id), id);
 }
 
-export async function takeCrawlConcurrencyLimitedJob(
-  crawl_id: string,
-): Promise<ConcurrencyLimitedJob | null> {
-  const res = await redisEvictConnection.zmpop(1, constructCrawlQueueKey(crawl_id), "MIN");
-  if (res === null || res === undefined) {
-    return null;
+/**
+ * Grabs the next job from the team's concurrency limit queue. Handles crawl concurrency limits.
+ *
+ * This function may only be called once the outer code has verified that the team has not reached its concurrency limit.
+ *
+ * @param teamId
+ * @returns A job that can be run, or null if there are no more jobs to run.
+ */
+async function getNextConcurrentJob(teamId: string): Promise<{
+  job: ConcurrencyLimitedJob;
+  timeout: number;
+} | null> {
+  let ignoredJobs: {
+    job: ConcurrencyLimitedJob;
+    timeout: number;
+  }[] = [];
+
+  let finalJob: {
+    job: ConcurrencyLimitedJob;
+    timeout: number;
+  } | null = null;
+
+  while (finalJob === null) {
+    const res = await takeConcurrencyLimitedJobAndTimeout(teamId);
+    if (res === null) {
+      break;
+    }
+
+    // If the job is associated with a crawl ID, we need to check if the crawl has a max concurrency limit
+    if (res.job.data.crawl_id) {
+      const sc = await getCrawl(res.job.data.crawl_id);
+      const maxCrawlConcurrency = sc === null
+        ? null
+        : sc.crawlerOptions.delay !== undefined
+          ? 1
+          : sc.maxConcurrency ?? null;
+
+      if (maxCrawlConcurrency !== null) {
+        // If the crawl has a max concurrency limit, we need to check if the crawl has reached the limit
+        const currentActiveConcurrency = (await getCrawlConcurrencyLimitActiveJobs(res.job.data.crawl_id)).length;
+        if (currentActiveConcurrency < maxCrawlConcurrency) {
+          // If we're under the max concurrency limit, we can run the job
+          finalJob = res;
+        } else {
+          // If we're at the max concurrency limit, we need to ignore the job
+          ignoredJobs.push({
+            job: res.job,
+            timeout: res.timeout,
+          });
+        }
+      } else {
+        // If the crawl has no max concurrency limit, we can run the job
+        finalJob = res;
+      }
+    } else {
+      // If the job is not associated with a crawl ID, we can run the job
+      finalJob = res;
+    }
   }
-  return JSON.parse(res[1][0][0]);
+
+  for (const ignoredJob of ignoredJobs) {
+    const timeout = ignoredJob.timeout - Date.now();
+    await pushConcurrencyLimitedJob(teamId, ignoredJob.job, timeout);
+  }
+
+  return finalJob;
 }
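Jobs skipped because their crawl is saturated are buffered in `ignoredJobs` and re-pushed after the loop with their remaining timeout, so one saturated crawl cannot starve other queued work from the same team. The crawl-level cap itself reduces to a small function; restated below as a standalone sketch (the helper name and the reduced type are mine, not part of the diff):

```ts
// Restatement of the crawl-level cap computed in getNextConcurrentJob above.
// The StoredCrawl shape is reduced to the two fields the ternary reads.
type CrawlLimitFields = {
  crawlerOptions: { delay?: number };
  maxConcurrency?: number;
};

function effectiveCrawlLimit(sc: CrawlLimitFields | null): number | null {
  if (sc === null) return null;                         // unknown crawl: no crawl-level cap
  if (sc.crawlerOptions.delay !== undefined) return 1;  // delayed crawls run one page at a time
  return sc.maxConcurrency ?? null;                     // otherwise the explicit cap, if any
}

// effectiveCrawlLimit({ crawlerOptions: { delay: 2 }, maxConcurrency: 5 }) === 1
// effectiveCrawlLimit({ crawlerOptions: {}, maxConcurrency: 5 })           === 5
// effectiveCrawlLimit({ crawlerOptions: {} })                              === null
```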
-export async function pushCrawlConcurrencyLimitedJob(
-  crawl_id: string,
-  job: ConcurrencyLimitedJob,
-) {
-  await redisEvictConnection.zadd(
-    constructCrawlQueueKey(crawl_id),
-    job.priority ?? 1,
-    JSON.stringify(job),
-  );
-}
+/**
+ * Called when a job associated with a concurrency queue is done.
+ *
+ * @param job The BullMQ job that is done.
+ */
+export async function concurrentJobDone(job: Job) {
+  if (job.id && job.data && job.data.team_id) {
+    await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
+    await cleanOldConcurrencyLimitEntries(job.data.team_id);
 
-export async function getCrawlConcurrencyLimitedJobs(
-  crawl_id: string,
-) {
-  return new Set((await redisEvictConnection.zrange(constructCrawlQueueKey(crawl_id), 0, -1)).map(x => JSON.parse(x).id));
-}
+    if (job.data.crawl_id) {
+      await removeCrawlConcurrencyLimitActiveJob(job.data.crawl_id, job.id);
+      await cleanOldCrawlConcurrencyLimitEntries(job.data.crawl_id);
+    }
 
-export async function getCrawlConcurrencyQueueJobsCount(crawl_id: string): Promise<number> {
-  const count = await redisEvictConnection.zcard(constructCrawlQueueKey(crawl_id));
-  return count;
+    const maxTeamConcurrency = (await getACUCTeam(job.data.team_id, false, true, job.data.is_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
+    const currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(job.data.team_id)).length;
+
+    if (currentActiveConcurrency < maxTeamConcurrency) {
+      const nextJob = await getNextConcurrentJob(job.data.team_id);
+      if (nextJob !== null) {
+        await pushConcurrencyLimitActiveJob(job.data.team_id, nextJob.job.id, 60 * 1000);
+
+        if (nextJob.job.data.crawl_id) {
+          await pushCrawlConcurrencyLimitActiveJob(nextJob.job.data.crawl_id, nextJob.job.id, 60 * 1000);
+        }
+
+        (await getScrapeQueue()).add(
+          nextJob.job.id,
+          {
+            ...nextJob.job.data,
+            concurrencyLimitHit: true,
+          },
+          {
+            ...nextJob.job.opts,
+            jobId: nextJob.job.id,
+            priority: nextJob.job.priority,
+          }
+        );
+      }
+    }
+  }
 }
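The `60 * 1000` passed to `pushConcurrencyLimitActiveJob` above is the initial lease on the team's active set. Neither that function nor `getConcurrencyLimitActiveJobs` is touched by this diff, but based on the score-by-deadline pattern visible in `takeConcurrencyLimitedJobAndTimeout`, the active set presumably works like the following sketch — illustrative only, not the actual implementation:

```ts
// Illustrative sketch only — the real implementations live outside this diff.
// The key name matches constructKey above; everything else is assumed.
import Redis from "ioredis";

const redis = new Redis();
const activeKey = (teamId: string) => "concurrency-limiter:" + teamId;

// Granting (or renewing) a lease is a single ZADD with the expiry as the score.
async function pushActiveJobSketch(teamId: string, jobId: string, ttlMs: number) {
  await redis.zadd(activeKey(teamId), Date.now() + ttlMs, jobId);
}

// Live entries are those whose lease has not yet expired.
async function getActiveJobsSketch(teamId: string, now = Date.now()): Promise<string[]> {
  return await redis.zrangebyscore(activeKey(teamId), now, "+inf");
}
```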
diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts
index e92545d07..8c8e98d2f 100644
--- a/apps/api/src/services/queue-jobs.ts
+++ b/apps/api/src/services/queue-jobs.ts
@@ -4,14 +4,12 @@ import { NotificationType, RateLimiterMode, WebScraperOptions } from "../types";
 import * as Sentry from "@sentry/node";
 import {
   cleanOldConcurrencyLimitEntries,
-  cleanOldCrawlConcurrencyLimitEntries,
   getConcurrencyLimitActiveJobs,
   getConcurrencyQueueJobsCount,
-  getCrawlConcurrencyQueueJobsCount,
+  getCrawlConcurrencyLimitActiveJobs,
   pushConcurrencyLimitActiveJob,
   pushConcurrencyLimitedJob,
   pushCrawlConcurrencyLimitActiveJob,
-  pushCrawlConcurrencyLimitedJob,
 } from "../lib/concurrency-limit";
 import { logger } from "../lib/logger";
 import { sendNotificationWithCustomDays } from './notification/email_notification';
@@ -19,7 +17,7 @@ import { shouldSendConcurrencyLimitNotification } from './notification/notificat
 import { getACUC, getACUCTeam } from "../controllers/auth";
 import { getJobFromGCS } from "../lib/gcs-jobs";
 import { Document } from "../controllers/v1/types";
-import type { Logger } from "winston";
+import { getCrawl } from "../lib/crawl-redis";
 
 /**
  * Checks if a job is a crawl or batch scrape based on its options
@@ -50,25 +48,6 @@ async function _addScrapeJobToConcurrencyQueue(
   }, webScraperOptions.crawl_id ? Infinity :(webScraperOptions.scrapeOptions?.timeout ?? (60 * 1000)));
 }
 
-async function _addCrawlScrapeJobToConcurrencyQueue(
-  webScraperOptions: any,
-  options: any,
-  jobId: string,
-  jobPriority: number,
-) {
-  await pushCrawlConcurrencyLimitedJob(webScraperOptions.crawl_id, {
-    id: jobId,
-    data: webScraperOptions,
-    opts: {
-      ...options,
-      priority: jobPriority,
-      jobId: jobId,
-    },
-    priority: jobPriority,
-  });
-  // NEVER ADD THESE TO BULLMQ!!! THEY ARE ADDED IN QUEUE-WORKER!!! SHOOOOO!!! - mogery
-}
-
 export async function _addScrapeJobToBullMQ(
   webScraperOptions: any,
   options: any,
@@ -100,18 +79,6 @@ async function addScrapeJobRaw(
   jobPriority: number,
   directToBullMQ: boolean = false,
 ) {
-  const hasCrawlDelay = webScraperOptions.crawl_id && webScraperOptions.crawlerOptions?.delay;
-
-  if (hasCrawlDelay) {
-    await _addCrawlScrapeJobToConcurrencyQueue(
-      webScraperOptions,
-      options,
-      jobId,
-      jobPriority
-    );
-    return;
-  }
-
   let concurrencyLimited = false;
   let currentActiveConcurrency = 0;
   let maxConcurrency = 0;
@@ -210,140 +177,189 @@
 ) {
   if (jobs.length === 0) return true;
 
-  const addToCCQ = jobs.filter(job => job.data.crawlerOptions?.delay);
-  const dontAddToCCQ = jobs.filter(job => !job.data.crawlerOptions?.delay);
+  const jobsByTeam = new Map<string, typeof jobs>();
 
-  let countCanBeDirectlyAdded = Infinity;
-  let currentActiveConcurrency = 0;
-  let maxConcurrency = 0;
+  for (const job of jobs) {
+    if (!jobsByTeam.has(job.data.team_id)) {
+      jobsByTeam.set(job.data.team_id, []);
+    }
+    jobsByTeam.get(job.data.team_id)!.push(job);
+  }
+
+  for (const [teamId, teamJobs] of jobsByTeam) {
+    // == Buckets for jobs ==
+    let jobsForcedToCQ: {
+      data: WebScraperOptions;
+      opts: {
+        jobId: string;
+        priority: number;
+      };
+    }[] = [];
+
+    let jobsPotentiallyInCQ: {
+      data: WebScraperOptions;
+      opts: {
+        jobId: string;
+        priority: number;
+      };
+    }[] = [];
+
+    // == Select jobs by crawl ID ==
+    const jobsByCrawlID = new Map<string, typeof jobs>();
+
+    const jobsWithoutCrawlID: {
+      data: WebScraperOptions;
+      opts: {
+        jobId: string;
+        priority: number;
+      };
+    }[] = [];
+
+    for (const job of teamJobs) {
+      if (job.data.crawl_id) {
+        if (!jobsByCrawlID.has(job.data.crawl_id)) {
+          jobsByCrawlID.set(job.data.crawl_id, []);
+        }
+        jobsByCrawlID.get(job.data.crawl_id)!.push(job);
+      } else {
+        jobsWithoutCrawlID.push(job);
+      }
+    }
+
+    // == Gate jobs by their crawl's concurrency limit ==
+    for (const [crawlID, crawlJobs] of jobsByCrawlID) {
+      const crawl = await getCrawl(crawlID);
+      const concurrencyLimit = !crawl
+        ? null
+        : crawl.crawlerOptions.delay === undefined && crawl.maxConcurrency === undefined
+          ? null
+          : crawl.maxConcurrency ?? 1;
+
+      if (concurrencyLimit === null) {
+        // All jobs may be in the CQ depending on the global team concurrency limit
+        jobsPotentiallyInCQ.push(...crawlJobs);
+      } else {
+        const crawlConcurrency = (await getCrawlConcurrencyLimitActiveJobs(crawlID)).length;
+        const freeSlots = Math.max(concurrencyLimit - crawlConcurrency, 0);
+
+        // The first n jobs may be in the CQ depending on the global team concurrency limit
+        jobsPotentiallyInCQ.push(...crawlJobs.slice(0, freeSlots));
+
+        // Every job after that must be in the CQ, as the crawl concurrency limit has been reached
+        jobsForcedToCQ.push(...crawlJobs.slice(freeSlots));
+      }
+    }
+
+    // All jobs without a crawl ID may be in the CQ depending on the global team concurrency limit
+    jobsPotentiallyInCQ.push(...jobsWithoutCrawlID);
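Two details worth calling out in the bucketing above. First, this site resolves the crawl-level cap as `crawl.maxConcurrency ?? 1` whenever either `delay` or `maxConcurrency` is set, so a crawl with both fields set appears to get `maxConcurrency` here, whereas `getNextConcurrentJob` in concurrency-limit.ts checks `delay !== undefined` first and would resolve the same crawl to 1; the two derivations agree only when at most one field is set. Second, the crawl-level gating composes with the team-level slicing that follows; a worked example with hypothetical numbers:

```ts
// Hypothetical batch: one crawl with maxConcurrency 2 and 1 job already active.
const crawlJobs = ["a1", "a2", "a3"];                       // job IDs for the crawl
const freeSlots = Math.max(2 - 1, 0);                       // 1
const jobsPotentiallyInCQ = crawlJobs.slice(0, freeSlots);  // ["a1"]
const jobsForcedToCQ = crawlJobs.slice(freeSlots);          // ["a2", "a3"]

// Team limit 5 with 3 jobs already active:
const countCanBeDirectlyAdded = Math.max(5 - 3, 0);         // 2
const addToBull = jobsPotentiallyInCQ.slice(0, countCanBeDirectlyAdded);                   // ["a1"]
const addToCQ = jobsPotentiallyInCQ.slice(countCanBeDirectlyAdded).concat(jobsForcedToCQ); // ["a2", "a3"]
```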
 
-  if (dontAddToCCQ[0] && dontAddToCCQ[0].data && dontAddToCCQ[0].data.team_id) {
     const now = Date.now();
-    maxConcurrency = (await getACUCTeam(dontAddToCCQ[0].data.team_id, false, true, dontAddToCCQ[0].data.from_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
-    cleanOldConcurrencyLimitEntries(dontAddToCCQ[0].data.team_id, now);
+    const maxConcurrency = (await getACUCTeam(teamId, false, true, jobs[0].data.from_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
+    await cleanOldConcurrencyLimitEntries(teamId, now);
 
-    currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(dontAddToCCQ[0].data.team_id, now)).length;
+    const currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(teamId, now)).length;
 
-    countCanBeDirectlyAdded = Math.max(
+    const countCanBeDirectlyAdded = Math.max(
       maxConcurrency - currentActiveConcurrency,
       0,
     );
-  }
-  const addToBull = dontAddToCCQ.slice(0, countCanBeDirectlyAdded);
-  const addToCQ = dontAddToCCQ.slice(countCanBeDirectlyAdded);
+    const addToBull = jobsPotentiallyInCQ.slice(0, countCanBeDirectlyAdded);
+    const addToCQ = jobsPotentiallyInCQ.slice(countCanBeDirectlyAdded).concat(jobsForcedToCQ);
 
-  // equals 2x the max concurrency
-  if(addToCQ.length > maxConcurrency) {
-    // logger.info(`Concurrency limited 2x (multiple) - Concurrency queue jobs: ${addToCQ.length} Max concurrency: ${maxConcurrency} Team ID: ${jobs[0].data.team_id}`);
-    // Only send notification if it's not a crawl or batch scrape
-    if (!isCrawlOrBatchScrape(dontAddToCCQ[0].data)) {
-      const shouldSendNotification = await shouldSendConcurrencyLimitNotification(dontAddToCCQ[0].data.team_id);
-      if (shouldSendNotification) {
-        sendNotificationWithCustomDays(dontAddToCCQ[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
-          logger.error("Error sending notification (concurrency limit reached)", { error });
-        });
+    // equals 2x the max concurrency
+    if((jobsPotentiallyInCQ.length - countCanBeDirectlyAdded) > maxConcurrency) {
+      // logger.info(`Concurrency limited 2x (multiple) - Concurrency queue jobs: ${addToCQ.length} Max concurrency: ${maxConcurrency} Team ID: ${jobs[0].data.team_id}`);
+      // Only send notification if it's not a crawl or batch scrape
+      if (!isCrawlOrBatchScrape(jobs[0].data)) {
+        const shouldSendNotification = await shouldSendConcurrencyLimitNotification(jobs[0].data.team_id);
+        if (shouldSendNotification) {
+          sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
+            logger.error("Error sending notification (concurrency limit reached)", { error });
+          });
+        }
       }
     }
+
+    await Promise.all(
+      addToCQ.map(async (job) => {
+        const size = JSON.stringify(job.data).length;
+        return await Sentry.startSpan(
+          {
+            name: "Add scrape job",
+            op: "queue.publish",
+            attributes: {
+              "messaging.message.id": job.opts.jobId,
+              "messaging.destination.name": getScrapeQueue().name,
+              "messaging.message.body.size": size,
+            },
+          },
+          async (span) => {
+            const jobData = {
+              ...job.data,
+              sentry: {
+                trace: Sentry.spanToTraceHeader(span),
+                baggage: Sentry.spanToBaggageHeader(span),
+                size,
+              },
+            };
+
+            await _addScrapeJobToConcurrencyQueue(
+              jobData,
+              job.opts,
+              job.opts.jobId,
+              job.opts.priority,
+            );
+          },
+        );
+      }),
+    );
+
+    await Promise.all(
+      addToBull.map(async (job) => {
+        const size = JSON.stringify(job.data).length;
+        return await Sentry.startSpan(
+          {
+            name: "Add scrape job",
+            op: "queue.publish",
+            attributes: {
+              "messaging.message.id": job.opts.jobId,
+              "messaging.destination.name": getScrapeQueue().name,
+              "messaging.message.body.size": size,
+            },
+          },
+          async (span) => {
+            await _addScrapeJobToBullMQ(
+              {
+                ...job.data,
+                sentry: {
+                  trace: Sentry.spanToTraceHeader(span),
+                  baggage: Sentry.spanToBaggageHeader(span),
+                  size,
+                },
+              },
+              job.opts,
+              job.opts.jobId,
+              job.opts.priority,
+            );
+          },
+        );
+      }),
+    );
+  }
-
-  await Promise.all(
-    addToCCQ.map(async (job) => {
-      const size = JSON.stringify(job.data).length;
-      return await Sentry.startSpan(
-        {
-          name: "Add scrape job",
-          op: "queue.publish",
-          attributes: {
-            "messaging.message.id": job.opts.jobId,
-            "messaging.destination.name": getScrapeQueue().name,
-            "messaging.message.body.size": size,
-          },
-        },
-        async (span) => {
-          await _addCrawlScrapeJobToConcurrencyQueue(
-            {
-              ...job.data,
-              sentry: {
-                trace: Sentry.spanToTraceHeader(span),
-                baggage: Sentry.spanToBaggageHeader(span),
-                size,
-              },
-            },
-            job.opts,
-            job.opts.jobId,
-            job.opts.priority,
-          );
-        },
-      );
-    }),
-  );
-
-  await Promise.all(
-    addToCQ.map(async (job) => {
-      const size = JSON.stringify(job.data).length;
-      return await Sentry.startSpan(
-        {
-          name: "Add scrape job",
-          op: "queue.publish",
-          attributes: {
-            "messaging.message.id": job.opts.jobId,
-            "messaging.destination.name": getScrapeQueue().name,
-            "messaging.message.body.size": size,
-          },
-        },
-        async (span) => {
-          const jobData = {
-            ...job.data,
-            sentry: {
-              trace: Sentry.spanToTraceHeader(span),
-              baggage: Sentry.spanToBaggageHeader(span),
-              size,
-            },
-          };
-
-          await _addScrapeJobToConcurrencyQueue(
-            jobData,
-            job.opts,
-            job.opts.jobId,
-            job.opts.priority,
-          );
-        },
-      );
-    }),
-  );
-
-  await Promise.all(
-    addToBull.map(async (job) => {
-      const size = JSON.stringify(job.data).length;
-      return await Sentry.startSpan(
-        {
-          name: "Add scrape job",
-          op: "queue.publish",
-          attributes: {
-            "messaging.message.id": job.opts.jobId,
-            "messaging.destination.name": getScrapeQueue().name,
-            "messaging.message.body.size": size,
-          },
-        },
-        async (span) => {
-          await _addScrapeJobToBullMQ(
-            {
-              ...job.data,
-              sentry: {
-                trace: Sentry.spanToTraceHeader(span),
-                baggage: Sentry.spanToBaggageHeader(span),
-                size,
-              },
-            },
-            job.opts,
-            job.opts.jobId,
-            job.opts.priority,
-          );
-        },
-      );
-    }),
-  );
 }
 
 export function waitForJob(
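With producers unified, the worker side (next file) shrinks to a single call: the crawl bookkeeping, team-limit check, and requeue that used to live inline in `workerFun` all move into `concurrentJobDone`. The diff only shows the call site, not the surrounding control flow, so the shape below is a simplified stand-in, not the actual worker code:

```ts
// Simplified stand-in for the worker's completion path; processJobSketch is a
// hypothetical name for the worker's real job handler, which is not in this diff.
import type { Job } from "bullmq";
import { concurrentJobDone } from "../lib/concurrency-limit";

declare function processJobSketch(job: Job): Promise<void>;

async function runJobSketch(job: Job) {
  try {
    await processJobSketch(job);
  } finally {
    // One hand-off attempt per finished job: release this job's slot, then
    // promote the next queued job if the team is under its limit.
    await concurrentJobDone(job);
  }
}
```

Whether the real call runs on failure paths as well depends on the worker code around the hunk below, which this diff shows only partially.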
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index dd4960515..2d56cc45f 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -50,15 +50,8 @@ import { getJobs } from "..//controllers/v1/crawl-status";
 import { configDotenv } from "dotenv";
 import { scrapeOptions } from "../controllers/v1/types";
 import {
-  cleanOldConcurrencyLimitEntries,
-  cleanOldCrawlConcurrencyLimitEntries,
-  getConcurrencyLimitActiveJobs,
+  concurrentJobDone,
   pushConcurrencyLimitActiveJob,
-  pushCrawlConcurrencyLimitActiveJob,
-  removeConcurrencyLimitActiveJob,
-  removeCrawlConcurrencyLimitActiveJob,
-  takeConcurrencyLimitedJob,
-  takeCrawlConcurrencyLimitedJob,
 } from "../lib/concurrency-limit";
 import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist";
 import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings";
@@ -803,71 +796,7 @@ const workerFun = async (
       runningJobs.delete(job.id);
     }
 
-    const sc = job.data.crawl_id ? await getCrawl(job.data.crawl_id) : null;
-
-    if (job.id && job.data.crawl_id && sc?.maxConcurrency) {
-      await removeCrawlConcurrencyLimitActiveJob(job.data.crawl_id, job.id);
-      cleanOldCrawlConcurrencyLimitEntries(job.data.crawl_id);
-
-      if (job.data.crawlerOptions?.delay) {
-        const delayInSeconds = job.data.crawlerOptions.delay;
-        const delayInMs = delayInSeconds * 1000;
-
-        await new Promise(resolve => setTimeout(resolve, delayInMs));
-      }
-
-      const nextCrawlJob = await takeCrawlConcurrencyLimitedJob(job.data.crawl_id);
-      if (nextCrawlJob !== null) {
-        await pushCrawlConcurrencyLimitActiveJob(job.data.crawl_id, nextCrawlJob.id, 60 * 1000);
-
-        await queue.add(
-          nextCrawlJob.id,
-          {
-            ...nextCrawlJob.data,
-          },
-          {
-            ...nextCrawlJob.opts,
-            jobId: nextCrawlJob.id,
-            priority: nextCrawlJob.priority,
-          },
-        );
-      }
-    }
-
-    if (job.id && job.data && job.data.team_id) {
-      const maxConcurrency = (await getACUCTeam(job.data.team_id, false, true, job.data.is_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
-
-      await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
-      await cleanOldConcurrencyLimitEntries(job.data.team_id);
-
-      // Check if we're under the concurrency limit before adding a new job
-      const currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(job.data.team_id)).length;
-      const concurrencyLimited = currentActiveConcurrency >= maxConcurrency;
-
-      if (!concurrencyLimited) {
-        const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);
-        if (nextJob !== null) {
-          await pushConcurrencyLimitActiveJob(
-            job.data.team_id,
-            nextJob.id,
-            60 * 1000,
-          ); // 60s initial timeout
-
-          await queue.add(
-            nextJob.id,
-            {
-              ...nextJob.data,
-              concurrencyLimitHit: true,
-            },
-            {
-              ...nextJob.opts,
-              jobId: nextJob.id,
-              priority: nextJob.priority,
-            },
-          );
-        }
-      }
-    }
+    await concurrentJobDone(job);
   }
 
   if (job.data && job.data.sentry && Sentry.isInitialized()) {