// firecrawl/apps/api/src/services/queue-worker.ts
import { CustomError } from "../lib/custom-error";
import {
  getWebScraperQueue,
  getScrapeQueue,
  redisConnection,
  webScraperQueueName,
  scrapeQueueName,
} from "./queue-service";
import "dotenv/config";
import { logtail } from "./logtail";
import { startWebScraperPipeline } from "../main/runWebScraper";
import { callWebhook } from "./webhook";
import { logJob } from "./logging/log_job";
import { initSDK } from "@hyperdx/node-opentelemetry";
import { Job, Worker } from "bullmq";
import { Logger } from "../lib/logger";
import { ScrapeEvents } from "../lib/scrape-events";
import systemMonitor from "./system-monitor";
import { v4 as uuidv4 } from "uuid";
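
// Enable HyperDX OpenTelemetry instrumentation in production only.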
if (process.env.ENV === "production") {
  initSDK({
    consoleCapture: true,
    additionalInstrumentations: [],
  });
}
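
// Sleep helper and worker/job tuning knobs; each knob can be overridden
// via an environment variable (all times in milliseconds).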
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

const workerLockDuration = Number(process.env.WORKER_LOCK_DURATION) || 60000;
const workerStalledCheckInterval =
  Number(process.env.WORKER_STALLED_CHECK_INTERVAL) || 30000;
const jobLockExtendInterval =
  Number(process.env.JOB_LOCK_EXTEND_INTERVAL) || 15000;
const jobLockExtensionTime =
  Number(process.env.JOB_LOCK_EXTENSION_TIME) || 60000;
const cantAcceptConnectionInterval =
  Number(process.env.CANT_ACCEPT_CONNECTION_INTERVAL) || 2000;
const connectionMonitorInterval =
  Number(process.env.CONNECTION_MONITOR_INTERVAL) || 10;
// Note: reads CONNECTION_MONITOR_INTERVAL rather than a dedicated variable.
const gotJobInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 20;
const wsq = getWebScraperQueue();
const sq = getScrapeQueue();
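
/**
 * Processes one job while periodically extending its BullMQ lock so the
 * job is not flagged as stalled, then moves it to completed or failed.
 */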
const processJobInternal = async (token: string, job: Job) => {
  const extendLockInterval = setInterval(async () => {
    Logger.info(`🐂 Worker extending lock on job ${job.id}`);
    await job.extendLock(token, jobLockExtensionTime);
  }, jobLockExtendInterval);

  try {
    const result = await processJob(job, token);
    try {
      await job.moveToCompleted(result.docs, token, false);
    } catch (e) {
      // The job may already have been moved (e.g. cancelled); ignore.
    }
  } catch (error) {
    console.log("Job failed, error:", error);
    await job.moveToFailed(error, token, false);
  } finally {
    clearInterval(extendLockInterval);
  }
};
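
// Set on SIGINT so the polling loops below stop accepting new jobs.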
let isShuttingDown = false;

process.on("SIGINT", () => {
  console.log("Received SIGINT. Shutting down gracefully...");
  isShuttingDown = true;
});
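
/**
 * Polling loop for a single queue. The Worker is created with a null
 * processor so jobs are fetched manually; the system monitor is consulted
 * before each fetch, and fetched jobs are handed to processJobInternal
 * without awaiting their completion.
 */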
const workerFun = async (
  queueName: string,
  processJobInternal: (token: string, job: Job) => Promise<void>,
) => {
  const worker = new Worker(queueName, null, {
    connection: redisConnection,
    lockDuration: workerLockDuration, // defaults to 1 minute
    // lockRenewTime: 15 * 1000, // 15 seconds
    stalledInterval: workerStalledCheckInterval, // defaults to 30 seconds
    maxStalledCount: 10, // 10 times
  });

  worker.startStalledCheckTimer();

  const monitor = await systemMonitor;

  while (true) {
    if (isShuttingDown) {
      console.log("No longer accepting new jobs. SIGINT");
      break;
    }

    const token = uuidv4();
    const canAcceptConnection = await monitor.acceptConnection();
    if (!canAcceptConnection) {
      console.log("Can't accept connection");
      await sleep(cantAcceptConnectionInterval); // more sleep
      continue;
    }

    const job = await worker.getNextJob(token);
    if (job) {
      processJobInternal(token, job);
      await sleep(gotJobInterval);
    } else {
      await sleep(connectionMonitorInterval);
    }
  }
};
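
// Launch one polling loop per queue. Neither call is awaited, so both
// workers run concurrently in this process.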
workerFun(webScraperQueueName, processJobInternal);
workerFun(scrapeQueueName, processJobInternal);
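
/**
 * Runs the scraping pipeline for one job: reports progress, checks for a
 * user cancellation flag ("cancelled:<id>" in Redis), fires the crawl
 * webhook, and records the outcome via logJob.
 */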
async function processJob(job: Job, token: string) {
  Logger.info(`🐂 Worker taking job ${job.id}`);

  try {
    job.updateProgress({
      current: 1,
      total: 100,
      current_step: "SCRAPING",
      current_url: "",
    });
    const start = Date.now();
    const { success, message, docs } = await startWebScraperPipeline({
      job,
      token,
    });
    const end = Date.now();
    const timeTakenInSeconds = (end - start) / 1000;
    const isCancelled = await (
      await getWebScraperQueue().client
    ).exists("cancelled:" + job.id);
    if (isCancelled) {
      await job.discard();
      await job.moveToFailed(new Error("Job cancelled by user"), token);
    }
    const data = {
      success,
      result: {
        links: isCancelled
          ? []
          : docs.map((doc) => {
              return {
                content: doc,
                source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
              };
            }),
      },
      project_id: job.data.project_id,
      error: isCancelled ? "Job cancelled by user" : message /* etc... */,
      docs: isCancelled ? [] : docs,
    };

    if (job.data.mode === "crawl" && !isCancelled) {
      await callWebhook(job.data.team_id, job.id as string, data);
    }

    await logJob({
      job_id: job.id as string,
      success: success && !isCancelled,
      message: isCancelled ? "Job cancelled by user" : message,
      num_docs: isCancelled ? 0 : docs.length,
      docs: isCancelled ? [] : docs,
      time_taken: timeTakenInSeconds,
      team_id: job.data.team_id,
      mode: job.data.mode,
      url: job.data.url,
      crawlerOptions: job.data.crawlerOptions,
      pageOptions: job.data.pageOptions,
      origin: job.data.origin,
    });
    Logger.info(`🐂 Job done ${job.id}`);
    return data;
  } catch (error) {
    Logger.error(`🐂 Job errored ${job.id} - ${error}`);
    if (await getWebScraperQueue().isPaused()) {
      Logger.debug("🐂 Queue is paused, ignoring");
      return;
    }

    if (error instanceof CustomError) {
      // Here we handle the error, then save the failed job
      Logger.error(error.message); // or any other error handling
      logtail.error("Custom error while ingesting", {
        job_id: job.id,
        error: error.message,
        dataIngestionJob: error.dataIngestionJob,
      });
    }
    Logger.error(error);

    logtail.error("Overall error ingesting", {
      job_id: job.id,
      error: error.message,
    });

    const data = {
      success: false,
      docs: [],
      project_id: job.data.project_id,
      error:
        "Something went wrong... Contact help@mendable.ai or try again." /* etc... */,
    };
    if (job.data.mode === "crawl") {
      await callWebhook(job.data.team_id, job.id as string, data);
    }
    await logJob({
      job_id: job.id as string,
      success: false,
      message:
        typeof error === "string"
          ? error
          : error.message ?? "Something went wrong... Contact help@mendable.ai",
      num_docs: 0,
      docs: [],
      time_taken: 0,
      team_id: job.data.team_id,
      mode: "crawl", // note: hardcoded here, unlike the success path's job.data.mode
      url: job.data.url,
      crawlerOptions: job.data.crawlerOptions,
      pageOptions: job.data.pageOptions,
      origin: job.data.origin,
    });
    // done(null, data);
    return data;
  }
}
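
// Previous Bull-style queue wiring, left commented out in the source: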
// wsq.process(
// Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
// processJob
// );
// wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
// wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
// wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
// wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
// wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
// wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));