From 331e826bca263351b38fe223087a24a437514053 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Wed, 25 Sep 2024 19:25:18 +0200 Subject: [PATCH 01/25] feat(db): implement auth_credit_usage_chunk RPC --- apps/api/src/controllers/auth.ts | 212 +++++---------- apps/api/src/controllers/v0/crawl.ts | 4 +- apps/api/src/controllers/v0/scrape.ts | 4 +- apps/api/src/controllers/v0/search.ts | 4 +- apps/api/src/controllers/v1/types.ts | 44 ++- apps/api/src/routes/v1.ts | 10 +- .../src/services/billing/credit_billing.ts | 253 ++---------------- apps/api/src/types.ts | 2 + 8 files changed, 139 insertions(+), 394 deletions(-) diff --git a/apps/api/src/controllers/auth.ts b/apps/api/src/controllers/auth.ts index d634b9ed..a428f9a3 100644 --- a/apps/api/src/controllers/auth.ts +++ b/apps/api/src/controllers/auth.ts @@ -17,6 +17,7 @@ import { getValue } from "../services/redis"; import { setValue } from "../services/redis"; import { validate } from "uuid"; import * as Sentry from "@sentry/node"; +import { AuthCreditUsageChunk } from "./v1/types"; // const { data, error } = await supabase_service // .from('api_keys') // .select(` @@ -35,6 +36,58 @@ function normalizedApiIsUuid(potentialUuid: string): boolean { // Check if the string is a valid UUID return validate(potentialUuid); } + +async function setCachedACUC(api_key: string, acuc: AuthCreditUsageChunk) { + const cacheKeyACUC = `acuc_${api_key}`; + const redLockKey = `lock_${cacheKeyACUC}`; + const lockTTL = 10000; // 10 seconds + + try { + const lock = await redlock.acquire([redLockKey], lockTTL); + + try { + // Cache for 10 minutes. This means that changing subscription tier could have + // a maximum of 10 minutes of a delay. 
- mogery + await setValue(cacheKeyACUC, JSON.stringify(acuc), 600); + } finally { + await lock.release(); + } + } catch (error) { + Logger.error(`Error updating cached ACUC: ${error}`); + Sentry.captureException(error); + } +} + +async function getACUC(api_key: string): Promise { + const cacheKeyACUC = `acuc_${api_key}`; + + const cachedACUC = await getValue(cacheKeyACUC); + + if (cachedACUC !== null) { + return JSON.parse(cacheKeyACUC); + } else { + const { data, error } = + await supabase_service.rpc("auth_credit_usage_chunk", { input_key: api_key }); + + if (error) { + throw new Error("Failed to retrieve authentication and credit usage data: " + JSON.stringify(error)); + } + + const chunk: AuthCreditUsageChunk | null = data.length === 0 + ? null + : data[0].team_id === null + ? null + : data[0]; + + // NOTE: Should we cache null chunks? - mogery + if (chunk !== null) { + setCachedACUC(api_key, chunk); + } + + return chunk; + } +} + export async function authenticateUser( req, res, @@ -42,6 +95,7 @@ export async function authenticateUser( ): Promise { return withAuth(supaAuthenticateUser)(req, res, mode); } + function setTrace(team_id: string, api_key: string) { try { setTraceAttributes({ @@ -54,45 +108,6 @@ function setTrace(team_id: string, api_key: string) { } } -async function getKeyAndPriceId(normalizedApi: string): Promise<{ - success: boolean; - teamId?: string; - priceId?: string; - error?: string; - status?: number; -}> { - const { data, error } = await supabase_service.rpc("get_key_and_price_id_2", { - api_key: normalizedApi, - }); - if (error) { - Sentry.captureException(error); - Logger.error(`RPC ERROR (get_key_and_price_id_2): ${error.message}`); - return { - success: false, - error: - "The server seems overloaded. 
Please contact hello@firecrawl.com if you aren't sending too many requests at once.", - status: 500, - }; - } - if (!data || data.length === 0) { - if (error) { - Logger.warn(`Error fetching api key: ${error.message} or data is empty`); - Sentry.captureException(error); - } - // TODO: change this error code ? - return { - success: false, - error: "Unauthorized: Invalid token", - status: 401, - }; - } else { - return { - success: true, - teamId: data[0].team_id, - priceId: data[0].price_id, - }; - } -} export async function supaAuthenticateUser( req, res, @@ -103,8 +118,8 @@ export async function supaAuthenticateUser( error?: string; status?: number; plan?: PlanType; + chunk?: AuthCreditUsageChunk; }> { - const authHeader = req.headers.authorization ?? (req.headers["sec-websocket-protocol"] ? `Bearer ${req.headers["sec-websocket-protocol"]}` : null); if (!authHeader) { return { success: false, error: "Unauthorized", status: 401 }; @@ -126,11 +141,9 @@ export async function supaAuthenticateUser( let subscriptionData: { team_id: string; plan: string } | null = null; let normalizedApi: string; - let cacheKey = ""; - let redLockKey = ""; - const lockTTL = 15000; // 10 seconds let teamId: string | null = null; let priceId: string | null = null; + let chunk: AuthCreditUsageChunk; if (token == "this_is_just_a_preview_token") { if (mode == RateLimiterMode.CrawlStatus) { @@ -149,85 +162,25 @@ export async function supaAuthenticateUser( }; } - cacheKey = `api_key:${normalizedApi}`; + chunk = await getACUC(normalizedApi); - try { - const teamIdPriceId = await getValue(cacheKey); - if (teamIdPriceId) { - const { team_id, price_id } = JSON.parse(teamIdPriceId); - teamId = team_id; - priceId = price_id; - } else { - const { - success, - teamId: tId, - priceId: pId, - error, - status, - } = await getKeyAndPriceId(normalizedApi); - if (!success) { - return { success, error, status }; - } - teamId = tId; - priceId = pId; - await setValue( - cacheKey, - JSON.stringify({ team_id: 
teamId, price_id: priceId }), - 60 - ); - } - } catch (error) { - Sentry.captureException(error); - Logger.error(`Error with auth function: ${error}`); - // const { - // success, - // teamId: tId, - // priceId: pId, - // error: e, - // status, - // } = await getKeyAndPriceId(normalizedApi); - // if (!success) { - // return { success, error: e, status }; - // } - // teamId = tId; - // priceId = pId; - // const { - // success, - // teamId: tId, - // priceId: pId, - // error: e, - // status, - // } = await getKeyAndPriceId(normalizedApi); - // if (!success) { - // return { success, error: e, status }; - // } - // teamId = tId; - // priceId = pId; + if (chunk === null) { + return { + success: false, + error: "Unauthorized: Invalid token", + status: 401, + }; } - // get_key_and_price_id_2 rpc definition: - // create or replace function get_key_and_price_id_2(api_key uuid) - // returns table(key uuid, team_id uuid, price_id text) as $$ - // begin - // if api_key is null then - // return query - // select null::uuid as key, null::uuid as team_id, null::text as price_id; - // end if; - - // return query - // select ak.key, ak.team_id, s.price_id - // from api_keys ak - // left join subscriptions s on ak.team_id = s.team_id - // where ak.key = api_key; - // end; - // $$ language plpgsql; + teamId = chunk.team_id; + priceId = chunk.price_id; const plan = getPlanByPriceId(priceId); // HyperDX Logging setTrace(teamId, normalizedApi); subscriptionData = { team_id: teamId, - plan: plan, + plan, }; switch (mode) { case RateLimiterMode.Crawl: @@ -291,14 +244,6 @@ export async function supaAuthenticateUser( endDate.setDate(endDate.getDate() + 7); // await sendNotification(team_id, NotificationType.RATE_LIMIT_REACHED, startDate.toISOString(), endDate.toISOString()); - // Cache longer for 429s - if (teamId && priceId && mode !== RateLimiterMode.Preview) { - await setValue( - cacheKey, - JSON.stringify({ team_id: teamId, price_id: priceId }), - 60 // 10 seconds, cache for everything - 
); - } return { success: false, @@ -329,34 +274,11 @@ export async function supaAuthenticateUser( // return { success: false, error: "Unauthorized: Invalid token", status: 401 }; } - // make sure api key is valid, based on the api_keys table in supabase - if (!subscriptionData) { - normalizedApi = parseApi(token); - - const { data, error } = await supabase_service - .from("api_keys") - .select("*") - .eq("key", normalizedApi); - - if (error || !data || data.length === 0) { - if (error) { - Sentry.captureException(error); - Logger.warn(`Error fetching api key: ${error.message} or data is empty`); - } - return { - success: false, - error: "Unauthorized: Invalid token", - status: 401, - }; - } - - subscriptionData = data[0]; - } - return { success: true, team_id: subscriptionData.team_id, plan: (subscriptionData.plan ?? "") as PlanType, + chunk, }; } function getPlanByPriceId(price_id: string): PlanType { diff --git a/apps/api/src/controllers/v0/crawl.ts b/apps/api/src/controllers/v0/crawl.ts index aefdb5e5..a95c85a6 100644 --- a/apps/api/src/controllers/v0/crawl.ts +++ b/apps/api/src/controllers/v0/crawl.ts @@ -18,7 +18,7 @@ import { getJobPriority } from "../../lib/job-priority"; export async function crawlController(req: Request, res: Response) { try { - const { success, team_id, error, status, plan } = await authenticateUser( + const { success, team_id, error, status, plan, chunk } = await authenticateUser( req, res, RateLimiterMode.Crawl @@ -68,7 +68,7 @@ export async function crawlController(req: Request, res: Response) { const limitCheck = req.body?.crawlerOptions?.limit ?? 1; const { success: creditsCheckSuccess, message: creditsCheckMessage, remainingCredits } = - await checkTeamCredits(team_id, limitCheck); + await checkTeamCredits(chunk, team_id, limitCheck); if (!creditsCheckSuccess) { return res.status(402).json({ error: "Insufficient credits. You may be requesting with a higher limit than the amount of credits you have left. 
If not, upgrade your plan at https://firecrawl.dev/pricing or contact us at hello@firecrawl.com" }); diff --git a/apps/api/src/controllers/v0/scrape.ts b/apps/api/src/controllers/v0/scrape.ts index c46ebc62..696f8f74 100644 --- a/apps/api/src/controllers/v0/scrape.ts +++ b/apps/api/src/controllers/v0/scrape.ts @@ -157,7 +157,7 @@ export async function scrapeController(req: Request, res: Response) { try { let earlyReturn = false; // make sure to authenticate user first, Bearer - const { success, team_id, error, status, plan } = await authenticateUser( + const { success, team_id, error, status, plan, chunk } = await authenticateUser( req, res, RateLimiterMode.Scrape @@ -193,7 +193,7 @@ export async function scrapeController(req: Request, res: Response) { // checkCredits try { const { success: creditsCheckSuccess, message: creditsCheckMessage } = - await checkTeamCredits(team_id, 1); + await checkTeamCredits(chunk, team_id, 1); if (!creditsCheckSuccess) { earlyReturn = true; return res.status(402).json({ error: "Insufficient credits" }); diff --git a/apps/api/src/controllers/v0/search.ts b/apps/api/src/controllers/v0/search.ts index 5ef2b767..970acc25 100644 --- a/apps/api/src/controllers/v0/search.ts +++ b/apps/api/src/controllers/v0/search.ts @@ -131,7 +131,7 @@ export async function searchHelper( export async function searchController(req: Request, res: Response) { try { // make sure to authenticate user first, Bearer - const { success, team_id, error, status, plan } = await authenticateUser( + const { success, team_id, error, status, plan, chunk } = await authenticateUser( req, res, RateLimiterMode.Search @@ -155,7 +155,7 @@ export async function searchController(req: Request, res: Response) { try { const { success: creditsCheckSuccess, message: creditsCheckMessage } = - await checkTeamCredits(team_id, 1); + await checkTeamCredits(chunk, team_id, 1); if (!creditsCheckSuccess) { return res.status(402).json({ error: "Insufficient credits" }); } diff --git 
a/apps/api/src/controllers/v1/types.ts b/apps/api/src/controllers/v1/types.ts index b3e8df77..c09a29f2 100644 --- a/apps/api/src/controllers/v1/types.ts +++ b/apps/api/src/controllers/v1/types.ts @@ -315,11 +315,51 @@ type Account = { remainingCredits: number; }; -export interface RequestWithMaybeAuth< +export type AuthCreditUsageChunk = { + api_key: string; + team_id: string; + sub_id: string; + sub_current_period_start: string; + sub_current_period_end: string; + price_id: string; + price_credits: number; // credit limit with assoicated price, or free_credits (500) if free plan + credits_used: number; + coupon_credits: number; + coupons: any[]; + adjusted_credits_used: number; // credits this period minus coupons used + remaining_credits: number; +}; + +export interface RequestWithMaybeACUC< ReqParams = {}, ReqBody = undefined, ResBody = undefined > extends Request { + acuc?: AuthCreditUsageChunk, +} + +export interface RequestWithACUC< + ReqParams = {}, + ReqBody = undefined, + ResBody = undefined +> extends Request { + acuc: AuthCreditUsageChunk, +} + +export interface RequestWithAuth< + ReqParams = {}, + ReqBody = undefined, + ResBody = undefined, +> extends Request { + auth: AuthObject; + account?: Account; +} + +export interface RequestWithMaybeAuth< + ReqParams = {}, + ReqBody = undefined, + ResBody = undefined +> extends RequestWithMaybeACUC { auth?: AuthObject; account?: Account; } @@ -328,7 +368,7 @@ export interface RequestWithAuth< ReqParams = {}, ReqBody = undefined, ResBody = undefined, -> extends Request { +> extends RequestWithACUC { auth: AuthObject; account?: Account; } diff --git a/apps/api/src/routes/v1.ts b/apps/api/src/routes/v1.ts index b827e863..1dad4844 100644 --- a/apps/api/src/routes/v1.ts +++ b/apps/api/src/routes/v1.ts @@ -4,7 +4,7 @@ import { crawlController } from "../controllers/v1/crawl"; import { scrapeController } from "../../src/controllers/v1/scrape"; import { crawlStatusController } from "../controllers/v1/crawl-status"; 
import { mapController } from "../controllers/v1/map"; -import { ErrorResponse, RequestWithAuth, RequestWithMaybeAuth } from "../controllers/v1/types"; +import { ErrorResponse, RequestWithACUC, RequestWithAuth, RequestWithMaybeAuth } from "../controllers/v1/types"; import { RateLimiterMode } from "../types"; import { authenticateUser } from "../controllers/auth"; import { createIdempotencyKey } from "../services/idempotency/create"; @@ -30,14 +30,15 @@ function checkCreditsMiddleware(minimum?: number): (req: RequestWithAuth, res: R if (!minimum && req.body) { minimum = (req.body as any)?.limit ?? 1; } - const { success, message, remainingCredits } = await checkTeamCredits(req.auth.team_id, minimum); + const { success, remainingCredits, chunk } = await checkTeamCredits(req.acuc, req.auth.team_id, minimum); + req.acuc = chunk; if (!success) { Logger.error(`Insufficient credits: ${JSON.stringify({ team_id: req.auth.team_id, minimum, remainingCredits })}`); if (!res.headersSent) { return res.status(402).json({ success: false, error: "Insufficient credits" }); } } - req.account = { remainingCredits } + req.account = { remainingCredits }; next(); })() .catch(err => next(err)); @@ -47,7 +48,7 @@ function checkCreditsMiddleware(minimum?: number): (req: RequestWithAuth, res: R export function authMiddleware(rateLimiterMode: RateLimiterMode): (req: RequestWithMaybeAuth, res: Response, next: NextFunction) => void { return (req, res, next) => { (async () => { - const { success, team_id, error, status, plan } = await authenticateUser( + const { success, team_id, error, status, plan, chunk } = await authenticateUser( req, res, rateLimiterMode, @@ -60,6 +61,7 @@ export function authMiddleware(rateLimiterMode: RateLimiterMode): (req: RequestW } req.auth = { team_id, plan }; + req.acuc = chunk; next(); })() .catch(err => next(err)); diff --git a/apps/api/src/services/billing/credit_billing.ts b/apps/api/src/services/billing/credit_billing.ts index 6a71b40a..12e6b170 100644 --- 
a/apps/api/src/services/billing/credit_billing.ts +++ b/apps/api/src/services/billing/credit_billing.ts @@ -6,6 +6,7 @@ import { Logger } from "../../lib/logger"; import { getValue, setValue } from "../redis"; import { redlock } from "../redlock"; import * as Sentry from "@sentry/node"; +import { AuthCreditUsageChunk } from "../../controllers/v1/types"; const FREE_CREDITS = 500; @@ -166,264 +167,42 @@ export async function supaBillTeam(team_id: string, credits: number) { }); } -export async function checkTeamCredits(team_id: string, credits: number) { - return withAuth(supaCheckTeamCredits)(team_id, credits); +export async function checkTeamCredits(chunk: AuthCreditUsageChunk, team_id: string, credits: number) { + return withAuth(supaCheckTeamCredits)(chunk, team_id, credits); } // if team has enough credits for the operation, return true, else return false -export async function supaCheckTeamCredits(team_id: string, credits: number) { +export async function supaCheckTeamCredits(chunk: AuthCreditUsageChunk, team_id: string, credits: number) { + // WARNING: chunk will be null if team_id is preview -- do not perform operations on it under ANY circumstances - mogery if (team_id === "preview") { return { success: true, message: "Preview team, no credits used", remainingCredits: Infinity }; } - - let cacheKeySubscription = `subscription_${team_id}`; - let cacheKeyCoupons = `coupons_${team_id}`; - - // Try to get data from cache first - const [cachedSubscription, cachedCoupons] = await Promise.all([ - getValue(cacheKeySubscription), - getValue(cacheKeyCoupons) - ]); - - let subscription, subscriptionError; - let coupons : {credits: number}[]; - - if (cachedSubscription && cachedCoupons) { - subscription = JSON.parse(cachedSubscription); - coupons = JSON.parse(cachedCoupons); - } else { - // If not in cache, retrieve from database - const [subscriptionResult, couponsResult] = await Promise.all([ - supabase_service - .from("subscriptions") - .select("id, price_id, 
current_period_start, current_period_end") - .eq("team_id", team_id) - .eq("status", "active") - .single(), - supabase_service - .from("coupons") - .select("credits") - .eq("team_id", team_id) - .eq("status", "active"), - ]); - - subscription = subscriptionResult.data; - subscriptionError = subscriptionResult.error; - coupons = couponsResult.data; - - // Cache the results for a minute, sub can be null and that's fine - await setValue(cacheKeySubscription, JSON.stringify(subscription), 60); // Cache for 1 minute, even if null - await setValue(cacheKeyCoupons, JSON.stringify(coupons), 60); // Cache for 1 minute - - } - - let couponCredits = 0; - if (coupons && coupons.length > 0) { - couponCredits = coupons.reduce( - (total, coupon) => total + coupon.credits, - 0 - ); - } - - - // If there are available coupons and they are enough for the operation - if (couponCredits >= credits) { - return { success: true, message: "Sufficient credits available", remainingCredits: couponCredits }; - } - - - // Free credits, no coupons - if (!subscription || subscriptionError) { - - let creditUsages; - let creditUsageError; - let totalCreditsUsed = 0; - const cacheKeyCreditUsage = `credit_usage_${team_id}`; - - // Try to get credit usage from cache - const cachedCreditUsage = await getValue(cacheKeyCreditUsage); - - if (cachedCreditUsage) { - totalCreditsUsed = parseInt(cachedCreditUsage); - } else { - let retries = 0; - const maxRetries = 3; - const retryInterval = 2000; // 2 seconds - - while (retries < maxRetries) { - // Reminder, this has an 1000 limit. 
- const result = await supabase_service - .from("credit_usage") - .select("credits_used") - .is("subscription_id", null) - .eq("team_id", team_id); - - creditUsages = result.data; - creditUsageError = result.error; - - if (!creditUsageError) { - break; - } - - retries++; - if (retries < maxRetries) { - await new Promise(resolve => setTimeout(resolve, retryInterval)); - } - } - - if (creditUsageError) { - Logger.error(`Credit usage error after ${maxRetries} attempts: ${creditUsageError}`); - throw new Error( - `Failed to retrieve credit usage for team_id: ${team_id}` - ); - } - - totalCreditsUsed = creditUsages.reduce( - (acc, usage) => acc + usage.credits_used, - 0 - ); - - // Cache the result for 30 seconds - await setValue(cacheKeyCreditUsage, totalCreditsUsed.toString(), 30); - } - - Logger.info(`totalCreditsUsed: ${totalCreditsUsed}`); - - const end = new Date(); - end.setDate(end.getDate() + 30); - // check if usage is within 80% of the limit - const creditLimit = FREE_CREDITS; - const creditUsagePercentage = totalCreditsUsed / creditLimit; - - // Add a check to ensure totalCreditsUsed is greater than 0 - if (totalCreditsUsed > 0 && creditUsagePercentage >= 0.8 && creditUsagePercentage < 1) { - Logger.info(`Sending notification for team ${team_id}. Total credits used: ${totalCreditsUsed}, Credit usage percentage: ${creditUsagePercentage}`); - await sendNotification( - team_id, - NotificationType.APPROACHING_LIMIT, - new Date().toISOString(), - end.toISOString() - ); - } - - // 5. Compare the total credits used with the credits allowed by the plan. 
- if (totalCreditsUsed >= FREE_CREDITS) { - // Send email notification for insufficient credits - await sendNotification( - team_id, - NotificationType.LIMIT_REACHED, - new Date().toISOString(), - end.toISOString() - ); - return { - success: false, - message: "Insufficient credits, please upgrade!", - remainingCredits: FREE_CREDITS - totalCreditsUsed - }; - } - return { success: true, message: "Sufficient credits available", remainingCredits: FREE_CREDITS - totalCreditsUsed }; - } - - let totalCreditsUsed = 0; - const cacheKey = `credit_usage_${subscription.id}_${subscription.current_period_start}_${subscription.current_period_end}_lc`; - const redLockKey = `lock_${cacheKey}`; - const lockTTL = 10000; // 10 seconds - - try { - const lock = await redlock.acquire([redLockKey], lockTTL); - - try { - const cachedCreditUsage = await getValue(cacheKey); - - if (cachedCreditUsage) { - totalCreditsUsed = parseInt(cachedCreditUsage); - } else { - const { data: creditUsages, error: creditUsageError } = - await supabase_service.rpc("get_credit_usage_2", { - sub_id: subscription.id, - start_time: subscription.current_period_start, - end_time: subscription.current_period_end, - }); - - if (creditUsageError) { - Logger.error(`Error calculating credit usage: ${creditUsageError}`); - } - - if (creditUsages && creditUsages.length > 0) { - totalCreditsUsed = creditUsages[0].total_credits_used; - await setValue(cacheKey, totalCreditsUsed.toString(), 500); // Cache for 8 minutes - // Logger.info(`Cache set for credit usage: ${totalCreditsUsed}`); - } - } - } finally { - await lock.release(); - } - } catch (error) { - Logger.error(`Error acquiring lock or calculating credit usage: ${error}`); - } - - // Adjust total credits used by subtracting coupon value - const adjustedCreditsUsed = Math.max(0, totalCreditsUsed - couponCredits); - - // Get the price details from cache or database - const priceCacheKey = `price_${subscription.price_id}`; - let price : {credits: number}; - - try { - 
const cachedPrice = await getValue(priceCacheKey); - if (cachedPrice) { - price = JSON.parse(cachedPrice); - } else { - const { data, error: priceError } = await supabase_service - .from("prices") - .select("credits") - .eq("id", subscription.price_id) - .single(); - - if (priceError) { - throw new Error( - `Failed to retrieve price for price_id: ${subscription.price_id}` - ); - } - - price = data; - // There are only 21 records, so this is super fine - // Cache the price for a long time (e.g., 1 day) - await setValue(priceCacheKey, JSON.stringify(price), 86400); - } - } catch (error) { - Logger.error(`Error retrieving or caching price: ${error}`); - Sentry.captureException(error); - // If errors, just assume it's a big number so user don't get an error - price = { credits: 10000000 }; - } - - const creditLimit = price.credits; + const creditsWillBeUsed = chunk.adjusted_credits_used + credits; // Removal of + credits - const creditUsagePercentage = adjustedCreditsUsed / creditLimit; + const creditUsagePercentage = creditsWillBeUsed / chunk.price_credits; // Compare the adjusted total credits used with the credits allowed by the plan - if (adjustedCreditsUsed >= price.credits) { - await sendNotification( + if (creditsWillBeUsed >= chunk.price_credits) { + sendNotification( team_id, NotificationType.LIMIT_REACHED, - subscription.current_period_start, - subscription.current_period_end + chunk.sub_current_period_start, + chunk.sub_current_period_end ); - return { success: false, message: "Insufficient credits, please upgrade!", remainingCredits: creditLimit - adjustedCreditsUsed }; + return { success: false, message: "Insufficient credits, please upgrade!", remainingCredits: chunk.price_credits - chunk.adjusted_credits_used, chunk }; } else if (creditUsagePercentage >= 0.8 && creditUsagePercentage < 1) { // Send email notification for approaching credit limit - await sendNotification( + sendNotification( team_id, NotificationType.APPROACHING_LIMIT, - 
subscription.current_period_start, - subscription.current_period_end + chunk.sub_current_period_start, + chunk.sub_current_period_end ); } - return { success: true, message: "Sufficient credits available", remainingCredits: creditLimit - adjustedCreditsUsed }; + return { success: true, message: "Sufficient credits available", remainingCredits: chunk.price_credits - chunk.adjusted_credits_used, chunk }; } // Count the total credits used by a team within the current billing period and return the remaining credits. diff --git a/apps/api/src/types.ts b/apps/api/src/types.ts index 50fb6eef..3795ce1e 100644 --- a/apps/api/src/types.ts +++ b/apps/api/src/types.ts @@ -1,3 +1,4 @@ +import { AuthCreditUsageChunk } from "./controllers/v1/types"; import { ExtractorOptions, Document, DocumentUrl } from "./lib/entities"; type Mode = "crawl" | "single_urls" | "sitemap"; @@ -120,6 +121,7 @@ export interface AuthResponse { status?: number; api_key?: string; plan?: PlanType; + chunk?: AuthCreditUsageChunk; } From 417adf8e967a79787a1d5f5fcf647829d87613a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Wed, 25 Sep 2024 19:42:45 +0200 Subject: [PATCH 02/25] fix(db): fix caching and rpc error --- apps/api/src/controllers/auth.ts | 2 +- apps/api/src/lib/checkCredits.ts | 32 -------------------------------- 2 files changed, 1 insertion(+), 33 deletions(-) delete mode 100644 apps/api/src/lib/checkCredits.ts diff --git a/apps/api/src/controllers/auth.ts b/apps/api/src/controllers/auth.ts index a428f9a3..fe7baa62 100644 --- a/apps/api/src/controllers/auth.ts +++ b/apps/api/src/controllers/auth.ts @@ -64,7 +64,7 @@ async function getACUC(api_key: string): Promise { const cachedACUC = await getValue(cacheKeyACUC); if (cachedACUC !== null) { - return JSON.parse(cacheKeyACUC); + return JSON.parse(cachedACUC); } else { const { data, error } = await supabase_service.rpc("auth_credit_usage_chunk", { input_key: api_key }); diff --git a/apps/api/src/lib/checkCredits.ts 
b/apps/api/src/lib/checkCredits.ts deleted file mode 100644 index 7e9d988d..00000000 --- a/apps/api/src/lib/checkCredits.ts +++ /dev/null @@ -1,32 +0,0 @@ -import { checkTeamCredits } from "../services/billing/credit_billing"; -import { Logger } from "./logger"; - -type checkCreditsResponse = { - status: number; - error: string | null; -} - -export const checkCredits = async (team_id: string): Promise => { - try { - const { - success: creditsCheckSuccess, - message: creditsCheckMessage - } = await checkTeamCredits(team_id, 1); - if (!creditsCheckSuccess) { - return { - status: 402, - error: "Insufficient credits" - }; - } - } catch (error) { - Logger.error(error); - return { - status: 500, - error: "Error checking team credits. Please contact hello@firecrawl.com for help." - }; - } - return { - status: 200, - error: null - } -}; \ No newline at end of file From 415fd9f33325af08f9a92abe23233e060db0f0a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Wed, 25 Sep 2024 20:37:35 +0200 Subject: [PATCH 03/25] fix(credit_billing): return chunk.remaining_credits --- apps/api/src/services/billing/credit_billing.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/api/src/services/billing/credit_billing.ts b/apps/api/src/services/billing/credit_billing.ts index 12e6b170..48dd3a17 100644 --- a/apps/api/src/services/billing/credit_billing.ts +++ b/apps/api/src/services/billing/credit_billing.ts @@ -191,7 +191,7 @@ export async function supaCheckTeamCredits(chunk: AuthCreditUsageChunk, team_id: chunk.sub_current_period_start, chunk.sub_current_period_end ); - return { success: false, message: "Insufficient credits, please upgrade!", remainingCredits: chunk.price_credits - chunk.adjusted_credits_used, chunk }; + return { success: false, message: "Insufficient credits, please upgrade!", remainingCredits: chunk.remaining_credits, chunk }; } else if (creditUsagePercentage >= 0.8 && creditUsagePercentage < 1) { // Send email 
notification for approaching credit limit sendNotification( @@ -202,7 +202,7 @@ export async function supaCheckTeamCredits(chunk: AuthCreditUsageChunk, team_id: ); } - return { success: true, message: "Sufficient credits available", remainingCredits: chunk.price_credits - chunk.adjusted_credits_used, chunk }; + return { success: true, message: "Sufficient credits available", remainingCredits: chunk.remaining_credits, chunk }; } // Count the total credits used by a team within the current billing period and return the remaining credits. From 5a8eb17a8200b2a323897cf8bf51229204dd39d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Wed, 25 Sep 2024 20:57:45 +0200 Subject: [PATCH 04/25] feat(db): implement bill_team RPC --- apps/api/src/controllers/v0/scrape.ts | 2 +- apps/api/src/controllers/v0/search.ts | 4 +- apps/api/src/controllers/v1/map.ts | 2 +- apps/api/src/controllers/v1/scrape.ts | 2 +- apps/api/src/controllers/v1/types.ts | 8 +- apps/api/src/main/runWebScraper.ts | 2 +- .../src/services/billing/credit_billing.ts | 185 ++---------------- 7 files changed, 23 insertions(+), 182 deletions(-) diff --git a/apps/api/src/controllers/v0/scrape.ts b/apps/api/src/controllers/v0/scrape.ts index 696f8f74..db304fed 100644 --- a/apps/api/src/controllers/v0/scrape.ts +++ b/apps/api/src/controllers/v0/scrape.ts @@ -244,7 +244,7 @@ export async function scrapeController(req: Request, res: Response) { } if (creditsToBeBilled > 0) { // billing for doc done on queue end, bill only for llm extraction - billTeam(team_id, creditsToBeBilled).catch(error => { + billTeam(team_id, chunk.sub_id, creditsToBeBilled).catch(error => { Logger.error(`Failed to bill team ${team_id} for ${creditsToBeBilled} credits: ${error}`); // Optionally, you could notify an admin or add to a retry queue here }); diff --git a/apps/api/src/controllers/v0/search.ts b/apps/api/src/controllers/v0/search.ts index 970acc25..2d8c69de 100644 --- a/apps/api/src/controllers/v0/search.ts +++ 
b/apps/api/src/controllers/v0/search.ts @@ -18,6 +18,7 @@ export async function searchHelper( jobId: string, req: Request, team_id: string, + subscription_id: string, crawlerOptions: any, pageOptions: PageOptions, searchOptions: SearchOptions, @@ -54,7 +55,7 @@ export async function searchHelper( if (justSearch) { - billTeam(team_id, res.length).catch(error => { + billTeam(team_id, subscription_id, res.length).catch(error => { Logger.error(`Failed to bill team ${team_id} for ${res.length} credits: ${error}`); // Optionally, you could notify an admin or add to a retry queue here }); @@ -169,6 +170,7 @@ export async function searchController(req: Request, res: Response) { jobId, req, team_id, + chunk.sub_id, crawlerOptions, pageOptions, searchOptions, diff --git a/apps/api/src/controllers/v1/map.ts b/apps/api/src/controllers/v1/map.ts index 6b13f762..aca03ced 100644 --- a/apps/api/src/controllers/v1/map.ts +++ b/apps/api/src/controllers/v1/map.ts @@ -152,7 +152,7 @@ export async function mapController( // remove duplicates that could be due to http/https or www links = removeDuplicateUrls(links); - billTeam(req.auth.team_id, 1).catch((error) => { + billTeam(req.auth.team_id, req.acuc.sub_id, 1).catch((error) => { Logger.error( `Failed to bill team ${req.auth.team_id} for 1 credit: ${error}` ); diff --git a/apps/api/src/controllers/v1/scrape.ts b/apps/api/src/controllers/v1/scrape.ts index ebbabc00..41974917 100644 --- a/apps/api/src/controllers/v1/scrape.ts +++ b/apps/api/src/controllers/v1/scrape.ts @@ -108,7 +108,7 @@ export async function scrapeController( creditsToBeBilled = 5; } - billTeam(req.auth.team_id, creditsToBeBilled).catch(error => { + billTeam(req.auth.team_id, req.acuc.sub_id, creditsToBeBilled).catch(error => { Logger.error(`Failed to bill team ${req.auth.team_id} for ${creditsToBeBilled} credits: ${error}`); // Optionally, you could notify an admin or add to a retry queue here }); diff --git a/apps/api/src/controllers/v1/types.ts 
b/apps/api/src/controllers/v1/types.ts index c09a29f2..5e99fdab 100644 --- a/apps/api/src/controllers/v1/types.ts +++ b/apps/api/src/controllers/v1/types.ts @@ -318,10 +318,10 @@ type Account = { export type AuthCreditUsageChunk = { api_key: string; team_id: string; - sub_id: string; - sub_current_period_start: string; - sub_current_period_end: string; - price_id: string; + sub_id: string | null; + sub_current_period_start: string | null; + sub_current_period_end: string | null; + price_id: string | null; price_credits: number; // credit limit with assoicated price, or free_credits (500) if free plan credits_used: number; coupon_credits: number; diff --git a/apps/api/src/main/runWebScraper.ts b/apps/api/src/main/runWebScraper.ts index f67a1cd0..571122f9 100644 --- a/apps/api/src/main/runWebScraper.ts +++ b/apps/api/src/main/runWebScraper.ts @@ -120,7 +120,7 @@ export async function runWebScraper({ : docs; if(is_scrape === false) { - billTeam(team_id, filteredDocs.length).catch(error => { + billTeam(team_id, undefined, filteredDocs.length).catch(error => { Logger.error(`Failed to bill team ${team_id} for ${filteredDocs.length} credits: ${error}`); // Optionally, you could notify an admin or add to a retry queue here }); diff --git a/apps/api/src/services/billing/credit_billing.ts b/apps/api/src/services/billing/credit_billing.ts index 48dd3a17..9d0d1f09 100644 --- a/apps/api/src/services/billing/credit_billing.ts +++ b/apps/api/src/services/billing/credit_billing.ts @@ -3,168 +3,30 @@ import { withAuth } from "../../lib/withAuth"; import { sendNotification } from "../notification/email_notification"; import { supabase_service } from "../supabase"; import { Logger } from "../../lib/logger"; -import { getValue, setValue } from "../redis"; -import { redlock } from "../redlock"; import * as Sentry from "@sentry/node"; import { AuthCreditUsageChunk } from "../../controllers/v1/types"; const FREE_CREDITS = 500; - -export async function billTeam(team_id: string, credits: 
number) { - return withAuth(supaBillTeam)(team_id, credits); +/** + * If you do not know the subscription_id in the current context, pass subscription_id as undefined. + */ +export async function billTeam(team_id: string, subscription_id: string | null | undefined, credits: number) { + return withAuth(supaBillTeam)(team_id, subscription_id, credits); } -export async function supaBillTeam(team_id: string, credits: number) { +export async function supaBillTeam(team_id: string, subscription_id: string, credits: number) { if (team_id === "preview") { return { success: true, message: "Preview team, no credits used" }; } Logger.info(`Billing team ${team_id} for ${credits} credits`); - // When the API is used, you can log the credit usage in the credit_usage table: - // team_id: The ID of the team using the API. - // subscription_id: The ID of the team's active subscription. - // credits_used: The number of credits consumed by the API call. - // created_at: The timestamp of the API usage. - // 1. get the subscription and check for available coupons concurrently - const [{ data: subscription }, { data: coupons }] = await Promise.all([ - supabase_service - .from("subscriptions") - .select("*") - .eq("team_id", team_id) - .eq("status", "active") - .single(), - supabase_service - .from("coupons") - .select("id, credits") - .eq("team_id", team_id) - .eq("status", "active"), - ]); - - let couponCredits = 0; - let sortedCoupons = []; - - if (coupons && coupons.length > 0) { - couponCredits = coupons.reduce( - (total, coupon) => total + coupon.credits, - 0 - ); - sortedCoupons = [...coupons].sort((a, b) => b.credits - a.credits); + const { error } = + await supabase_service.rpc("bill_team", { _team_id: team_id, sub_id: subscription_id ?? 
null, fetch_subscription: subscription_id === undefined, credits }); + + if (error) { + Sentry.captureException(error); + Logger.error("Failed to bill team: " + JSON.stringify(error)); } - // using coupon credits: - if (couponCredits > 0) { - // if there is no subscription and they have enough coupon credits - if (!subscription) { - // using only coupon credits: - // if there are enough coupon credits - if (couponCredits >= credits) { - // remove credits from coupon credits - let usedCredits = credits; - while (usedCredits > 0) { - // update coupons - if (sortedCoupons[0].credits < usedCredits) { - usedCredits = usedCredits - sortedCoupons[0].credits; - // update coupon credits - await supabase_service - .from("coupons") - .update({ - credits: 0, - }) - .eq("id", sortedCoupons[0].id); - sortedCoupons.shift(); - } else { - // update coupon credits - await supabase_service - .from("coupons") - .update({ - credits: sortedCoupons[0].credits - usedCredits, - }) - .eq("id", sortedCoupons[0].id); - usedCredits = 0; - } - } - - return await createCreditUsage({ team_id, credits: 0 }); - - // not enough coupon credits and no subscription - } else { - // update coupon credits - const usedCredits = credits - couponCredits; - for (let i = 0; i < sortedCoupons.length; i++) { - await supabase_service - .from("coupons") - .update({ - credits: 0, - }) - .eq("id", sortedCoupons[i].id); - } - - return await createCreditUsage({ team_id, credits: usedCredits }); - } - } - - // with subscription - // using coupon + subscription credits: - if (credits > couponCredits) { - // update coupon credits - for (let i = 0; i < sortedCoupons.length; i++) { - await supabase_service - .from("coupons") - .update({ - credits: 0, - }) - .eq("id", sortedCoupons[i].id); - } - const usedCredits = credits - couponCredits; - return await createCreditUsage({ - team_id, - subscription_id: subscription.id, - credits: usedCredits, - }); - } else { - // using only coupon credits - let usedCredits = credits; - 
while (usedCredits > 0) { - // update coupons - if (sortedCoupons[0].credits < usedCredits) { - usedCredits = usedCredits - sortedCoupons[0].credits; - // update coupon credits - await supabase_service - .from("coupons") - .update({ - credits: 0, - }) - .eq("id", sortedCoupons[0].id); - sortedCoupons.shift(); - } else { - // update coupon credits - await supabase_service - .from("coupons") - .update({ - credits: sortedCoupons[0].credits - usedCredits, - }) - .eq("id", sortedCoupons[0].id); - usedCredits = 0; - } - } - - return await createCreditUsage({ - team_id, - subscription_id: subscription.id, - credits: 0, - }); - } - } - - // not using coupon credits - if (!subscription) { - return await createCreditUsage({ team_id, credits }); - } - - return await createCreditUsage({ - team_id, - subscription_id: subscription.id, - credits, - }); } export async function checkTeamCredits(chunk: AuthCreditUsageChunk, team_id: string, credits: number) { @@ -297,26 +159,3 @@ export async function countCreditsAndRemainingForCurrentBillingPeriod( totalCredits: price.credits, }; } - -async function createCreditUsage({ - team_id, - subscription_id, - credits, -}: { - team_id: string; - subscription_id?: string; - credits: number; -}) { - await supabase_service - .from("credit_usage") - .insert([ - { - team_id, - credits_used: credits, - subscription_id: subscription_id || null, - created_at: new Date(), - }, - ]); - - return { success: true }; -} From e67cbc2ca175b8138f405a7a0e7b83f97c90b645 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Wed, 25 Sep 2024 21:37:01 +0200 Subject: [PATCH 05/25] fix(billTeam): update cached ACUC after billing --- apps/api/src/controllers/auth.ts | 8 +++++--- apps/api/src/controllers/v1/types.ts | 2 +- .../src/services/billing/credit_billing.ts | 19 ++++++++++++++++++- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/apps/api/src/controllers/auth.ts b/apps/api/src/controllers/auth.ts index fe7baa62..96f94a99 
100644 --- a/apps/api/src/controllers/auth.ts +++ b/apps/api/src/controllers/auth.ts @@ -37,7 +37,7 @@ function normalizedApiIsUuid(potentialUuid: string): boolean { return validate(potentialUuid); } -async function setCachedACUC(api_key: string, acuc: AuthCreditUsageChunk) { +export async function setCachedACUC(api_key: string, acuc: AuthCreditUsageChunk) { const cacheKeyACUC = `acuc_${api_key}`; const redLockKey = `lock_${cacheKeyACUC}`; const lockTTL = 10000; // 10 seconds @@ -58,14 +58,14 @@ async function setCachedACUC(api_key: string, acuc: AuthCreditUsageChunk) { } } -async function getACUC(api_key: string): Promise { +export async function getACUC(api_key: string, cacheOnly = false): Promise { const cacheKeyACUC = `acuc_${api_key}`; const cachedACUC = await getValue(cacheKeyACUC); if (cachedACUC !== null) { return JSON.parse(cachedACUC); - } else { + } else if (!cacheOnly) { const { data, error } = await supabase_service.rpc("auth_credit_usage_chunk", { input_key: api_key }); @@ -85,6 +85,8 @@ async function getACUC(api_key: string): Promise { } return chunk; + } else { + return null; } } diff --git a/apps/api/src/controllers/v1/types.ts b/apps/api/src/controllers/v1/types.ts index 5e99fdab..441980b1 100644 --- a/apps/api/src/controllers/v1/types.ts +++ b/apps/api/src/controllers/v1/types.ts @@ -324,7 +324,7 @@ export type AuthCreditUsageChunk = { price_id: string | null; price_credits: number; // credit limit with assoicated price, or free_credits (500) if free plan credits_used: number; - coupon_credits: number; + coupon_credits: number; // do not rely on this number to be up to date after calling a billTeam coupons: any[]; adjusted_credits_used: number; // credits this period minus coupons used remaining_credits: number; diff --git a/apps/api/src/services/billing/credit_billing.ts b/apps/api/src/services/billing/credit_billing.ts index 9d0d1f09..6fa34906 100644 --- a/apps/api/src/services/billing/credit_billing.ts +++ 
b/apps/api/src/services/billing/credit_billing.ts @@ -5,6 +5,7 @@ import { supabase_service } from "../supabase"; import { Logger } from "../../lib/logger"; import * as Sentry from "@sentry/node"; import { AuthCreditUsageChunk } from "../../controllers/v1/types"; +import { getACUC, setCachedACUC } from "../../controllers/auth"; const FREE_CREDITS = 500; @@ -20,13 +21,29 @@ export async function supaBillTeam(team_id: string, subscription_id: string, cre } Logger.info(`Billing team ${team_id} for ${credits} credits`); - const { error } = + const { data, error } = await supabase_service.rpc("bill_team", { _team_id: team_id, sub_id: subscription_id ?? null, fetch_subscription: subscription_id === undefined, credits }); if (error) { Sentry.captureException(error); Logger.error("Failed to bill team: " + JSON.stringify(error)); + return; } + + (async () => { + for (const apiKey of (data ?? []).map(x => x.api_key)) { + const acuc = await getACUC(apiKey, true); + + if (acuc !== null) { + await setCachedACUC(apiKey, { + ...acuc, + credits_used: acuc.credits_used + credits, + adjusted_credits_used: acuc.adjusted_credits_used + credits, + remaining_credits: acuc.remaining_credits - credits, + }); + } + } + })(); } export async function checkTeamCredits(chunk: AuthCreditUsageChunk, team_id: string, credits: number) { From eb7317c08a824eaa5de09f8b456afcf831bbb947 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Wed, 25 Sep 2024 21:44:05 +0200 Subject: [PATCH 06/25] fix(credit_billing): allow spending of exact credits --- apps/api/src/services/billing/credit_billing.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/api/src/services/billing/credit_billing.ts b/apps/api/src/services/billing/credit_billing.ts index 6fa34906..56de74e5 100644 --- a/apps/api/src/services/billing/credit_billing.ts +++ b/apps/api/src/services/billing/credit_billing.ts @@ -63,7 +63,7 @@ export async function supaCheckTeamCredits(chunk: AuthCreditUsageChunk, 
team_id: const creditUsagePercentage = creditsWillBeUsed / chunk.price_credits; // Compare the adjusted total credits used with the credits allowed by the plan - if (creditsWillBeUsed >= chunk.price_credits) { + if (creditsWillBeUsed > chunk.price_credits) { sendNotification( team_id, NotificationType.LIMIT_REACHED, From 1cca9b8ae6e4ab6a1b9de016f044a75e4f810884 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Wed, 25 Sep 2024 22:15:02 +0200 Subject: [PATCH 07/25] fix(billTeam): cache update race condition --- apps/api/src/controllers/auth.ts | 11 ++++++++++- apps/api/src/services/billing/credit_billing.ts | 16 ++++++---------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/apps/api/src/controllers/auth.ts b/apps/api/src/controllers/auth.ts index 96f94a99..2298430e 100644 --- a/apps/api/src/controllers/auth.ts +++ b/apps/api/src/controllers/auth.ts @@ -37,7 +37,7 @@ function normalizedApiIsUuid(potentialUuid: string): boolean { return validate(potentialUuid); } -export async function setCachedACUC(api_key: string, acuc: AuthCreditUsageChunk) { +export async function setCachedACUC(api_key: string, acuc: AuthCreditUsageChunk | ((acuc: AuthCreditUsageChunk) => AuthCreditUsageChunk)) { const cacheKeyACUC = `acuc_${api_key}`; const redLockKey = `lock_${cacheKeyACUC}`; const lockTTL = 10000; // 10 seconds @@ -46,6 +46,15 @@ export async function setCachedACUC(api_key: string, acuc: AuthCreditUsageChunk) const lock = await redlock.acquire([redLockKey], lockTTL); try { + if (typeof acuc === "function") { + acuc = acuc(JSON.parse(await getValue(cacheKeyACUC))); + + if (acuc === null) { + await lock.release(); + return; + } + } + // Cache for 10 minutes. This means that changing subscription tier could have // a maximum of 10 minutes of a delay. 
- mogery await setValue(cacheKeyACUC, JSON.stringify(acuc), 600); diff --git a/apps/api/src/services/billing/credit_billing.ts b/apps/api/src/services/billing/credit_billing.ts index 56de74e5..56b02dad 100644 --- a/apps/api/src/services/billing/credit_billing.ts +++ b/apps/api/src/services/billing/credit_billing.ts @@ -32,16 +32,12 @@ export async function supaBillTeam(team_id: string, subscription_id: string, cre (async () => { for (const apiKey of (data ?? []).map(x => x.api_key)) { - const acuc = await getACUC(apiKey, true); - - if (acuc !== null) { - await setCachedACUC(apiKey, { - ...acuc, - credits_used: acuc.credits_used + credits, - adjusted_credits_used: acuc.adjusted_credits_used + credits, - remaining_credits: acuc.remaining_credits - credits, - }); - } + await setCachedACUC(apiKey, acuc => (acuc ? { + ...acuc, + credits_used: acuc.credits_used + credits, + adjusted_credits_used: acuc.adjusted_credits_used + credits, + remaining_credits: acuc.remaining_credits - credits, + } : null)); } })(); } From 2c96d2eef666167f8d486c69277f2e5e0bb0ea1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Wed, 25 Sep 2024 22:25:13 +0200 Subject: [PATCH 08/25] fix(auth/redlock): retry cached ACUC lock for 20 seconds --- apps/api/src/controllers/auth.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/apps/api/src/controllers/auth.ts b/apps/api/src/controllers/auth.ts index 2298430e..73cb3d2a 100644 --- a/apps/api/src/controllers/auth.ts +++ b/apps/api/src/controllers/auth.ts @@ -43,7 +43,10 @@ export async function setCachedACUC(api_key: string, acuc: AuthCreditUsageChunk const lockTTL = 10000; // 10 seconds try { - const lock = await redlock.acquire([redLockKey], lockTTL); + const lock = await redlock.acquire([redLockKey], lockTTL, { + retryCount: 200, + retryDelay: 100, + }); try { if (typeof acuc === "function") { From eef116bef800192a69a1623bc9da1cb5d1bff843 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Wed, 25 Sep 2024 22:27:51 +0200 Subject: [PATCH 09/25] fix(auth): move redlock settings --- apps/api/src/controllers/auth.ts | 5 +---- apps/api/src/services/redlock.ts | 9 +++------ 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/apps/api/src/controllers/auth.ts b/apps/api/src/controllers/auth.ts index 73cb3d2a..2298430e 100644 --- a/apps/api/src/controllers/auth.ts +++ b/apps/api/src/controllers/auth.ts @@ -43,10 +43,7 @@ export async function setCachedACUC(api_key: string, acuc: AuthCreditUsageChunk const lockTTL = 10000; // 10 seconds try { - const lock = await redlock.acquire([redLockKey], lockTTL, { - retryCount: 200, - retryDelay: 100, - }); + const lock = await redlock.acquire([redLockKey], lockTTL); try { if (typeof acuc === "function") { diff --git a/apps/api/src/services/redlock.ts b/apps/api/src/services/redlock.ts index 9cbfc1fc..cb275736 100644 --- a/apps/api/src/services/redlock.ts +++ b/apps/api/src/services/redlock.ts @@ -10,12 +10,9 @@ export const redlock = new Redlock( // http://redis.io/topics/distlock driftFactor: 0.01, // multiplied by lock ttl to determine drift time - // The max number of times Redlock will attempt to lock a resource - // before erroring. 
- retryCount: 5, - - // the time in ms between attempts - retryDelay: 100, // time in ms + retryCount: 200, + + retryDelay: 100, // the max time in ms randomly added to retries // to improve performance under high contention From 953d4fb197ad16c0edfd6534443f8c379907c9b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Wed, 25 Sep 2024 22:47:42 +0200 Subject: [PATCH 10/25] fix(redlock): use redlock.using for stability --- apps/api/src/controllers/auth.ts | 18 ++++++++++-------- apps/api/src/services/redlock.ts | 2 +- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/apps/api/src/controllers/auth.ts b/apps/api/src/controllers/auth.ts index 2298430e..91bfef1f 100644 --- a/apps/api/src/controllers/auth.ts +++ b/apps/api/src/controllers/auth.ts @@ -40,27 +40,29 @@ function normalizedApiIsUuid(potentialUuid: string): boolean { export async function setCachedACUC(api_key: string, acuc: AuthCreditUsageChunk | ((acuc: AuthCreditUsageChunk) => AuthCreditUsageChunk)) { const cacheKeyACUC = `acuc_${api_key}`; const redLockKey = `lock_${cacheKeyACUC}`; - const lockTTL = 10000; // 10 seconds try { - const lock = await redlock.acquire([redLockKey], lockTTL); - - try { + await redlock.using([redLockKey], 10000, {}, async signal => { if (typeof acuc === "function") { acuc = acuc(JSON.parse(await getValue(cacheKeyACUC))); if (acuc === null) { - await lock.release(); + if (signal.aborted) { + throw signal.error; + } + return; } } + if (signal.aborted) { + throw signal.error; + } + // Cache for 10 minutes. This means that changing subscription tier could have // a maximum of 10 minutes of a delay. 
- mogery await setValue(cacheKeyACUC, JSON.stringify(acuc), 600); - } finally { - await lock.release(); - } + }); } catch (error) { Logger.error(`Error updating cached ACUC: ${error}`); Sentry.captureException(error); diff --git a/apps/api/src/services/redlock.ts b/apps/api/src/services/redlock.ts index cb275736..4ece058a 100644 --- a/apps/api/src/services/redlock.ts +++ b/apps/api/src/services/redlock.ts @@ -11,7 +11,7 @@ export const redlock = new Redlock( driftFactor: 0.01, // multiplied by lock ttl to determine drift time retryCount: 200, - + retryDelay: 100, // the max time in ms randomly added to retries From 30058b1da097a124f6517719c44abfe0467dd569 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Wed, 25 Sep 2024 19:27:02 -0400 Subject: [PATCH 11/25] Nick: increased timeout for chrome-cdp due to smart wait --- apps/api/src/scraper/WebScraper/scrapers/fireEngine.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/api/src/scraper/WebScraper/scrapers/fireEngine.ts b/apps/api/src/scraper/WebScraper/scrapers/fireEngine.ts index 295c171c..dd3577b0 100644 --- a/apps/api/src/scraper/WebScraper/scrapers/fireEngine.ts +++ b/apps/api/src/scraper/WebScraper/scrapers/fireEngine.ts @@ -131,7 +131,9 @@ export async function scrapWithFireEngine({ const waitTotal = (actions ?? 
[]).filter(x => x.type === "wait").reduce((a, x) => (x as { type: "wait"; milliseconds: number; }).milliseconds + a, 0); let checkStatusResponse = await axiosInstance.get(`${process.env.FIRE_ENGINE_BETA_URL}/scrape/${_response.data.jobId}`); - while (checkStatusResponse.data.processing && Date.now() - startTime < universalTimeout + waitTotal) { + + // added 5 seconds to the timeout to account for 'smart wait' + while (checkStatusResponse.data.processing && Date.now() - startTime < universalTimeout + waitTotal + 5000) { await new Promise(resolve => setTimeout(resolve, 250)); // wait 0.25 seconds checkStatusResponse = await axiosInstance.get(`${process.env.FIRE_ENGINE_BETA_URL}/scrape/${_response.data.jobId}`); } From 53fce67ca1f17d94c1f5309ac4aa69e460598fcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Thu, 26 Sep 2024 20:23:13 +0200 Subject: [PATCH 12/25] feat(queue-worker): PoC of concurrency limits --- apps/api/src/services/queue-worker.ts | 46 +++++++++++++++++++++++---- 1 file changed, 40 insertions(+), 6 deletions(-) diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 37e14baf..76796742 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -12,7 +12,7 @@ import { startWebScraperPipeline } from "../main/runWebScraper"; import { callWebhook } from "./webhook"; import { logJob } from "./logging/log_job"; import { initSDK } from "@hyperdx/node-opentelemetry"; -import { Job } from "bullmq"; +import { Job, Queue } from "bullmq"; import { Logger } from "../lib/logger"; import { Worker } from "bullmq"; import systemMonitor from "./system-monitor"; @@ -99,10 +99,10 @@ process.on("SIGINT", () => { }); const workerFun = async ( - queueName: string, + queue: Queue, processJobInternal: (token: string, job: Job) => Promise ) => { - const worker = new Worker(queueName, null, { + const worker = new Worker(queue.name, null, { connection: redisConnection, 
lockDuration: 1 * 60 * 1000, // 1 minute // lockRenewTime: 15 * 1000, // 15 seconds @@ -129,6 +129,29 @@ const workerFun = async ( const job = await worker.getNextJob(token); if (job) { + const concurrencyLimiterKey = "concurrency-limiter:" + job.data?.team_id; + + if (job.data && job.data.team_id) { + const concurrencyLimit = 100; // TODO: determine based on price id + const now = Date.now(); + const stalledJobTimeoutMs = 2 * 60 * 1000; + + redisConnection.zremrangebyscore(concurrencyLimiterKey, -Infinity, now); + const activeJobsOfTeam = await redisConnection.zrangebyscore(concurrencyLimiterKey, now, Infinity); + if (activeJobsOfTeam.length >= concurrencyLimit) { + Logger.info("Moving job " + job.id + " back the queue -- concurrency limit hit"); + // Concurrency limit hit + await job.moveToFailed(new Error("Concurrency limit hit"), token, false); + await queue.add(job.name, job.data, { + ...job.opts, + jobId: job.id, + priority: Math.round((job.opts.priority ?? 10) * 1.25), // exponential backoff for stuck jobs + }) + } else { + await redisConnection.zadd(concurrencyLimiterKey, now + stalledJobTimeoutMs, job.id); + } + } + if (job.data && job.data.sentry && Sentry.isInitialized()) { Sentry.continueTrace( { @@ -159,7 +182,15 @@ const workerFun = async ( }, }, async () => { - const res = await processJobInternal(token, job); + let res; + try { + res = await processJobInternal(token, job); + } finally { + if (job.id && job.data && job.data.team_id) { + await redisConnection.zrem(concurrencyLimiterKey, job.id); + } + } + if (res !== null) { span.setStatus({ code: 2 }); // ERROR } else { @@ -181,7 +212,10 @@ const workerFun = async ( }, }, () => { - processJobInternal(token, job); + processJobInternal(token, job) + .finally(() => { + + }); } ); } @@ -193,7 +227,7 @@ const workerFun = async ( } }; -workerFun(scrapeQueueName, processJobInternal); +workerFun(getScrapeQueue(), processJobInternal); async function processJob(job: Job, token: string) { Logger.info(`🐂 Worker 
taking job ${job.id}`); From d2881927c18cddef767f043810532f887fd583a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Thu, 26 Sep 2024 20:29:17 +0200 Subject: [PATCH 13/25] fix(queue-worker): remove concurrency entries when done in sentry-less branch --- apps/api/src/services/queue-worker.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 76796742..71bd6366 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -214,7 +214,9 @@ const workerFun = async ( () => { processJobInternal(token, job) .finally(() => { - + if (job.id && job.data && job.data.team_id) { + redisConnection.zrem(concurrencyLimiterKey, job.id); + } }); } ); From dec4171937035f5db6733e22766b7d238224bd9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Thu, 26 Sep 2024 20:39:19 +0200 Subject: [PATCH 14/25] fix(queue-worker, queue-jobs): logic fixes --- apps/api/src/services/queue-jobs.ts | 7 +++++-- apps/api/src/services/queue-worker.ts | 8 ++++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts index 7a698772..746d4b97 100644 --- a/apps/api/src/services/queue-jobs.ts +++ b/apps/api/src/services/queue-jobs.ts @@ -63,8 +63,11 @@ export function waitForJob(jobId: string, timeout: number) { resolve((await getScrapeQueue().getJob(jobId)).returnvalue); } else if (state === "failed") { // console.log("failed", (await getScrapeQueue().getJob(jobId)).failedReason); - clearInterval(int); - reject((await getScrapeQueue().getJob(jobId)).failedReason); + const job = await getScrapeQueue().getJob(jobId); + if (job.failedReason !== "Concurrency limit hit") { + clearInterval(int); + reject(job.failedReason); + } } } }, 500); diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 
71bd6366..ed13cab7 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -132,7 +132,7 @@ const workerFun = async ( const concurrencyLimiterKey = "concurrency-limiter:" + job.data?.team_id; if (job.data && job.data.team_id) { - const concurrencyLimit = 100; // TODO: determine based on price id + const concurrencyLimit = 10; // TODO: determine based on price id const now = Date.now(); const stalledJobTimeoutMs = 2 * 60 * 1000; @@ -142,11 +142,15 @@ const workerFun = async ( Logger.info("Moving job " + job.id + " back the queue -- concurrency limit hit"); // Concurrency limit hit await job.moveToFailed(new Error("Concurrency limit hit"), token, false); + await job.remove(); await queue.add(job.name, job.data, { ...job.opts, jobId: job.id, priority: Math.round((job.opts.priority ?? 10) * 1.25), // exponential backoff for stuck jobs - }) + }); + + await sleep(gotJobInterval); + continue; } else { await redisConnection.zadd(concurrencyLimiterKey, now + stalledJobTimeoutMs, job.id); } From b696bfc8545ce0b6f01b21a189a69dfd8fad489b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Thu, 26 Sep 2024 21:00:27 +0200 Subject: [PATCH 15/25] fix(crawl-status): avoid race conditions where crawl may be deemed failed --- apps/api/src/controllers/v0/crawl-status.ts | 4 ++-- apps/api/src/controllers/v1/crawl-status-ws.ts | 8 +++++--- apps/api/src/controllers/v1/crawl-status.ts | 8 +++++--- apps/api/src/lib/crawl-redis.ts | 4 ++++ apps/api/src/services/queue-worker.ts | 15 +++++++++++++-- 5 files changed, 29 insertions(+), 10 deletions(-) diff --git a/apps/api/src/controllers/v0/crawl-status.ts b/apps/api/src/controllers/v0/crawl-status.ts index 41491f86..1b1ffdc5 100644 --- a/apps/api/src/controllers/v0/crawl-status.ts +++ b/apps/api/src/controllers/v0/crawl-status.ts @@ -54,9 +54,9 @@ export async function crawlStatusController(req: Request, res: Response) { const jobs = (await getJobs(req.params.jobId, 
jobIDs)).sort((a, b) => a.timestamp - b.timestamp); const jobStatuses = await Promise.all(jobs.map(x => x.getState())); - const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "active"; + const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobs.some((x, i) => jobStatuses[i] === "failed" && x.failedReason !== "Concurrency limit hit") ? "failed" : "active"; - const data = jobs.map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue); + const data = jobs.filter(x => x.failedReason !== "Concurreny limit hit").map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue); if ( jobs.length > 0 && diff --git a/apps/api/src/controllers/v1/crawl-status-ws.ts b/apps/api/src/controllers/v1/crawl-status-ws.ts index 16a67682..9832a948 100644 --- a/apps/api/src/controllers/v1/crawl-status-ws.ts +++ b/apps/api/src/controllers/v1/crawl-status-ws.ts @@ -5,7 +5,7 @@ import { CrawlStatusParams, CrawlStatusResponse, Document, ErrorResponse, legacy import { WebSocket } from "ws"; import { v4 as uuidv4 } from "uuid"; import { Logger } from "../../lib/logger"; -import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, isCrawlFinished, isCrawlFinishedLocked } from "../../lib/crawl-redis"; +import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, getThrottledJobs, isCrawlFinished, isCrawlFinishedLocked } from "../../lib/crawl-redis"; import { getScrapeQueue } from "../../services/queue-service"; import { getJob, getJobs } from "./crawl-status"; import * as Sentry from "@sentry/node"; @@ -95,8 +95,10 @@ async function crawlStatusWS(ws: WebSocket, req: RequestWithAuth getScrapeQueue().getJobState(x))); - const status: Exclude["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x === "completed") ? 
"completed" : jobStatuses.some(x => x === "failed") ? "failed" : "scraping"; + let jobStatuses = await Promise.all(jobIDs.map(async x => [x, await getScrapeQueue().getJobState(x)] as const)); + const throttledJobs = new Set(...await getThrottledJobs(req.auth.team_id)); + jobStatuses = jobStatuses.filter(x => !throttledJobs.has(x[0])); // throttled jobs can have a failed status, but they are not actually failed + const status: Exclude["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x[1] === "completed") ? "completed" : jobStatuses.some(x => x[1] === "failed") ? "failed" : "scraping"; const doneJobs = await getJobs(doneJobIDs); const data = doneJobs.map(x => x.returnvalue); diff --git a/apps/api/src/controllers/v1/crawl-status.ts b/apps/api/src/controllers/v1/crawl-status.ts index 2ee0638c..df36f894 100644 --- a/apps/api/src/controllers/v1/crawl-status.ts +++ b/apps/api/src/controllers/v1/crawl-status.ts @@ -1,6 +1,6 @@ import { Response } from "express"; import { CrawlStatusParams, CrawlStatusResponse, ErrorResponse, legacyDocumentConverter, RequestWithAuth } from "./types"; -import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength } from "../../lib/crawl-redis"; +import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, getThrottledJobs } from "../../lib/crawl-redis"; import { getScrapeQueue } from "../../services/queue-service"; import { supabaseGetJobById, supabaseGetJobsById } from "../../lib/supabase-jobs"; import { configDotenv } from "dotenv"; @@ -58,8 +58,10 @@ export async function crawlStatusController(req: RequestWithAuth getScrapeQueue().getJobState(x))); - const status: Exclude["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? 
"failed" : "scraping"; + let jobStatuses = await Promise.all(jobIDs.map(async x => [x, await getScrapeQueue().getJobState(x)] as const)); + const throttledJobs = new Set(...await getThrottledJobs(req.auth.team_id)); + jobStatuses = jobStatuses.filter(x => !throttledJobs.has(x[0])); // throttled jobs can have a failed status, but they are not actually failed + const status: Exclude["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x[1] === "completed") ? "completed" : jobStatuses.some(x => x[1] === "failed") ? "failed" : "scraping"; const doneJobsLength = await getDoneJobsOrderedLength(req.params.jobId); const doneJobsOrder = await getDoneJobsOrdered(req.params.jobId, start, end ?? -1); diff --git a/apps/api/src/lib/crawl-redis.ts b/apps/api/src/lib/crawl-redis.ts index 9240018e..5b1ee77d 100644 --- a/apps/api/src/lib/crawl-redis.ts +++ b/apps/api/src/lib/crawl-redis.ts @@ -82,6 +82,10 @@ export async function getCrawlJobs(id: string): Promise { return await redisConnection.smembers("crawl:" + id + ":jobs"); } +export async function getThrottledJobs(teamId: string): Promise { + return await redisConnection.zrangebyscore("concurrency-limiter:" + teamId + ":throttled", Date.now(), Infinity); +} + export async function lockURL(id: string, sc: StoredCrawl, url: string): Promise { if (typeof sc.crawlerOptions?.limit === "number") { if (await redisConnection.scard("crawl:" + id + ":visited") >= sc.crawlerOptions.limit) { diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index ed13cab7..7468a050 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -132,18 +132,25 @@ const workerFun = async ( const concurrencyLimiterKey = "concurrency-limiter:" + job.data?.team_id; if (job.data && job.data.team_id) { + const concurrencyLimiterThrottledKey = "concurrency-limiter:" + job.data.team_id + ":throttled"; const concurrencyLimit = 10; // TODO: determine based on price id const now = 
Date.now(); const stalledJobTimeoutMs = 2 * 60 * 1000; + const throttledJobTimeoutMs = 10 * 60 * 1000; + redisConnection.zremrangebyscore(concurrencyLimiterThrottledKey, -Infinity, now); redisConnection.zremrangebyscore(concurrencyLimiterKey, -Infinity, now); const activeJobsOfTeam = await redisConnection.zrangebyscore(concurrencyLimiterKey, now, Infinity); if (activeJobsOfTeam.length >= concurrencyLimit) { Logger.info("Moving job " + job.id + " back the queue -- concurrency limit hit"); // Concurrency limit hit + await redisConnection.zadd(concurrencyLimiterThrottledKey, now + throttledJobTimeoutMs, job.id); await job.moveToFailed(new Error("Concurrency limit hit"), token, false); await job.remove(); - await queue.add(job.name, job.data, { + await queue.add(job.name, { + ...job.data, + concurrencyLimitHit: true, + }, { ...job.opts, jobId: job.id, priority: Math.round((job.opts.priority ?? 10) * 1.25), // exponential backoff for stuck jobs @@ -153,6 +160,7 @@ const workerFun = async ( continue; } else { await redisConnection.zadd(concurrencyLimiterKey, now + stalledJobTimeoutMs, job.id); + await redisConnection.zrem(concurrencyLimiterThrottledKey, job.id); } } @@ -294,7 +302,10 @@ async function processJob(job: Job, token: string) { }, project_id: job.data.project_id, error: message /* etc... */, - docs, + docs: job.data.concurrencyLimitHit ? docs.map(x => ({ + ...x, + warning: "This scrape was throttled because you hit your concurrency limit." + (x.warning ? " " + x.warning : ""), + })) : docs, }; // No idea what this does and when it is called. 
From 095babe70b226ad9c697f340bb1d38a6ca4bd945 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Thu, 26 Sep 2024 21:18:56 +0200 Subject: [PATCH 16/25] fix(queue-jobs): jobs with concurrency fails may vanish --- apps/api/src/services/queue-jobs.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts index 746d4b97..315700a1 100644 --- a/apps/api/src/services/queue-jobs.ts +++ b/apps/api/src/services/queue-jobs.ts @@ -64,7 +64,7 @@ export function waitForJob(jobId: string, timeout: number) { } else if (state === "failed") { // console.log("failed", (await getScrapeQueue().getJob(jobId)).failedReason); const job = await getScrapeQueue().getJob(jobId); - if (job.failedReason !== "Concurrency limit hit") { + if (job && job.failedReason !== "Concurrency limit hit") { clearInterval(int); reject(job.failedReason); } From 29815e084b3124f11d4560e44ddbb69605b48d24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Thu, 26 Sep 2024 21:19:05 +0200 Subject: [PATCH 17/25] feat(v1/Document): add warning field --- apps/api/src/controllers/v1/types.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/api/src/controllers/v1/types.ts b/apps/api/src/controllers/v1/types.ts index 441980b1..9b102e67 100644 --- a/apps/api/src/controllers/v1/types.ts +++ b/apps/api/src/controllers/v1/types.ts @@ -216,6 +216,7 @@ export type Document = { actions?: { screenshots: string[]; }; + warning?: string; metadata: { title?: string; description?: string; @@ -443,6 +444,7 @@ export function legacyDocumentConverter(doc: any): Document { extract: doc.llm_extraction, screenshot: doc.screenshot ?? doc.fullPageScreenshot, actions: doc.actions ?? 
undefined, + warning: doc.warning, metadata: { ...doc.metadata, pageError: undefined, From 3621e191bd7e9568e7668b16ad5d7aa6a9ce444f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Sat, 28 Sep 2024 00:19:46 +0200 Subject: [PATCH 18/25] feat(concurrency-limit): set limit based on plan --- apps/api/src/controllers/v0/crawl.ts | 6 ++++-- apps/api/src/controllers/v0/crawlPreview.ts | 6 ++++-- apps/api/src/controllers/v0/scrape.ts | 1 + apps/api/src/controllers/v1/crawl.ts | 2 ++ apps/api/src/controllers/v1/scrape.ts | 1 + apps/api/src/services/queue-worker.ts | 8 ++++--- apps/api/src/services/rate-limiter.ts | 24 +++++++++++++++------ apps/api/src/types.ts | 1 + 8 files changed, 36 insertions(+), 13 deletions(-) diff --git a/apps/api/src/controllers/v0/crawl.ts b/apps/api/src/controllers/v0/crawl.ts index a95c85a6..3ebee976 100644 --- a/apps/api/src/controllers/v0/crawl.ts +++ b/apps/api/src/controllers/v0/crawl.ts @@ -171,7 +171,8 @@ export async function crawlController(req: Request, res: Response) { url, mode: "single_urls", crawlerOptions: crawlerOptions, - team_id: team_id, + team_id, + plan, pageOptions: pageOptions, origin: req.body.origin ?? defaultOrigin, crawl_id: id, @@ -211,7 +212,8 @@ export async function crawlController(req: Request, res: Response) { url, mode: "single_urls", crawlerOptions: crawlerOptions, - team_id: team_id, + team_id, + plan, pageOptions: pageOptions, origin: req.body.origin ?? 
defaultOrigin, crawl_id: id, diff --git a/apps/api/src/controllers/v0/crawlPreview.ts b/apps/api/src/controllers/v0/crawlPreview.ts index f8706867..bceb1df9 100644 --- a/apps/api/src/controllers/v0/crawlPreview.ts +++ b/apps/api/src/controllers/v0/crawlPreview.ts @@ -107,7 +107,8 @@ export async function crawlPreviewController(req: Request, res: Response) { url, mode: "single_urls", crawlerOptions: crawlerOptions, - team_id: team_id, + team_id, + plan, pageOptions: pageOptions, origin: "website-preview", crawl_id: id, @@ -121,7 +122,8 @@ export async function crawlPreviewController(req: Request, res: Response) { url, mode: "single_urls", crawlerOptions: crawlerOptions, - team_id: team_id, + team_id, + plan, pageOptions: pageOptions, origin: "website-preview", crawl_id: id, diff --git a/apps/api/src/controllers/v0/scrape.ts b/apps/api/src/controllers/v0/scrape.ts index db304fed..cfb1366c 100644 --- a/apps/api/src/controllers/v0/scrape.ts +++ b/apps/api/src/controllers/v0/scrape.ts @@ -60,6 +60,7 @@ export async function scrapeHelper( mode: "single_urls", crawlerOptions, team_id, + plan, pageOptions, extractorOptions, origin: req.body.origin ?? 
defaultOrigin, diff --git a/apps/api/src/controllers/v1/crawl.ts b/apps/api/src/controllers/v1/crawl.ts index e0883fa8..4efe279a 100644 --- a/apps/api/src/controllers/v1/crawl.ts +++ b/apps/api/src/controllers/v1/crawl.ts @@ -106,6 +106,7 @@ export async function crawlController( url, mode: "single_urls", team_id: req.auth.team_id, + plan: req.auth.plan, crawlerOptions, pageOptions, origin: "api", @@ -138,6 +139,7 @@ export async function crawlController( mode: "single_urls", crawlerOptions: crawlerOptions, team_id: req.auth.team_id, + plan: req.auth.plan, pageOptions: pageOptions, origin: "api", crawl_id: id, diff --git a/apps/api/src/controllers/v1/scrape.ts b/apps/api/src/controllers/v1/scrape.ts index 41974917..75384876 100644 --- a/apps/api/src/controllers/v1/scrape.ts +++ b/apps/api/src/controllers/v1/scrape.ts @@ -44,6 +44,7 @@ export async function scrapeController( mode: "single_urls", crawlerOptions: {}, team_id: req.auth.team_id, + plan: req.auth.plan, pageOptions, extractorOptions, origin: req.body.origin, diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 7468a050..050d672d 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -34,9 +34,10 @@ import { deleteJobPriority, getJobPriority, } from "../../src/lib/job-priority"; -import { PlanType } from "../types"; +import { PlanType, RateLimiterMode } from "../types"; import { getJobs } from "../../src/controllers/v1/crawl-status"; import { configDotenv } from "dotenv"; +import { getRateLimiterPoints } from "./rate-limiter"; configDotenv(); if (process.env.ENV === "production") { @@ -131,9 +132,9 @@ const workerFun = async ( if (job) { const concurrencyLimiterKey = "concurrency-limiter:" + job.data?.team_id; - if (job.data && job.data.team_id) { + if (job.data && job.data.team_id && job.data.plan) { const concurrencyLimiterThrottledKey = "concurrency-limiter:" + job.data.team_id + ":throttled"; - const concurrencyLimit 
= 10; // TODO: determine based on price id + const concurrencyLimit = getRateLimiterPoints(RateLimiterMode.Scrape, undefined, job.data.plan); const now = Date.now(); const stalledJobTimeoutMs = 2 * 60 * 1000; const throttledJobTimeoutMs = 10 * 60 * 1000; @@ -382,6 +383,7 @@ async function processJob(job: Job, token: string) { mode: "single_urls", crawlerOptions: sc.crawlerOptions, team_id: sc.team_id, + plan: job.data.plan, pageOptions: sc.pageOptions, origin: job.data.origin, crawl_id: job.data.crawl_id, diff --git a/apps/api/src/services/rate-limiter.ts b/apps/api/src/services/rate-limiter.ts index 51a0ecfa..e0fc5646 100644 --- a/apps/api/src/services/rate-limiter.ts +++ b/apps/api/src/services/rate-limiter.ts @@ -123,14 +123,18 @@ const testSuiteTokens = ["a01ccae", "6254cf9", "0f96e673", "23befa1b", "69141c4" const manual = ["69be9e74-7624-4990-b20d-08e0acc70cf6"]; -export function getRateLimiter( +function makePlanKey(plan?: string) { + return plan ? plan.replace("-", "") : "default"; // "default" +} + +export function getRateLimiterPoints( mode: RateLimiterMode, - token: string, + token?: string, plan?: string, teamId?: string ) { - if (testSuiteTokens.some(testToken => token.includes(testToken))) { + if (token && testSuiteTokens.some(testToken => token.includes(testToken))) { return testSuiteRateLimiter; } @@ -146,9 +150,17 @@ export function getRateLimiter( if (!rateLimitConfig) return serverRateLimiter; - const planKey = plan ? 
plan.replace("-", "") : "default"; // "default" const points = - rateLimitConfig[planKey] || rateLimitConfig.default || rateLimitConfig; // 5 + rateLimitConfig[makePlanKey(plan)] || rateLimitConfig.default || rateLimitConfig; // 5 - return createRateLimiter(`${mode}-${planKey}`, points); + return points; +} + +export function getRateLimiter( + mode: RateLimiterMode, + token?: string, + plan?: string, + teamId?: string +) { + return createRateLimiter(`${mode}-${makePlanKey(plan)}`, getRateLimiterPoints(mode, token, plan, teamId)); } diff --git a/apps/api/src/types.ts b/apps/api/src/types.ts index 3795ce1e..a03176da 100644 --- a/apps/api/src/types.ts +++ b/apps/api/src/types.ts @@ -28,6 +28,7 @@ export interface WebScraperOptions { pageOptions: any; extractorOptions?: any; team_id: string; + plan: string; origin?: string; crawl_id?: string; sitemapped?: boolean; From 92dbd33e57f0a26fc8cdc8fde400802b9521a97d Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 1 Oct 2024 14:53:26 -0300 Subject: [PATCH 19/25] Update queue-worker.ts --- apps/api/src/services/queue-worker.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 050d672d..b18f372b 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -148,6 +148,11 @@ const workerFun = async ( await redisConnection.zadd(concurrencyLimiterThrottledKey, now + throttledJobTimeoutMs, job.id); await job.moveToFailed(new Error("Concurrency limit hit"), token, false); await job.remove(); + let newJobPriority = Math.round((job.opts.priority ?? 
10) * 1.01); + // max priority is 200k, limit is 2 million + if(newJobPriority > 200000) { + newJobPriority = 200000; + } await queue.add(job.name, { ...job.data, concurrencyLimitHit: true, From 8aa07afb6d7c95b2241f9afadf18c11da04a1e4c Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 1 Oct 2024 15:15:49 -0300 Subject: [PATCH 20/25] Nick: fixes --- apps/api/src/controllers/v0/scrape.ts | 2 +- apps/api/src/scraper/WebScraper/sitemap.ts | 2 +- apps/api/src/services/queue-worker.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/api/src/controllers/v0/scrape.ts b/apps/api/src/controllers/v0/scrape.ts index 987e877c..f5dbc3d1 100644 --- a/apps/api/src/controllers/v0/scrape.ts +++ b/apps/api/src/controllers/v0/scrape.ts @@ -245,7 +245,7 @@ export async function scrapeController(req: Request, res: Response) { } if (creditsToBeBilled > 0) { // billing for doc done on queue end, bill only for llm extraction - billTeam(team_id, chunk.sub_id, creditsToBeBilled).catch(error => { + billTeam(team_id, chunk?.sub_id, creditsToBeBilled).catch(error => { Logger.error(`Failed to bill team ${team_id} for ${creditsToBeBilled} credits: ${error}`); // Optionally, you could notify an admin or add to a retry queue here }); diff --git a/apps/api/src/scraper/WebScraper/sitemap.ts b/apps/api/src/scraper/WebScraper/sitemap.ts index 13dfc26e..d93403c8 100644 --- a/apps/api/src/scraper/WebScraper/sitemap.ts +++ b/apps/api/src/scraper/WebScraper/sitemap.ts @@ -23,7 +23,7 @@ export async function getLinksFromSitemap( const response = await axios.get(sitemapUrl, { timeout: axiosTimeout }); content = response.data; } else if (mode === 'fire-engine') { - const response = await scrapWithFireEngine({ url: sitemapUrl, fireEngineOptions: { engine:"tlsclient", disableJsDom: true, mobileProxy: true } }); + const response = await scrapWithFireEngine({ url: sitemapUrl, fireEngineOptions: { engine:"chrome-cdp" } }); content = response.html; } } catch (error) { diff --git 
a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index b18f372b..aaad7f52 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -159,7 +159,7 @@ const workerFun = async ( }, { ...job.opts, jobId: job.id, - priority: Math.round((job.opts.priority ?? 10) * 1.25), // exponential backoff for stuck jobs + priority: newJobPriority, // exponential backoff for stuck jobs }); await sleep(gotJobInterval); From 37299fc0359dd6064c2f972b7b5e80e80f8edd2f Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 1 Oct 2024 15:18:11 -0300 Subject: [PATCH 21/25] Update types.ts --- apps/api/src/controllers/v1/types.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/api/src/controllers/v1/types.ts b/apps/api/src/controllers/v1/types.ts index d1db3627..45db51b5 100644 --- a/apps/api/src/controllers/v1/types.ts +++ b/apps/api/src/controllers/v1/types.ts @@ -445,7 +445,7 @@ export function legacyDocumentConverter(doc: any): Document { extract: doc.llm_extraction, screenshot: doc.screenshot ?? doc.fullPageScreenshot, actions: doc.actions ?? undefined, - warning: doc.warning, + warning: doc.warning ?? 
undefined, metadata: { ...doc.metadata, pageError: undefined, From c0541cc990d523b17bdbbe1cdf1213ce3a00364e Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 1 Oct 2024 15:38:24 -0300 Subject: [PATCH 22/25] Update queue-worker.ts --- apps/api/src/services/queue-worker.ts | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index aaad7f52..0aa28cba 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -143,16 +143,21 @@ const workerFun = async ( redisConnection.zremrangebyscore(concurrencyLimiterKey, -Infinity, now); const activeJobsOfTeam = await redisConnection.zrangebyscore(concurrencyLimiterKey, now, Infinity); if (activeJobsOfTeam.length >= concurrencyLimit) { - Logger.info("Moving job " + job.id + " back the queue -- concurrency limit hit"); - // Concurrency limit hit + // Nick: removed the log because it was too spammy, tested and confirmed that the job is added back to the queue + // Logger.info("Moving job " + job.id + " back the queue -- concurrency limit hit"); + // Concurrency limit hit, throttles the job await redisConnection.zadd(concurrencyLimiterThrottledKey, now + throttledJobTimeoutMs, job.id); + // We move to failed with a specific error await job.moveToFailed(new Error("Concurrency limit hit"), token, false); + // Remove the job from the queue await job.remove(); - let newJobPriority = Math.round((job.opts.priority ?? 10) * 1.01); - // max priority is 200k, limit is 2 million + // Increment the priority of the job exponentially by 5% + let newJobPriority = Math.round((job.opts.priority ?? 
10) * 1.05); + // Max priority is 200k, limit is 2 million if(newJobPriority > 200000) { newJobPriority = 200000; } + // Add the job back to the queue with the new priority await queue.add(job.name, { ...job.data, concurrencyLimitHit: true, @@ -165,7 +170,9 @@ const workerFun = async ( await sleep(gotJobInterval); continue; } else { + // If we are not throttled, add the job back to the queue with the new priority await redisConnection.zadd(concurrencyLimiterKey, now + stalledJobTimeoutMs, job.id); + // Remove the job from the throttled list await redisConnection.zrem(concurrencyLimiterThrottledKey, job.id); } } From 18f9cd09e17590f4678def589489c92907901f51 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 1 Oct 2024 16:04:39 -0300 Subject: [PATCH 23/25] Nick: fixed more stuff --- .../src/controllers/v1/concurrency-check.ts | 25 ++++++++++++++ apps/api/src/controllers/v1/types.ts | 11 ++++++ apps/api/src/routes/v1.ts | 9 +++++ apps/api/src/services/rate-limiter.test.ts | 10 +++--- apps/api/src/services/rate-limiter.ts | 34 +++++++++---------- 5 files changed, 66 insertions(+), 23 deletions(-) create mode 100644 apps/api/src/controllers/v1/concurrency-check.ts diff --git a/apps/api/src/controllers/v1/concurrency-check.ts b/apps/api/src/controllers/v1/concurrency-check.ts new file mode 100644 index 00000000..6ed4fa55 --- /dev/null +++ b/apps/api/src/controllers/v1/concurrency-check.ts @@ -0,0 +1,25 @@ +import { authenticateUser } from "../auth"; +import { + ConcurrencyCheckParams, + ConcurrencyCheckResponse, + RequestWithAuth, +} from "./types"; +import { RateLimiterMode } from "../../types"; +import { Response } from "express"; +import { redisConnection } from "../../services/queue-service"; +// Basically just middleware and error wrapping +export async function concurrencyCheckController( + req: RequestWithAuth, + res: Response +) { + const concurrencyLimiterKey = "concurrency-limiter:" + req.params.teamId; + const now = Date.now(); + const activeJobsOfTeam = await 
redisConnection.zrangebyscore( + concurrencyLimiterKey, + now, + Infinity + ); + return res + .status(200) + .json({ success: true, concurrency: activeJobsOfTeam.length }); +} diff --git a/apps/api/src/controllers/v1/types.ts b/apps/api/src/controllers/v1/types.ts index 45db51b5..3781eb78 100644 --- a/apps/api/src/controllers/v1/types.ts +++ b/apps/api/src/controllers/v1/types.ts @@ -294,6 +294,17 @@ export type CrawlStatusParams = { jobId: string; }; +export type ConcurrencyCheckParams = { + teamId: string; +}; + +export type ConcurrencyCheckResponse = + | ErrorResponse + | { + success: true; + concurrency: number; + }; + export type CrawlStatusResponse = | ErrorResponse | { diff --git a/apps/api/src/routes/v1.ts b/apps/api/src/routes/v1.ts index 49a41ce7..b0ceceb4 100644 --- a/apps/api/src/routes/v1.ts +++ b/apps/api/src/routes/v1.ts @@ -16,6 +16,7 @@ import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; import { crawlCancelController } from "../controllers/v1/crawl-cancel"; import { Logger } from "../lib/logger"; import { scrapeStatusController } from "../controllers/v1/scrape-status"; +import { concurrencyCheckController } from "../controllers/v1/concurrency-check"; // import { crawlPreviewController } from "../../src/controllers/v1/crawlPreview"; // import { crawlJobStatusPreviewController } from "../../src/controllers/v1/status"; // import { searchController } from "../../src/controllers/v1/search"; @@ -140,11 +141,19 @@ v1Router.get( wrap(scrapeStatusController) ); +v1Router.get( + "/concurrency-check", + authMiddleware(RateLimiterMode.CrawlStatus), + wrap(concurrencyCheckController) +); + v1Router.ws( "/crawl/:jobId", crawlStatusWSController ); + + // v1Router.post("/crawlWebsitePreview", crawlPreviewController); diff --git a/apps/api/src/services/rate-limiter.test.ts b/apps/api/src/services/rate-limiter.test.ts index 3e252301..ba4a0a73 100644 --- a/apps/api/src/services/rate-limiter.test.ts +++ b/apps/api/src/services/rate-limiter.test.ts 
@@ -49,7 +49,7 @@ describe("Rate Limiter Service", () => { "nonexistent" as RateLimiterMode, "test-prefix:someToken" ); - expect(limiter).toBe(serverRateLimiter); + expect(limiter.points).toBe(serverRateLimiter.points); }); it("should return the correct rate limiter based on mode and plan", () => { @@ -210,7 +210,7 @@ describe("Rate Limiter Service", () => { "test-prefix:someToken", "starter" ); - expect(limiter2.points).toBe(3); + expect(limiter2.points).toBe(10); const limiter3 = getRateLimiter( "crawl" as RateLimiterMode, @@ -233,7 +233,7 @@ describe("Rate Limiter Service", () => { "test-prefix:someToken", "starter" ); - expect(limiter2.points).toBe(20); + expect(limiter2.points).toBe(100); const limiter3 = getRateLimiter( "scrape" as RateLimiterMode, @@ -263,14 +263,14 @@ describe("Rate Limiter Service", () => { "test-prefix:someToken", "starter" ); - expect(limiter2.points).toBe(20); + expect(limiter2.points).toBe(50); const limiter3 = getRateLimiter( "search" as RateLimiterMode, "test-prefix:someToken", "standard" ); - expect(limiter3.points).toBe(40); + expect(limiter3.points).toBe(50); }); it("should return the correct rate limiter for 'preview' mode", () => { diff --git a/apps/api/src/services/rate-limiter.ts b/apps/api/src/services/rate-limiter.ts index e0fc5646..21e05948 100644 --- a/apps/api/src/services/rate-limiter.ts +++ b/apps/api/src/services/rate-limiter.ts @@ -132,8 +132,22 @@ export function getRateLimiterPoints( token?: string, plan?: string, teamId?: string -) { +) : number { + const rateLimitConfig = RATE_LIMITS[mode]; // {default : 5} + + if (!rateLimitConfig) return RATE_LIMITS.account.default; + const points : number = + rateLimitConfig[makePlanKey(plan)] || rateLimitConfig.default; // 5 + return points; +} + +export function getRateLimiter( + mode: RateLimiterMode, + token?: string, + plan?: string, + teamId?: string + ) : RateLimiterRedis { if (token && testSuiteTokens.some(testToken => token.includes(testToken))) { return 
testSuiteRateLimiter; } @@ -145,22 +159,6 @@ export function getRateLimiterPoints( if(teamId && manual.includes(teamId)) { return manualRateLimiter; } - - const rateLimitConfig = RATE_LIMITS[mode]; // {default : 5} - - if (!rateLimitConfig) return serverRateLimiter; - - const points = - rateLimitConfig[makePlanKey(plan)] || rateLimitConfig.default || rateLimitConfig; // 5 - - return points; -} - -export function getRateLimiter( - mode: RateLimiterMode, - token?: string, - plan?: string, - teamId?: string -) { + return createRateLimiter(`${mode}-${makePlanKey(plan)}`, getRateLimiterPoints(mode, token, plan, teamId)); } From c6717fecaa0156201219b18410d0513495fa84cd Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 1 Oct 2024 16:11:12 -0300 Subject: [PATCH 24/25] Nick: got rid of job interval sleep and math.min --- apps/api/src/controllers/v1/concurrency-check.ts | 2 +- apps/api/src/services/queue-worker.ts | 10 +++------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/apps/api/src/controllers/v1/concurrency-check.ts b/apps/api/src/controllers/v1/concurrency-check.ts index 6ed4fa55..8695c6e6 100644 --- a/apps/api/src/controllers/v1/concurrency-check.ts +++ b/apps/api/src/controllers/v1/concurrency-check.ts @@ -12,7 +12,7 @@ export async function concurrencyCheckController( req: RequestWithAuth, res: Response ) { - const concurrencyLimiterKey = "concurrency-limiter:" + req.params.teamId; + const concurrencyLimiterKey = "concurrency-limiter:" + req.auth.team_id; const now = Date.now(); const activeJobsOfTeam = await redisConnection.zrangebyscore( concurrencyLimiterKey, diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 0aa28cba..532e8fee 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -151,12 +151,8 @@ const workerFun = async ( await job.moveToFailed(new Error("Concurrency limit hit"), token, false); // Remove the job from the queue await job.remove(); - // 
Increment the priority of the job exponentially by 5% - let newJobPriority = Math.round((job.opts.priority ?? 10) * 1.05); - // Max priority is 200k, limit is 2 million - if(newJobPriority > 200000) { - newJobPriority = 200000; - } + // Increment the priority of the job exponentially by 5%, Note: max bull priority is 2 million + const newJobPriority = Math.min(Math.round((job.opts.priority ?? 10) * 1.05), 20000); // Add the job back to the queue with the new priority await queue.add(job.name, { ...job.data, @@ -167,7 +163,7 @@ const workerFun = async ( priority: newJobPriority, // exponential backoff for stuck jobs }); - await sleep(gotJobInterval); + // await sleep(gotJobInterval); continue; } else { // If we are not throttled, add the job back to the queue with the new priority From ac5e1fc194bd343c63578b830dc3fa6fc8543a31 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 1 Oct 2024 16:14:43 -0300 Subject: [PATCH 25/25] Update sitemap.ts --- apps/api/src/scraper/WebScraper/sitemap.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/api/src/scraper/WebScraper/sitemap.ts b/apps/api/src/scraper/WebScraper/sitemap.ts index d93403c8..756cd765 100644 --- a/apps/api/src/scraper/WebScraper/sitemap.ts +++ b/apps/api/src/scraper/WebScraper/sitemap.ts @@ -23,7 +23,7 @@ export async function getLinksFromSitemap( const response = await axios.get(sitemapUrl, { timeout: axiosTimeout }); content = response.data; } else if (mode === 'fire-engine') { - const response = await scrapWithFireEngine({ url: sitemapUrl, fireEngineOptions: { engine:"chrome-cdp" } }); + const response = await scrapWithFireEngine({ url: sitemapUrl, fireEngineOptions: { engine:"playwright" } }); content = response.html; } } catch (error) {