diff --git a/apps/api/src/controllers/v0/crawl-status.ts b/apps/api/src/controllers/v0/crawl-status.ts
index 41491f86..1b1ffdc5 100644
--- a/apps/api/src/controllers/v0/crawl-status.ts
+++ b/apps/api/src/controllers/v0/crawl-status.ts
@@ -54,9 +54,9 @@ export async function crawlStatusController(req: Request, res: Response) {
     const jobs = (await getJobs(req.params.jobId, jobIDs)).sort((a, b) => a.timestamp - b.timestamp);
     const jobStatuses = await Promise.all(jobs.map(x => x.getState()));

-    const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "active";
+    const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobs.some((x, i) => jobStatuses[i] === "failed" && x.failedReason !== "Concurrency limit hit") ? "failed" : "active";

-    const data = jobs.map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue);
+    const data = jobs.filter(x => x.failedReason !== "Concurrency limit hit").map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue);

     if (
       jobs.length > 0 &&
diff --git a/apps/api/src/controllers/v0/crawl.ts b/apps/api/src/controllers/v0/crawl.ts
index a95c85a6..3ebee976 100644
--- a/apps/api/src/controllers/v0/crawl.ts
+++ b/apps/api/src/controllers/v0/crawl.ts
@@ -171,7 +171,8 @@ export async function crawlController(req: Request, res: Response) {
           url,
           mode: "single_urls",
           crawlerOptions: crawlerOptions,
-          team_id: team_id,
+          team_id,
+          plan,
           pageOptions: pageOptions,
           origin: req.body.origin ?? defaultOrigin,
           crawl_id: id,
@@ -211,7 +212,8 @@ export async function crawlController(req: Request, res: Response) {
       url,
       mode: "single_urls",
       crawlerOptions: crawlerOptions,
-      team_id: team_id,
+      team_id,
+      plan,
       pageOptions: pageOptions,
       origin: req.body.origin ?? defaultOrigin,
       crawl_id: id,
diff --git a/apps/api/src/controllers/v0/crawlPreview.ts b/apps/api/src/controllers/v0/crawlPreview.ts
index f8706867..bceb1df9 100644
--- a/apps/api/src/controllers/v0/crawlPreview.ts
+++ b/apps/api/src/controllers/v0/crawlPreview.ts
@@ -107,7 +107,8 @@ export async function crawlPreviewController(req: Request, res: Response) {
           url,
           mode: "single_urls",
           crawlerOptions: crawlerOptions,
-          team_id: team_id,
+          team_id,
+          plan,
           pageOptions: pageOptions,
           origin: "website-preview",
           crawl_id: id,
@@ -121,7 +122,8 @@ export async function crawlPreviewController(req: Request, res: Response) {
       url,
       mode: "single_urls",
       crawlerOptions: crawlerOptions,
-      team_id: team_id,
+      team_id,
+      plan,
       pageOptions: pageOptions,
       origin: "website-preview",
       crawl_id: id,
diff --git a/apps/api/src/controllers/v0/scrape.ts b/apps/api/src/controllers/v0/scrape.ts
index 2a0fcc13..f5dbc3d1 100644
--- a/apps/api/src/controllers/v0/scrape.ts
+++ b/apps/api/src/controllers/v0/scrape.ts
@@ -61,6 +61,7 @@ export async function scrapeHelper(
     crawlerOptions,
     team_id,
     pageOptions,
+    plan,
     extractorOptions,
     origin: req.body.origin ?? defaultOrigin,
     is_scrape: true,
@@ -196,7 +197,7 @@ export async function scrapeController(req: Request, res: Response) {
       await checkTeamCredits(chunk, team_id, 1);
     if (!creditsCheckSuccess) {
       earlyReturn = true;
-      return res.status(402).json({ error: "Insufficient credits" });
+      return res.status(402).json({ error: "Insufficient credits. For more credits, you can upgrade your plan at https://firecrawl.dev/pricing" });
     }
   } catch (error) {
     Logger.error(error);
diff --git a/apps/api/src/controllers/v1/concurrency-check.ts b/apps/api/src/controllers/v1/concurrency-check.ts
new file mode 100644
index 00000000..8695c6e6
--- /dev/null
+++ b/apps/api/src/controllers/v1/concurrency-check.ts
@@ -0,0 +1,25 @@
+import { authenticateUser } from "../auth";
+import {
+  ConcurrencyCheckParams,
+  ConcurrencyCheckResponse,
+  RequestWithAuth,
+} from "./types";
+import { RateLimiterMode } from "../../types";
+import { Response } from "express";
+import { redisConnection } from "../../services/queue-service";
+// Basically just middleware and error wrapping
+export async function concurrencyCheckController(
+  req: RequestWithAuth<ConcurrencyCheckParams, undefined, undefined>,
+  res: Response<ConcurrencyCheckResponse>
+) {
+  const concurrencyLimiterKey = "concurrency-limiter:" + req.auth.team_id;
+  const now = Date.now();
+  const activeJobsOfTeam = await redisConnection.zrangebyscore(
+    concurrencyLimiterKey,
+    now,
+    Infinity
+  );
+  return res
+    .status(200)
+    .json({ success: true, concurrency: activeJobsOfTeam.length });
+}
diff --git a/apps/api/src/controllers/v1/crawl-status-ws.ts b/apps/api/src/controllers/v1/crawl-status-ws.ts
index 16a67682..9832a948 100644
--- a/apps/api/src/controllers/v1/crawl-status-ws.ts
+++ b/apps/api/src/controllers/v1/crawl-status-ws.ts
@@ -5,7 +5,7 @@ import { CrawlStatusParams, CrawlStatusResponse, Document, ErrorResponse, legacyDocumentConverter, RequestWithAuth } from "./types";
 import { WebSocket } from "ws";
 import { v4 as uuidv4 } from "uuid";
 import { Logger } from "../../lib/logger";
-import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, isCrawlFinished, isCrawlFinishedLocked } from "../../lib/crawl-redis";
+import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, getThrottledJobs, isCrawlFinished, isCrawlFinishedLocked } from "../../lib/crawl-redis";
 import { getScrapeQueue } from "../../services/queue-service";
 import { getJob, getJobs } from "./crawl-status";
 import * as Sentry from "@sentry/node";
@@ -95,8 +95,10 @@ async function crawlStatusWS(ws: WebSocket, req: RequestWithAuth<CrawlStatusParams, undefined, undefined>) {
-  const jobStatuses = await Promise.all(jobIDs.map(x => getScrapeQueue().getJobState(x)));
-  const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "scraping";
+  let jobStatuses = await Promise.all(jobIDs.map(async x => [x, await getScrapeQueue().getJobState(x)] as const));
+  const throttledJobs = new Set(await getThrottledJobs(req.auth.team_id));
+  jobStatuses = jobStatuses.filter(x => !throttledJobs.has(x[0])); // throttled jobs can have a failed status, but they are not actually failed
+  const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x[1] === "completed") ? "completed" : jobStatuses.some(x => x[1] === "failed") ? "failed" : "scraping";

"failed" : "scraping"; const doneJobs = await getJobs(doneJobIDs); const data = doneJobs.map(x => x.returnvalue); diff --git a/apps/api/src/controllers/v1/crawl-status.ts b/apps/api/src/controllers/v1/crawl-status.ts index ec427a9c..9c0026a0 100644 --- a/apps/api/src/controllers/v1/crawl-status.ts +++ b/apps/api/src/controllers/v1/crawl-status.ts @@ -1,6 +1,6 @@ import { Response } from "express"; import { CrawlStatusParams, CrawlStatusResponse, ErrorResponse, legacyDocumentConverter, RequestWithAuth } from "./types"; -import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength } from "../../lib/crawl-redis"; +import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, getThrottledJobs } from "../../lib/crawl-redis"; import { getScrapeQueue } from "../../services/queue-service"; import { supabaseGetJobById, supabaseGetJobsById } from "../../lib/supabase-jobs"; import { configDotenv } from "dotenv"; @@ -58,8 +58,10 @@ export async function crawlStatusController(req: RequestWithAuth getScrapeQueue().getJobState(x))); - const status: Exclude["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "scraping"; + let jobStatuses = await Promise.all(jobIDs.map(async x => [x, await getScrapeQueue().getJobState(x)] as const)); + const throttledJobs = new Set(...await getThrottledJobs(req.auth.team_id)); + jobStatuses = jobStatuses.filter(x => !throttledJobs.has(x[0])); // throttled jobs can have a failed status, but they are not actually failed + const status: Exclude["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x[1] === "completed") ? "completed" : jobStatuses.some(x => x[1] === "failed") ? "failed" : "scraping"; const doneJobsLength = await getDoneJobsOrderedLength(req.params.jobId); const doneJobsOrder = await getDoneJobsOrdered(req.params.jobId, start, end ?? 
-1); diff --git a/apps/api/src/controllers/v1/crawl.ts b/apps/api/src/controllers/v1/crawl.ts index e0883fa8..4efe279a 100644 --- a/apps/api/src/controllers/v1/crawl.ts +++ b/apps/api/src/controllers/v1/crawl.ts @@ -106,6 +106,7 @@ export async function crawlController( url, mode: "single_urls", team_id: req.auth.team_id, + plan: req.auth.plan, crawlerOptions, pageOptions, origin: "api", @@ -138,6 +139,7 @@ export async function crawlController( mode: "single_urls", crawlerOptions: crawlerOptions, team_id: req.auth.team_id, + plan: req.auth.plan, pageOptions: pageOptions, origin: "api", crawl_id: id, diff --git a/apps/api/src/controllers/v1/scrape.ts b/apps/api/src/controllers/v1/scrape.ts index 899ae74a..6da48999 100644 --- a/apps/api/src/controllers/v1/scrape.ts +++ b/apps/api/src/controllers/v1/scrape.ts @@ -44,6 +44,7 @@ export async function scrapeController( mode: "single_urls", crawlerOptions: {}, team_id: req.auth.team_id, + plan: req.auth.plan, pageOptions, extractorOptions, origin: req.body.origin, diff --git a/apps/api/src/controllers/v1/types.ts b/apps/api/src/controllers/v1/types.ts index 4030b31e..3781eb78 100644 --- a/apps/api/src/controllers/v1/types.ts +++ b/apps/api/src/controllers/v1/types.ts @@ -216,6 +216,7 @@ export type Document = { actions?: { screenshots: string[]; }; + warning?: string; metadata: { title?: string; description?: string; @@ -293,6 +294,17 @@ export type CrawlStatusParams = { jobId: string; }; +export type ConcurrencyCheckParams = { + teamId: string; +}; + +export type ConcurrencyCheckResponse = + | ErrorResponse + | { + success: true; + concurrency: number; + }; + export type CrawlStatusResponse = | ErrorResponse | { @@ -444,6 +456,7 @@ export function legacyDocumentConverter(doc: any): Document { extract: doc.llm_extraction, screenshot: doc.screenshot ?? doc.fullPageScreenshot, actions: doc.actions ?? undefined, + warning: doc.warning ?? 
     metadata: {
       ...doc.metadata,
       pageError: undefined,
diff --git a/apps/api/src/lib/crawl-redis.ts b/apps/api/src/lib/crawl-redis.ts
index 6d578e5e..f0ece43f 100644
--- a/apps/api/src/lib/crawl-redis.ts
+++ b/apps/api/src/lib/crawl-redis.ts
@@ -83,6 +83,10 @@ export async function getCrawlJobs(id: string): Promise<string[]> {
   return await redisConnection.smembers("crawl:" + id + ":jobs");
 }

+export async function getThrottledJobs(teamId: string): Promise<string[]> {
+  return await redisConnection.zrangebyscore("concurrency-limiter:" + teamId + ":throttled", Date.now(), Infinity);
+}
+
 export async function lockURL(id: string, sc: StoredCrawl, url: string): Promise<boolean> {
   if (typeof sc.crawlerOptions?.limit === "number") {
     if (await redisConnection.scard("crawl:" + id + ":visited") >= sc.crawlerOptions.limit) {
diff --git a/apps/api/src/routes/v1.ts b/apps/api/src/routes/v1.ts
index 49a41ce7..b0ceceb4 100644
--- a/apps/api/src/routes/v1.ts
+++ b/apps/api/src/routes/v1.ts
@@ -16,6 +16,7 @@ import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist";
 import { crawlCancelController } from "../controllers/v1/crawl-cancel";
 import { Logger } from "../lib/logger";
 import { scrapeStatusController } from "../controllers/v1/scrape-status";
+import { concurrencyCheckController } from "../controllers/v1/concurrency-check";
 // import { crawlPreviewController } from "../../src/controllers/v1/crawlPreview";
 // import { crawlJobStatusPreviewController } from "../../src/controllers/v1/status";
 // import { searchController } from "../../src/controllers/v1/search";
@@ -140,11 +141,19 @@ v1Router.get(
   wrap(scrapeStatusController)
 );

+v1Router.get(
+  "/concurrency-check",
+  authMiddleware(RateLimiterMode.CrawlStatus),
+  wrap(concurrencyCheckController)
+);
+
 v1Router.ws(
   "/crawl/:jobId",
   crawlStatusWSController
 );

+
+
 // v1Router.post("/crawlWebsitePreview", crawlPreviewController);
diff --git a/apps/api/src/scraper/WebScraper/sitemap.ts b/apps/api/src/scraper/WebScraper/sitemap.ts
index 13dfc26e..756cd765 100644
--- a/apps/api/src/scraper/WebScraper/sitemap.ts
+++ b/apps/api/src/scraper/WebScraper/sitemap.ts
@@ -23,7 +23,7 @@ export async function getLinksFromSitemap(
       const response = await axios.get(sitemapUrl, { timeout: axiosTimeout });
       content = response.data;
     } else if (mode === 'fire-engine') {
-      const response = await scrapWithFireEngine({ url: sitemapUrl, fireEngineOptions: { engine:"tlsclient", disableJsDom: true, mobileProxy: true } });
+      const response = await scrapWithFireEngine({ url: sitemapUrl, fireEngineOptions: { engine:"playwright" } });
       content = response.html;
     }
   } catch (error) {
diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts
index 7a698772..315700a1 100644
--- a/apps/api/src/services/queue-jobs.ts
+++ b/apps/api/src/services/queue-jobs.ts
@@ -63,8 +63,11 @@ export function waitForJob(jobId: string, timeout: number) {
         resolve((await getScrapeQueue().getJob(jobId)).returnvalue);
       } else if (state === "failed") {
         // console.log("failed", (await getScrapeQueue().getJob(jobId)).failedReason);
-        clearInterval(int);
-        reject((await getScrapeQueue().getJob(jobId)).failedReason);
+        const job = await getScrapeQueue().getJob(jobId);
+        if (job && job.failedReason !== "Concurrency limit hit") {
+          clearInterval(int);
+          reject(job.failedReason);
+        }
       }
     }
   }, 500);
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 37e14baf..532e8fee 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -12,7 +12,7 @@ import { startWebScraperPipeline } from "../main/runWebScraper";
 import { callWebhook } from "./webhook";
 import { logJob } from "./logging/log_job";
 import { initSDK } from "@hyperdx/node-opentelemetry";
-import { Job } from "bullmq";
+import { Job, Queue } from "bullmq";
 import { Logger } from "../lib/logger";
 import { Worker } from "bullmq";
 import systemMonitor from "./system-monitor";
@@ -34,9 +34,10 @@ import {
   deleteJobPriority,
   getJobPriority,
 } from "../../src/lib/job-priority";
-import { PlanType } from "../types";
+import { PlanType, RateLimiterMode } from "../types";
 import { getJobs } from "../../src/controllers/v1/crawl-status";
 import { configDotenv } from "dotenv";
+import { getRateLimiterPoints } from "./rate-limiter";
 configDotenv();

 if (process.env.ENV === "production") {
@@ -99,10 +100,10 @@ process.on("SIGINT", () => {
 });

 const workerFun = async (
-  queueName: string,
+  queue: Queue,
   processJobInternal: (token: string, job: Job) => Promise<any>
 ) => {
-  const worker = new Worker(queueName, null, {
+  const worker = new Worker(queue.name, null, {
     connection: redisConnection,
     lockDuration: 1 * 60 * 1000, // 1 minute
     // lockRenewTime: 15 * 1000, // 15 seconds
@@ -129,6 +130,49 @@ const workerFun = async (
     const job = await worker.getNextJob(token);

     if (job) {
+      const concurrencyLimiterKey = "concurrency-limiter:" + job.data?.team_id;
+
+      if (job.data && job.data.team_id && job.data.plan) {
+        const concurrencyLimiterThrottledKey = "concurrency-limiter:" + job.data.team_id + ":throttled";
+        const concurrencyLimit = getRateLimiterPoints(RateLimiterMode.Scrape, undefined, job.data.plan);
+        const now = Date.now();
+        const stalledJobTimeoutMs = 2 * 60 * 1000;
+        const throttledJobTimeoutMs = 10 * 60 * 1000;
+
+        redisConnection.zremrangebyscore(concurrencyLimiterThrottledKey, -Infinity, now);
+        redisConnection.zremrangebyscore(concurrencyLimiterKey, -Infinity, now);
+        const activeJobsOfTeam = await redisConnection.zrangebyscore(concurrencyLimiterKey, now, Infinity);
+        if (activeJobsOfTeam.length >= concurrencyLimit) {
+          // Nick: removed the log because it was too spammy, tested and confirmed that the job is added back to the queue
+          // Logger.info("Moving job " + job.id + " back the queue -- concurrency limit hit");
+          // Concurrency limit hit, throttle the job
+          await redisConnection.zadd(concurrencyLimiterThrottledKey, now + throttledJobTimeoutMs, job.id);
+          // We move to failed with a specific error
+          await job.moveToFailed(new Error("Concurrency limit hit"), token, false);
+          // Remove the job from the queue
+          await job.remove();
+          // Increment the priority of the job exponentially by 5%. Note: max bull priority is 2 million
+          const newJobPriority = Math.min(Math.round((job.opts.priority ?? 10) * 1.05), 20000);
+          // Add the job back to the queue with the new priority
+          await queue.add(job.name, {
+            ...job.data,
+            concurrencyLimitHit: true,
+          }, {
+            ...job.opts,
+            jobId: job.id,
+            priority: newJobPriority, // exponential backoff for stuck jobs
+          });
+
+          // await sleep(gotJobInterval);
+          continue;
+        } else {
+          // If we are not throttled, register the job in the team's active concurrency set
+          await redisConnection.zadd(concurrencyLimiterKey, now + stalledJobTimeoutMs, job.id);
+          // Remove the job from the throttled list
+          await redisConnection.zrem(concurrencyLimiterThrottledKey, job.id);
+        }
+      }
+
       if (job.data && job.data.sentry && Sentry.isInitialized()) {
         Sentry.continueTrace(
           {
@@ -159,7 +203,15 @@ const workerFun = async (
             },
           },
           async () => {
-            const res = await processJobInternal(token, job);
+            let res;
+            try {
+              res = await processJobInternal(token, job);
+            } finally {
+              if (job.id && job.data && job.data.team_id) {
+                await redisConnection.zrem(concurrencyLimiterKey, job.id);
+              }
+            }
+
             if (res !== null) {
               span.setStatus({ code: 2 }); // ERROR
             } else {
@@ -181,7 +233,12 @@ const workerFun = async (
             },
           },
           () => {
-            processJobInternal(token, job);
+            processJobInternal(token, job)
+              .finally(() => {
+                if (job.id && job.data && job.data.team_id) {
+                  redisConnection.zrem(concurrencyLimiterKey, job.id);
+                }
+              });
           }
         );
       }
@@ -193,7 +250,7 @@ const workerFun = async (
   }
 };

-workerFun(scrapeQueueName, processJobInternal);
+workerFun(getScrapeQueue(), processJobInternal);

 async function processJob(job: Job, token: string) {
   Logger.info(`🐂 Worker taking job ${job.id}`);
@@ -254,7 +311,10 @@ async function processJob(job: Job, token: string) {
       },
       project_id: job.data.project_id,
       error: message /* etc... */,
-      docs,
+      docs: job.data.concurrencyLimitHit ? docs.map(x => ({
+        ...x,
+        warning: "This scrape was throttled because you hit your concurrency limit." + (x.warning ? " " + x.warning : ""),
+      })) : docs,
     };

     // No idea what this does and when it is called.
@@ -331,6 +391,7 @@ async function processJob(job: Job, token: string) {
         mode: "single_urls",
         crawlerOptions: sc.crawlerOptions,
         team_id: sc.team_id,
+        plan: job.data.plan,
         pageOptions: sc.pageOptions,
         origin: job.data.origin,
         crawl_id: job.data.crawl_id,
diff --git a/apps/api/src/services/rate-limiter.test.ts b/apps/api/src/services/rate-limiter.test.ts
index 3e252301..ba4a0a73 100644
--- a/apps/api/src/services/rate-limiter.test.ts
+++ b/apps/api/src/services/rate-limiter.test.ts
@@ -49,7 +49,7 @@ describe("Rate Limiter Service", () => {
       "nonexistent" as RateLimiterMode,
       "test-prefix:someToken"
     );
-    expect(limiter).toBe(serverRateLimiter);
+    expect(limiter.points).toBe(serverRateLimiter.points);
   });

   it("should return the correct rate limiter based on mode and plan", () => {
@@ -210,7 +210,7 @@ describe("Rate Limiter Service", () => {
       "test-prefix:someToken",
       "starter"
     );
-    expect(limiter2.points).toBe(3);
+    expect(limiter2.points).toBe(10);

     const limiter3 = getRateLimiter(
       "crawl" as RateLimiterMode,
@@ -233,7 +233,7 @@ describe("Rate Limiter Service", () => {
       "test-prefix:someToken",
       "starter"
     );
-    expect(limiter2.points).toBe(20);
+    expect(limiter2.points).toBe(100);

     const limiter3 = getRateLimiter(
       "scrape" as RateLimiterMode,
@@ -263,14 +263,14 @@ describe("Rate Limiter Service", () => {
       "test-prefix:someToken",
       "starter"
     );
-    expect(limiter2.points).toBe(20);
+    expect(limiter2.points).toBe(50);

     const limiter3 = getRateLimiter(
       "search" as RateLimiterMode,
       "test-prefix:someToken",
       "standard"
     );
-    expect(limiter3.points).toBe(40);
+    expect(limiter3.points).toBe(50);
   });

   it("should return the correct rate limiter for 'preview' mode", () => {
diff --git a/apps/api/src/services/rate-limiter.ts b/apps/api/src/services/rate-limiter.ts
index 51a0ecfa..21e05948 100644
--- a/apps/api/src/services/rate-limiter.ts
+++ b/apps/api/src/services/rate-limiter.ts
@@ -123,14 +123,32 @@ const testSuiteTokens = ["a01ccae", "6254cf9", "0f96e673", "23befa1b", "69141c4"

 const manual = ["69be9e74-7624-4990-b20d-08e0acc70cf6"];

-export function getRateLimiter(
+function makePlanKey(plan?: string) {
+  return plan ? plan.replace("-", "") : "default"; // "default"
+}
+
+export function getRateLimiterPoints(
   mode: RateLimiterMode,
-  token: string,
+  token?: string,
   plan?: string,
   teamId?: string
-) {
+) : number {
+  const rateLimitConfig = RATE_LIMITS[mode]; // {default : 5}
+
+  if (!rateLimitConfig) return RATE_LIMITS.account.default;

-  if (testSuiteTokens.some(testToken => token.includes(testToken))) {
+  const points : number =
+    rateLimitConfig[makePlanKey(plan)] || rateLimitConfig.default; // 5
+  return points;
+}
+
+export function getRateLimiter(
+  mode: RateLimiterMode,
+  token?: string,
+  plan?: string,
+  teamId?: string
+) : RateLimiterRedis {
+  if (token && testSuiteTokens.some(testToken => token.includes(testToken))) {
     return testSuiteRateLimiter;
   }

@@ -141,14 +159,6 @@ export function getRateLimiter(
   if(teamId && manual.includes(teamId)) {
     return manualRateLimiter;
   }
-
-  const rateLimitConfig = RATE_LIMITS[mode]; // {default : 5}
-
-  if (!rateLimitConfig) return serverRateLimiter;
-
-  const planKey = plan ? plan.replace("-", "") : "default"; // "default"
-  const points =
-    rateLimitConfig[planKey] || rateLimitConfig.default || rateLimitConfig; // 5
-
-  return createRateLimiter(`${mode}-${planKey}`, points);
+
+  return createRateLimiter(`${mode}-${makePlanKey(plan)}`, getRateLimiterPoints(mode, token, plan, teamId));
 }
diff --git a/apps/api/src/types.ts b/apps/api/src/types.ts
index 3795ce1e..a03176da 100644
--- a/apps/api/src/types.ts
+++ b/apps/api/src/types.ts
@@ -28,6 +28,7 @@ export interface WebScraperOptions {
   pageOptions: any;
   extractorOptions?: any;
   team_id: string;
+  plan: string;
   origin?: string;
   crawl_id?: string;
   sitemapped?: boolean;