diff --git a/apps/api/src/lib/concurrency-limit.ts b/apps/api/src/lib/concurrency-limit.ts index 5a1578ed8..1eca68393 100644 --- a/apps/api/src/lib/concurrency-limit.ts +++ b/apps/api/src/lib/concurrency-limit.ts @@ -1,38 +1,10 @@ -import { CONCURRENCY_LIMIT } from "../services/rate-limiter"; import { redisConnection } from "../services/queue-service"; -import { PlanType } from "../types"; -import type { Job, JobsOptions } from "bullmq"; +import type { JobsOptions } from "bullmq"; const constructKey = (team_id: string) => "concurrency-limiter:" + team_id; const constructQueueKey = (team_id: string) => "concurrency-limit-queue:" + team_id; -export function calculateJobTimeToRun( - job: ConcurrencyLimitedJob -): number { - let jobTimeToRun = 86400000; // 24h (crawl) - - if (job.data.scrapeOptions) { - if (job.data.scrapeOptions.timeout) { - jobTimeToRun = job.data.scrapeOptions.timeout; - } - - if (job.data.scrapeOptions.waitFor) { - jobTimeToRun += job.data.scrapeOptions.waitFor; - } - - (job.data.scrapeOptions.actions ?? []).forEach(x => { - if (x.type === "wait" && x.milliseconds) { - jobTimeToRun += x.milliseconds; - } else { - jobTimeToRun += 1000; - } - }) - } - - return jobTimeToRun; -} - export async function cleanOldConcurrencyLimitEntries( team_id: string, now: number = Date.now(), diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts index 7433e6f90..7e2a6f03f 100644 --- a/apps/api/src/services/queue-jobs.ts +++ b/apps/api/src/services/queue-jobs.ts @@ -1,10 +1,8 @@ -import { Job, JobsOptions } from "bullmq"; import { getScrapeQueue } from "./queue-service"; import { v4 as uuidv4 } from "uuid"; -import { NotificationType, PlanType, WebScraperOptions } from "../types"; +import { PlanType, WebScraperOptions } from "../types"; import * as Sentry from "@sentry/node"; import { - calculateJobTimeToRun, cleanOldConcurrencyLimitEntries, getConcurrencyLimitActiveJobs, getConcurrencyQueueJobsCount, @@ -13,7 +11,6 @@ import { } from "../lib/concurrency-limit"; import { logger } from "../lib/logger"; import { getConcurrencyLimitMax } from "./rate-limiter"; -import { sendNotificationWithCustomDays } from "./notification/email_notification"; async function _addScrapeJobToConcurrencyQueue( webScraperOptions: any, @@ -44,15 +41,7 @@ export async function _addScrapeJobToBullMQ( webScraperOptions.team_id && webScraperOptions.plan ) { - await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId, calculateJobTimeToRun({ - id: jobId, - opts: { - ...options, - priority: jobPriority, - jobId, - }, - data: webScraperOptions, - })); + await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId, 60 * 1000); // 60s default timeout } await getScrapeQueue().add(jobId, webScraperOptions, { diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index efaaa2132..cb967b57d 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -52,7 +52,6 @@ import { configDotenv } from "dotenv"; import { scrapeOptions } from "../controllers/v1/types"; import { getRateLimiterPoints } from "./rate-limiter"; import { - calculateJobTimeToRun, cleanOldConcurrencyLimitEntries, pushConcurrencyLimitActiveJob, removeConcurrencyLimitActiveJob, @@ -247,6 +246,11 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => { extendInterval: jobLockExtendInterval, extensionTime: jobLockExtensionTime, }); + + if (job.data?.mode !== "kickoff" && job.data?.team_id) { + await pushConcurrencyLimitActiveJob(job.data.team_id, job.id, 60 * 1000); // 60s lock renew, just like in the queue + } + await job.extendLock(token, jobLockExtensionTime); }, jobLockExtendInterval); @@ -597,7 +601,7 @@ const workerFun = async ( // we are 1 under the limit, assuming the job insertion logic never over-inserts. - MG const nextJob = await takeConcurrencyLimitedJob(job.data.team_id); if (nextJob !== null) { - await pushConcurrencyLimitActiveJob(job.data.team_id, nextJob.id, calculateJobTimeToRun(nextJob)); + await pushConcurrencyLimitActiveJob(job.data.team_id, nextJob.id, 60 * 1000); // 60s initial timeout await queue.add( nextJob.id,