import express from "express";
import bodyParser from "body-parser";
import cors from "cors";
import "dotenv/config";
import { getWebScraperQueue } from "./services/queue-service";
import { redisClient } from "./services/rate-limiter";
import { v0Router } from "./routes/v0";
import { initSDK } from "@hyperdx/node-opentelemetry";
import cluster from "cluster";
import os from "os";
import { Job } from "bull";
import { supabase_service } from "./services/supabase";
import { logJob } from "./services/logging/log_job";

const { createBullBoard } = require("@bull-board/api");
const { BullAdapter } = require("@bull-board/api/bullAdapter");
const { ExpressAdapter } = require("@bull-board/express");

const numCPUs = process.env.ENV === "local" ? 2 : os.cpus().length;
console.log(`Number of CPUs: ${numCPUs} available`);

if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`);
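
  // On startup, re-enqueue any jobs that were flagged for retry in the
  // database, then delete those rows so they are not re-enqueued again.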
  (async () => {
    // Env vars are strings, so compare explicitly; a plain truthiness check
    // would also pass for values like "false".
    if (process.env.USE_DB_AUTHENTICATION === "true") {
      const wsq = getWebScraperQueue();
      const { error, data } = await supabase_service
        .from("firecrawl_jobs")
        .select()
        .eq("retry", true);

      if (error) throw new Error(error.message);

      await wsq.addBulk(data.map(x => ({
        data: {
          url: x.url,
          mode: x.mode,
          crawlerOptions: x.crawler_options,
          team_id: x.team_id,
          pageOptions: x.page_options,
          origin: x.origin,
        },
        opts: {
          jobId: x.job_id,
        },
      })));

      if (data.length > 0) {
        await supabase_service
          .from("firecrawl_jobs")
          .delete()
          .in("id", data.map(x => x.id));
      }
    }
  })();

  // Fork one worker per CPU.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on("exit", (worker, code, signal) => {
    // A null exit code means the worker was killed by a signal (e.g. during
    // the graceful shutdown below), so only restart workers that actually
    // exited on their own.
    if (code !== null) {
      console.log(`Worker ${worker.process.pid} exited`);
      console.log("Starting a new worker");
      cluster.fork();
    }
  });
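
  // Graceful shutdown: kill all workers, mark any in-flight jobs as
  // interrupted so they can be retried on the next startup, then exit.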
  const onExit = async () => {
    console.log("Shutting down gracefully...");

    if (cluster.workers) {
      for (const worker of Object.keys(cluster.workers || {})) {
        cluster.workers[worker].process.kill();
      }
    }

    if (process.env.USE_DB_AUTHENTICATION === "true") {
      const wsq = getWebScraperQueue();
      const activeJobCount = await wsq.getActiveCount();
      console.log("Updating", activeJobCount, "in-progress jobs");

      // Page through the active jobs ten at a time; each page has to advance
      // by the page size, not by one.
      const activeJobs = (
        await Promise.all(
          new Array(Math.ceil(activeJobCount / 10)).fill(0).map((_, i) => {
            return wsq.getActive(i * 10, i * 10 + 10);
          })
        )
      ).flat(1);

      for (const job of activeJobs) {
        console.log(job.id);
        try {
          // Record the interruption so the job can be retried on the next
          // startup.
          await logJob({
            job_id: job.id as string,
            success: false,
            message: "Interrupted, retrying",
            num_docs: 0,
            docs: [],
            time_taken: 0,
            team_id: job.data.team_id,
            mode: "crawl",
            url: job.data.url,
            crawlerOptions: job.data.crawlerOptions,
            pageOptions: job.data.pageOptions,
            origin: job.data.origin,
            retry: true,
          });

          // Release the worker's lock, re-take it here, then fail and remove
          // the job so it doesn't linger in the active list.
          await wsq.client.del(job.lockKey());
          await job.takeLock();
          await job.moveToFailed({ message: "interrupted" });
          await job.remove();
        } catch (error) {
          console.error("Failed to update job status:", error);
        }
      }
    }

    process.exit();
  };
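
  // Trigger the graceful shutdown on Ctrl-C (SIGINT) and on
  // platform-initiated termination (SIGTERM).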
  process.on("SIGINT", onExit);
  process.on("SIGTERM", onExit);
} else {
  const app = express();

  global.isProduction = process.env.IS_PRODUCTION === "true";

  app.use(bodyParser.urlencoded({ extended: true }));
  app.use(bodyParser.json({ limit: "10mb" }));

  app.use(cors()); // Enable CORS for all routes
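
  // Bull Board: a web dashboard over the scraper queue, mounted behind the
  // BULL_AUTH_KEY admin path.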
  const serverAdapter = new ExpressAdapter();
  serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`);

  createBullBoard({
    queues: [new BullAdapter(getWebScraperQueue())],
    serverAdapter: serverAdapter,
  });

  app.use(
    `/admin/${process.env.BULL_AUTH_KEY}/queues`,
    serverAdapter.getRouter()
  );

  app.get("/", (req, res) => {
    res.send("SCRAPERS-JS: Hello, world! Fly.io");
  });

  // Simple liveness check.
  app.get("/test", async (req, res) => {
    res.send("Hello, world!");
  });

  // Register the v0 API routes.
  app.use(v0Router);

  const DEFAULT_PORT = process.env.PORT ?? 3002;
  const HOST = process.env.HOST ?? "localhost";
  redisClient.connect();

  // HyperDX OpenTelemetry instrumentation (production only).
  if (process.env.ENV === "production") {
    initSDK({ consoleCapture: true, additionalInstrumentations: [] });
  }

  function startServer(port = DEFAULT_PORT) {
    const server = app.listen(Number(port), HOST, () => {
      console.log(`Worker ${process.pid} listening on port ${port}`);
      console.log(
        `For the UI, open http://${HOST}:${port}/admin/${process.env.BULL_AUTH_KEY}/queues`
      );
      console.log("");
      console.log("1. Make sure Redis is running (on port 6379 by default)");
      console.log(
        "2. If you want to run Nango, make sure to forward port 3002 using: ngrok http 3002"
      );
    });

    return server;
  }
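
  // Every forked worker runs this module directly, so each one starts its
  // own listener; Node's cluster module shares the port between them.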
  if (require.main === module) {
    startServer();
  }

  // Used as a "health check" so we don't tear down the server while jobs are
  // still active.
  app.get(`/admin/${process.env.BULL_AUTH_KEY}/queues`, async (req, res) => {
    try {
      const webScraperQueue = getWebScraperQueue();
      const [webScraperActive] = await Promise.all([
        webScraperQueue.getActiveCount(),
      ]);

      const noActiveJobs = webScraperActive === 0;
      // 200 if no active jobs, 500 if jobs are still active
      return res.status(noActiveJobs ? 200 : 500).json({
        webScraperActive,
        noActiveJobs,
      });
    } catch (error) {
      console.error(error);
      return res.status(500).json({ error: error.message });
    }
  });
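
  // Reports the queue backlog: 200 when no jobs are waiting, 500 otherwise.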
  app.get(`/serverHealthCheck`, async (req, res) => {
    try {
      const webScraperQueue = getWebScraperQueue();
      const [waitingJobs] = await Promise.all([
        webScraperQueue.getWaitingCount(),
      ]);

      const noWaitingJobs = waitingJobs === 0;
      // 200 if no waiting jobs, 500 if jobs are waiting
      return res.status(noWaitingJobs ? 200 : 500).json({
        waitingJobs,
      });
    } catch (error) {
      console.error(error);
      return res.status(500).json({ error: error.message });
    }
  });
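
  // Fire-and-forget monitor: responds immediately, then re-checks the
  // waiting-job count after a minute and posts a warning to Slack if it is
  // still at or above the threshold.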
  app.get("/serverHealthCheck/notify", async (req, res) => {
    if (process.env.SLACK_WEBHOOK_URL) {
      const threshold = 1; // Alert when at least this many jobs are waiting
      const timeout = 60000; // Re-check delay in milliseconds (1 minute)

      const getWaitingJobsCount = async () => {
        const webScraperQueue = getWebScraperQueue();
        const [waitingJobsCount] = await Promise.all([
          webScraperQueue.getWaitingCount(),
        ]);

        return waitingJobsCount;
      };

      res.status(200).json({ message: "Check initiated" });

      const checkWaitingJobs = async () => {
        try {
          let waitingJobsCount = await getWaitingJobsCount();
          if (waitingJobsCount >= threshold) {
            setTimeout(async () => {
              // Re-check the waiting jobs count after the timeout
              waitingJobsCount = await getWaitingJobsCount();
              if (waitingJobsCount >= threshold) {
                const slackWebhookUrl = process.env.SLACK_WEBHOOK_URL;
                const message = {
                  text: `⚠️ Warning: The number of waiting jobs (${waitingJobsCount}) has exceeded the threshold (${threshold}) for more than ${
                    timeout / 60000
                  } minute(s).`,
                };

                const response = await fetch(slackWebhookUrl, {
                  method: "POST",
                  headers: {
                    "Content-Type": "application/json",
                  },
                  body: JSON.stringify(message),
                });

                if (!response.ok) {
                  console.error("Failed to send Slack notification");
                }
              }
            }, timeout);
          }
        } catch (error) {
          console.error(error);
        }
      };

      checkWaitingJobs();
    } else {
      // Respond even when no webhook is configured so the request doesn't hang.
      res.status(200).json({ message: "Slack webhook URL is not configured" });
    }
  });
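
  // Admin endpoint: removes completed jobs that finished more than 24 hours
  // ago, in batches.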
  app.get(
    `/admin/${process.env.BULL_AUTH_KEY}/clean-before-24h-complete-jobs`,
    async (req, res) => {
      try {
        const webScraperQueue = getWebScraperQueue();
        const batchSize = 10;
        const numberOfBatches = 9; // Adjust based on your needs
        const completedJobsPromises: Promise<Job[]>[] = [];
        for (let i = 0; i < numberOfBatches; i++) {
          completedJobsPromises.push(webScraperQueue.getJobs(
            ["completed"],
            i * batchSize,
            i * batchSize + batchSize,
            true
          ));
        }
        const completedJobs: Job[] = (await Promise.all(completedJobsPromises)).flat();
        const before24hJobs = completedJobs.filter(
          (job) =>
            job.finishedOn !== undefined &&
            job.finishedOn < Date.now() - 24 * 60 * 60 * 1000
        );

        let count = 0;

        if (before24hJobs.length === 0) {
          return res.status(200).send(`No jobs to remove.`);
        }

        for (const job of before24hJobs) {
          try {
            await job.remove();
            count++;
          } catch (jobError) {
            console.error(`Failed to remove job with ID ${job.id}:`, jobError);
          }
        }
        return res.status(200).send(`Removed ${count} completed jobs.`);
      } catch (error) {
        console.error("Failed to clean last 24h complete jobs:", error);
        return res.status(500).send("Failed to clean jobs");
      }
    }
  );

  app.get("/is-production", (req, res) => {
    res.send({ isProduction: global.isProduction });
  });

  console.log(`Worker ${process.pid} started`);
}