diff --git a/.github/workflows/autoscale.yml b/.github/workflows/autoscale.yml new file mode 100644 index 00000000..6ab7ca74 --- /dev/null +++ b/.github/workflows/autoscale.yml @@ -0,0 +1,36 @@ +name: Simple Autoscaler +on: + schedule: + - cron: '*/1 * * * *' + +env: + BULL_AUTH_KEY: ${{ secrets.BULL_AUTH_KEY }} + FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }} + +jobs: + scale: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: superfly/flyctl-actions/setup-flyctl@master + - name: Send GET request to check queues + run: | + response=$(curl --silent --max-time 180 https://api.firecrawl.dev/admin/${{ secrets.BULL_AUTH_KEY }}/autoscaler) + http_code=$(echo "$response" | jq -r '.status_code') + + mode=$(echo "$response" | jq -r '.mode') + count=$(echo "$response" | jq -r '.count') + + echo "Mode: $mode" + echo "Count: $count" + + if [ "$mode" = "scale-descale" ]; then + flyctl scale count $count -c fly.staging.toml --process-group=worker --yes + echo "Scaled to $count machines." + else + echo "No scaling needed. 
Mode: $mode" + fi + env: + FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }} + BULL_AUTH_KEY: ${{ secrets.BULL_AUTH_KEY }} + working-directory: apps/api diff --git a/apps/api/src/controllers/admin/queue.ts b/apps/api/src/controllers/admin/queue.ts index 095e7ca7..729ea004 100644 --- a/apps/api/src/controllers/admin/queue.ts +++ b/apps/api/src/controllers/admin/queue.ts @@ -4,6 +4,7 @@ import { Job } from "bullmq"; import { Logger } from "../../lib/logger"; import { getScrapeQueue } from "../../services/queue-service"; import { checkAlerts } from "../../services/alerts"; +import { exec } from "node:child_process"; export async function cleanBefore24hCompleteJobsController( req: Request, @@ -54,34 +55,100 @@ export async function cleanBefore24hCompleteJobsController( } } - export async function checkQueuesController(req: Request, res: Response) { - try { - await checkAlerts(); - return res.status(200).send("Alerts initialized"); - } catch (error) { - Logger.debug(`Failed to initialize alerts: ${error}`); - return res.status(500).send("Failed to initialize alerts"); - } + try { + await checkAlerts(); + return res.status(200).send("Alerts initialized"); + } catch (error) { + Logger.debug(`Failed to initialize alerts: ${error}`); + return res.status(500).send("Failed to initialize alerts"); } +} - // Use this as a "health check" that way we dont destroy the server +// Use this as a "health check" that way we dont destroy the server export async function queuesController(req: Request, res: Response) { - try { - const scrapeQueue = getScrapeQueue(); + try { + const scrapeQueue = getScrapeQueue(); - const [webScraperActive] = await Promise.all([ - scrapeQueue.getActiveCount(), - ]); + const [webScraperActive] = await Promise.all([ + scrapeQueue.getActiveCount(), + ]); - const noActiveJobs = webScraperActive === 0; - // 200 if no active jobs, 503 if there are active jobs - return res.status(noActiveJobs ? 
200 : 500).json({ - webScraperActive, - noActiveJobs, - }); - } catch (error) { - Logger.error(error); - return res.status(500).json({ error: error.message }); + const noActiveJobs = webScraperActive === 0; + // 200 if no active jobs, 500 if there are active jobs + return res.status(noActiveJobs ? 200 : 500).json({ + webScraperActive, + noActiveJobs, + }); + } catch (error) { + Logger.error(error); + return res.status(500).json({ error: error.message }); + } +} + +export async function autoscalerController(req: Request, res: Response) { + try { + const maxNumberOfMachines = 100; + const minNumberOfMachines = 20; + + const scrapeQueue = getScrapeQueue(); + + const [webScraperActive, webScraperWaiting, webScraperPriority] = await Promise.all([ + scrapeQueue.getActiveCount(), + scrapeQueue.getWaitingCount(), + scrapeQueue.getPrioritizedCount(), + ]); + + let waitingAndPriorityCount = webScraperWaiting + webScraperPriority; + + // get number of machines active + const request = await fetch('https://api.machines.dev/v1/apps/firecrawl-scraper-js/machines', + { + headers: { + 'Authorization': `Bearer ${process.env.FLY_API_TOKEN}` + } + } + ) + const machines = await request.json(); + const activeMachines = machines.filter(machine => machine.state === 'started' || machine.state === "starting").length; + + let targetMachineCount = activeMachines; + + const baseScaleUp = 10; + const baseScaleDown = 5; + + // Scale up logic + if (webScraperActive > 9000 || waitingAndPriorityCount > 2000) { + targetMachineCount = Math.min(maxNumberOfMachines, activeMachines + (baseScaleUp * 3)); + } else if (webScraperActive > 5000 || waitingAndPriorityCount > 1000) { + targetMachineCount = Math.min(maxNumberOfMachines, activeMachines + (baseScaleUp * 2)); + } else if (webScraperActive > 1000 || waitingAndPriorityCount > 500) { + targetMachineCount = Math.min(maxNumberOfMachines, activeMachines + baseScaleUp); } - } \ No newline at end of file + + // Scale down logic + if (webScraperActive < 
100 && waitingAndPriorityCount < 50) { + targetMachineCount = Math.max(minNumberOfMachines, activeMachines - (baseScaleDown * 3)); + } else if (webScraperActive < 500 && waitingAndPriorityCount < 200) { + targetMachineCount = Math.max(minNumberOfMachines, activeMachines - (baseScaleDown * 2)); + } else if (webScraperActive < 1000 && waitingAndPriorityCount < 500) { + targetMachineCount = Math.max(minNumberOfMachines, activeMachines - baseScaleDown); + } + + if (targetMachineCount !== activeMachines) { + Logger.info(`🐂 Scaling from ${activeMachines} to ${targetMachineCount} - ${webScraperActive} active, ${webScraperWaiting} waiting`); + return res.status(200).json({ + mode: "scale-descale", + count: targetMachineCount, + }); + } + + return res.status(200).json({ + mode: "normal", + count: activeMachines, + }); + } catch (error) { + Logger.error(error); + return res.status(500).send("Failed to initialize autoscaler"); + } +} diff --git a/apps/api/src/routes/admin.ts b/apps/api/src/routes/admin.ts index 77d1bf46..d32808ce 100644 --- a/apps/api/src/routes/admin.ts +++ b/apps/api/src/routes/admin.ts @@ -1,6 +1,7 @@ import express from "express"; import { redisHealthController } from "../controllers/admin/redis-health"; import { + autoscalerController, checkQueuesController, cleanBefore24hCompleteJobsController, queuesController, @@ -27,3 +28,8 @@ adminRouter.get( `/admin/${process.env.BULL_AUTH_KEY}/queues`, queuesController ); + +adminRouter.get( + `/admin/${process.env.BULL_AUTH_KEY}/autoscaler`, + autoscalerController +); diff --git a/apps/api/src/scraper/WebScraper/single_url.ts b/apps/api/src/scraper/WebScraper/single_url.ts index 1f2a62de..6998a665 100644 --- a/apps/api/src/scraper/WebScraper/single_url.ts +++ b/apps/api/src/scraper/WebScraper/single_url.ts @@ -24,8 +24,8 @@ import { clientSideError } from "../../strings"; dotenv.config(); export const baseScrapers = [ - "fire-engine", "fire-engine;chrome-cdp", + "fire-engine", "scrapingBee", 
process.env.USE_DB_AUTHENTICATION ? undefined : "playwright", "scrapingBeeLoad", @@ -85,8 +85,8 @@ function getScrapingFallbackOrder( }); let defaultOrder = [ - !process.env.USE_DB_AUTHENTICATION ? undefined : "fire-engine", !process.env.USE_DB_AUTHENTICATION ? undefined : "fire-engine;chrome-cdp", + !process.env.USE_DB_AUTHENTICATION ? undefined : "fire-engine", "scrapingBee", process.env.USE_DB_AUTHENTICATION ? undefined : "playwright", "scrapingBeeLoad",