firecrawl/apps/api/src/index.ts

283 lines
8.5 KiB
TypeScript
Raw Normal View History

2024-04-15 17:01:47 -04:00
import express from "express";
import bodyParser from "body-parser";
import cors from "cors";
import "dotenv/config";
import { getWebScraperQueue } from "./services/queue-service";
2024-04-20 19:04:27 -07:00
import { redisClient } from "./services/rate-limiter";
2024-04-20 16:38:05 -07:00
import { v0Router } from "./routes/v0";
2024-07-11 23:20:51 +02:00
import { initSDK } from "@hyperdx/node-opentelemetry";
2024-06-12 17:53:04 -07:00
import cluster from "cluster";
import os from "os";
2024-06-25 17:49:29 -03:00
import { Job } from "bull";
2024-05-20 13:36:34 -07:00
2024-04-15 17:01:47 -04:00
const { createBullBoard } = require("@bull-board/api");
const { BullAdapter } = require("@bull-board/api/bullAdapter");
const { ExpressAdapter } = require("@bull-board/express");
2024-06-12 18:07:05 -07:00
const numCPUs = process.env.ENV === "local" ? 2 : os.cpus().length;
2024-06-12 17:53:04 -07:00
console.log(`Number of CPUs: ${numCPUs} available`);
2024-04-15 17:01:47 -04:00
2024-06-12 17:53:04 -07:00
if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);
2024-04-15 17:01:47 -04:00
2024-06-12 17:53:04 -07:00
// Fork workers.
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
2024-04-15 17:01:47 -04:00
2024-06-12 17:53:04 -07:00
cluster.on("exit", (worker, code, signal) => {
2024-07-09 14:29:32 +02:00
if (code !== null) {
console.log(`Worker ${worker.process.pid} exited`);
console.log("Starting a new worker");
cluster.fork();
}
2024-06-12 17:53:04 -07:00
});
} else {
const app = express();
2024-04-15 17:01:47 -04:00
2024-06-12 17:53:04 -07:00
global.isProduction = process.env.IS_PRODUCTION === "true";
2024-04-15 17:01:47 -04:00
2024-06-12 17:53:04 -07:00
app.use(bodyParser.urlencoded({ extended: true }));
app.use(bodyParser.json({ limit: "10mb" }));
2024-04-15 17:01:47 -04:00
2024-06-12 17:53:04 -07:00
app.use(cors()); // Add this line to enable CORS
2024-04-15 17:01:47 -04:00
2024-06-12 17:53:04 -07:00
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`);
2024-04-15 17:01:47 -04:00
2024-06-12 17:53:04 -07:00
const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
queues: [new BullAdapter(getWebScraperQueue())],
serverAdapter: serverAdapter,
});
2024-04-15 17:01:47 -04:00
2024-06-12 17:53:04 -07:00
app.use(
`/admin/${process.env.BULL_AUTH_KEY}/queues`,
serverAdapter.getRouter()
);
2024-04-15 17:01:47 -04:00
2024-06-12 17:53:04 -07:00
app.get("/", (req, res) => {
res.send("SCRAPERS-JS: Hello, world! Fly.io");
});
2024-04-15 17:01:47 -04:00
2024-06-12 17:53:04 -07:00
//write a simple test function
app.get("/test", async (req, res) => {
res.send("Hello, world!");
});
2024-05-20 13:36:34 -07:00
2024-06-12 17:53:04 -07:00
// register router
app.use(v0Router);
2024-04-21 11:27:31 -07:00
2024-06-12 17:53:04 -07:00
const DEFAULT_PORT = process.env.PORT ?? 3002;
const HOST = process.env.HOST ?? "localhost";
redisClient.connect();
2024-04-15 17:01:47 -04:00
2024-06-12 17:53:04 -07:00
// HyperDX OpenTelemetry
2024-07-11 23:20:51 +02:00
if (process.env.ENV === "production") {
initSDK({ consoleCapture: true, additionalInstrumentations: [] });
}
2024-04-15 17:01:47 -04:00
2024-06-12 17:53:04 -07:00
function startServer(port = DEFAULT_PORT) {
const server = app.listen(Number(port), HOST, () => {
console.log(`Worker ${process.pid} listening on port ${port}`);
console.log(
`For the UI, open http://${HOST}:${port}/admin/${process.env.BULL_AUTH_KEY}/queues`
);
console.log("");
console.log("1. Make sure Redis is running on port 6379 by default");
console.log(
"2. If you want to run nango, make sure you do port forwarding in 3002 using ngrok http 3002 "
);
2024-04-15 17:01:47 -04:00
});
2024-06-12 17:53:04 -07:00
return server;
2024-04-15 17:01:47 -04:00
}
2024-06-12 17:53:04 -07:00
if (require.main === module) {
startServer();
}
2024-06-12 17:53:04 -07:00
// Use this as a "health check" that way we dont destroy the server
app.get(`/admin/${process.env.BULL_AUTH_KEY}/queues`, async (req, res) => {
try {
const webScraperQueue = getWebScraperQueue();
2024-07-11 23:56:36 +02:00
2024-06-12 17:53:04 -07:00
const [webScraperActive] = await Promise.all([
webScraperQueue.getActiveCount(),
]);
const noActiveJobs = webScraperActive === 0;
// 200 if no active jobs, 503 if there are active jobs
return res.status(noActiveJobs ? 200 : 500).json({
webScraperActive,
noActiveJobs,
});
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
});
2024-07-11 23:14:15 +02:00
app.post(`/admin/${process.env.BULL_AUTH_KEY}/shutdown`, async (req, res) => {
try {
const wsq = getWebScraperQueue();
console.log("Gracefully shutting down...");
await wsq.pause(false, true);
const jobs = await wsq.getActive();
if (jobs.length > 0) {
console.log("Removing", jobs.length, "jobs...");
await Promise.all(jobs.map(async x => {
await wsq.client.del(await x.lockKey());
await x.takeLock();
await x.moveToFailed({ message: "interrupted" });
await x.remove();
}));
console.log("Re-adding", jobs.length, "jobs...");
await wsq.addBulk(jobs.map(x => ({
data: x.data,
opts: {
jobId: x.id,
},
})));
console.log("Done!");
res.json({ ok: true });
}
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
});
2024-07-11 23:56:36 +02:00
app.post(`/admin/${process.env.BULL_AUTH_KEY}/unpause`, async (req, res) => {
2024-07-12 00:05:35 +02:00
await getWebScraperQueue().resume(false);
2024-07-11 23:56:36 +02:00
res.json({ ok: true });
});
2024-06-12 17:53:04 -07:00
app.get(`/serverHealthCheck`, async (req, res) => {
try {
const webScraperQueue = getWebScraperQueue();
2024-06-12 17:53:04 -07:00
const [waitingJobs] = await Promise.all([
2024-04-23 16:07:22 -03:00
webScraperQueue.getWaitingCount(),
]);
2024-06-12 17:53:04 -07:00
const noWaitingJobs = waitingJobs === 0;
// 200 if no active jobs, 503 if there are active jobs
return res.status(noWaitingJobs ? 200 : 500).json({
waitingJobs,
});
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
});
2024-06-12 17:53:04 -07:00
app.get("/serverHealthCheck/notify", async (req, res) => {
if (process.env.SLACK_WEBHOOK_URL) {
const treshold = 1; // The treshold value for the active jobs
const timeout = 60000; // 1 minute // The timeout value for the check in milliseconds
const getWaitingJobsCount = async () => {
const webScraperQueue = getWebScraperQueue();
const [waitingJobsCount] = await Promise.all([
webScraperQueue.getWaitingCount(),
]);
return waitingJobsCount;
};
res.status(200).json({ message: "Check initiated" });
const checkWaitingJobs = async () => {
try {
let waitingJobsCount = await getWaitingJobsCount();
if (waitingJobsCount >= treshold) {
setTimeout(async () => {
// Re-check the waiting jobs count after the timeout
waitingJobsCount = await getWaitingJobsCount();
if (waitingJobsCount >= treshold) {
const slackWebhookUrl = process.env.SLACK_WEBHOOK_URL;
const message = {
text: `⚠️ Warning: The number of active jobs (${waitingJobsCount}) has exceeded the threshold (${treshold}) for more than ${
timeout / 60000
} minute(s).`,
};
const response = await fetch(slackWebhookUrl, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(message),
});
if (!response.ok) {
console.error("Failed to send Slack notification");
}
}
2024-06-12 17:53:04 -07:00
}, timeout);
}
} catch (error) {
console.error(error);
}
2024-06-12 17:53:04 -07:00
};
2024-06-12 17:53:04 -07:00
checkWaitingJobs();
}
});
app.get(
`/admin/${process.env.BULL_AUTH_KEY}/clean-before-24h-complete-jobs`,
async (req, res) => {
try {
2024-06-12 17:53:04 -07:00
const webScraperQueue = getWebScraperQueue();
2024-06-25 17:49:29 -03:00
const batchSize = 10;
const numberOfBatches = 9; // Adjust based on your needs
const completedJobsPromises: Promise<Job[]>[] = [];
for (let i = 0; i < numberOfBatches; i++) {
completedJobsPromises.push(webScraperQueue.getJobs(
["completed"],
i * batchSize,
i * batchSize + batchSize,
true
));
}
const completedJobs: Job[] = (await Promise.all(completedJobsPromises)).flat();
2024-06-12 17:53:04 -07:00
const before24hJobs = completedJobs.filter(
(job) => job.finishedOn < Date.now() - 24 * 60 * 60 * 1000
2024-06-25 17:49:29 -03:00
) || [];
2024-06-12 17:53:04 -07:00
let count = 0;
2024-06-25 17:49:29 -03:00
if (!before24hJobs) {
return res.status(200).send(`No jobs to remove.`);
}
for (const job of before24hJobs) {
2024-06-12 17:53:04 -07:00
try {
2024-06-25 17:49:29 -03:00
await job.remove()
2024-06-12 17:53:04 -07:00
count++;
} catch (jobError) {
2024-06-25 17:49:29 -03:00
console.error(`Failed to remove job with ID ${job.id}:`, jobError);
2024-06-12 17:53:04 -07:00
}
}
2024-06-25 17:49:29 -03:00
return res.status(200).send(`Removed ${count} completed jobs.`);
2024-06-12 17:53:04 -07:00
} catch (error) {
console.error("Failed to clean last 24h complete jobs:", error);
2024-06-25 17:49:29 -03:00
return res.status(500).send("Failed to clean jobs");
}
}
2024-06-12 17:53:04 -07:00
);
2024-05-30 14:31:36 -07:00
2024-06-12 17:53:04 -07:00
app.get("/is-production", (req, res) => {
res.send({ isProduction: global.isProduction });
});
2024-05-30 14:31:36 -07:00
2024-06-12 17:53:04 -07:00
console.log(`Worker ${process.pid} started`);
}