import { Response } from "express";
import { v4 as uuidv4 } from "uuid";
import {
  BatchScrapeRequest,
  batchScrapeRequestSchema,
  batchScrapeRequestSchemaNoURLValidation,
  url as urlSchema,
  RequestWithAuth,
  ScrapeOptions,
  BatchScrapeResponse,
} from "./types";
import {
  addCrawlJobs,
  finishCrawlKickoff,
  getCrawl,
  lockURLs,
  saveCrawl,
  StoredCrawl,
} from "../../lib/crawl-redis";
import { logCrawl } from "../../services/logging/crawl_log";
import { getJobPriority } from "../../lib/job-priority";
import { addScrapeJobs } from "../../services/queue-jobs";
import { callWebhook } from "../../services/webhook";
import { logger as _logger } from "../../lib/logger";
import { CostTracking } from "../../lib/extract/extraction-service";

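/**
 * Batch scrape controller: validates the request body, registers the batch as
 * a crawl in Redis, enqueues one scrape job per URL, optionally fires a
 * "batch_scrape.started" webhook, and responds with the batch id and its
 * status URL.
 */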
export async function batchScrapeController(
  req: RequestWithAuth<{}, BatchScrapeResponse, BatchScrapeRequest>,
  res: Response<BatchScrapeResponse>,
) {
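  // Parse the body with the appropriate Zod schema. When ignoreInvalidURLs is
  // set, per-URL validation is skipped here so invalid entries can be
  // collected below instead of failing the whole request.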
  if (req.body?.ignoreInvalidURLs === true) {
    req.body = batchScrapeRequestSchemaNoURLValidation.parse(req.body);
  } else {
    req.body = batchScrapeRequestSchema.parse(req.body);
  }

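  // Reuse the existing crawl id when appending to a batch; otherwise mint a
  // new one. Every log line for this batch is tagged with it.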
  const id = req.body.appendToId ?? uuidv4();
  const logger = _logger.child({
    crawlId: id,
    batchScrapeId: id,
    module: "api/v1",
    method: "batchScrapeController",
    teamId: req.auth.team_id,
  });

  let urls = req.body.urls;
  let invalidURLs: string[] | undefined = undefined;

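  // When invalid URLs are tolerated, validate each URL individually: parseable
  // URLs are kept for scraping, the rest are collected and reported back to
  // the caller.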
  if (req.body.ignoreInvalidURLs) {
    invalidURLs = [];

    let pendingURLs = urls;
    urls = [];
    for (const u of pendingURLs) {
      try {
        const nu = urlSchema.parse(u);
        urls.push(nu);
      } catch (_) {
        invalidURLs.push(u);
      }
    }
  }

logger.debug("Batch scrape " + id + " starting", {
|
|
urlsLength: urls.length,
|
|
appendToId: req.body.appendToId,
|
|
account: req.account,
|
|
});
|
|
|
|
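  // Appending to an existing batch skips crawl logging; only new batches are
  // logged here.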
  if (!req.body.appendToId) {
    await logCrawl(id, req.auth.team_id);
  }

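  // Without DB-backed authentication there is no credit accounting, so the
  // remaining credits are treated as unlimited.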
  let { remainingCredits } = req.account!;
  const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === "true";
  if (!useDbAuthentication) {
    remainingCredits = Infinity;
  }

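  // Reuse the stored crawl when appending; otherwise build a fresh StoredCrawl
  // for this batch. Scrape results are persisted to GCS only when
  // GCS_FIRE_ENGINE_BUCKET_NAME is configured.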
  const sc: StoredCrawl = req.body.appendToId
    ? ((await getCrawl(req.body.appendToId)) as StoredCrawl)
    : {
        crawlerOptions: null,
        scrapeOptions: req.body,
        internalOptions: {
          disableSmartWaitCache: true,
          teamId: req.auth.team_id,
          saveScrapeResultToGCS: process.env.GCS_FIRE_ENGINE_BUCKET_NAME ? true : false,
        }, // NOTE: smart wait disabled for batch scrapes to ensure contentful scrape, speed does not matter
        team_id: req.auth.team_id,
        createdAt: Date.now(),
      };

  if (!req.body.appendToId) {
    await saveCrawl(id, sc);
  }

  let jobPriority = 20;

  // If there are more than 1000 URLs, look up the team's job priority;
  // otherwise we can use the default priority of 20
  if (urls.length > 1000) {
    // set base to 21
    jobPriority = await getJobPriority({
      team_id: req.auth.team_id,
      basePriority: 21,
    });
  }
  logger.debug("Using job priority " + jobPriority, { jobPriority });

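  // Derive the per-job scrape options from the request body, dropping the
  // batch-only fields.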
  const scrapeOptions: ScrapeOptions = { ...req.body };
  delete (scrapeOptions as any).urls;
  delete (scrapeOptions as any).appendToId;

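  // Build one queue job per URL, all tied to this batch via crawl_id. Each job
  // again opts into GCS result storage when the bucket is configured.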
  const jobs = urls.map((x) => {
    return {
      data: {
        url: x,
        mode: "single_urls" as const,
        team_id: req.auth.team_id,
        crawlerOptions: null,
        scrapeOptions,
        origin: "api",
        crawl_id: id,
        sitemapped: true,
        v1: true,
        webhook: req.body.webhook,
        internalOptions: {
          saveScrapeResultToGCS: process.env.GCS_FIRE_ENGINE_BUCKET_NAME ? true : false,
        },
      },
      opts: {
        jobId: uuidv4(),
        priority: 20,
      },
    };
  });

  await finishCrawlKickoff(id);

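  // Register the batch in Redis (lock the URLs and record the job ids), then
  // enqueue the scrape jobs in BullMQ.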
logger.debug("Locking URLs...");
|
|
await lockURLs(
|
|
id,
|
|
sc,
|
|
jobs.map((x) => x.data.url),
|
|
);
|
|
logger.debug("Adding scrape jobs to Redis...");
|
|
await addCrawlJobs(
|
|
id,
|
|
jobs.map((x) => x.opts.jobId),
|
|
);
|
|
logger.debug("Adding scrape jobs to BullMQ...");
|
|
await addScrapeJobs(jobs);
|
|
|
|
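  // Notify the caller's webhook that the batch has started, if one was
  // provided.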
  if (req.body.webhook) {
    logger.debug("Calling webhook with batch_scrape.started...", {
      webhook: req.body.webhook,
    });
    await callWebhook(
      req.auth.team_id,
      id,
      null,
      req.body.webhook,
      true,
      "batch_scrape.started",
    );
  }

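  // Outside local development the status URL is always advertised over HTTPS.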
  const protocol = process.env.ENV === "local" ? req.protocol : "https";

  return res.status(200).json({
    success: true,
    id,
    url: `${protocol}://${req.get("host")}/v1/batch/scrape/${id}`,
    invalidURLs,
  });
}