Mirror of https://github.com/mendableai/firecrawl.git (synced 2025-06-27 00:41:33 +00:00)

feat(crawl-status): refactor to work after a redis flush (#1664)

parent cd2e0f868c
commit ebc1de9d60
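This refactor lets crawlStatusController answer even after the crawl state has been flushed out of Redis: when the stored crawl object (sc) is still available, status, progress and credits are derived from the Redis crawl jobs and queue state as before; when it is gone but USE_DB_AUTHENTICATION is enabled, the controller falls back to the firecrawl_jobs table in Supabase to reconstruct the status, the ordered list of done jobs and the counts; otherwise it returns 404.

For orientation, a minimal client-side sketch of paging through this endpoint follows. The base URL and route are assumptions (not part of this diff); the skip/limit query parameters and the response fields mirror what the handler reads and returns:

    // Hypothetical client sketch; the route and base URL are assumed.
    type CrawlStatus = {
      success: true;
      status: "scraping" | "completed" | "cancelled" | "failed";
      completed: number;
      total: number;
      creditsUsed: number;
      expiresAt: string;
      next?: string;
      data?: unknown[]; // page of scraped documents (field name assumed)
    };

    async function fetchCrawlStatusPage(
      jobId: string,
      apiKey: string,
      skip = 0,
      limit = 100,
    ): Promise<CrawlStatus> {
      const url = new URL(`https://api.firecrawl.dev/v1/crawl/${jobId}`); // assumed route
      url.searchParams.set("skip", String(skip));
      url.searchParams.set("limit", String(limit));
      const res = await fetch(url, {
        headers: { Authorization: `Bearer ${apiKey}` },
      });
      if (!res.ok) throw new Error(`crawl status request failed: ${res.status}`);
      return (await res.json()) as CrawlStatus;
    }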
@@ -135,69 +135,173 @@ export async function crawlStatusController(
  res: Response<CrawlStatusResponse>,
  isBatch = false,
) {
  const start =
    typeof req.query.skip === "string" ? parseInt(req.query.skip, 10) : 0;
  const end =
    typeof req.query.limit === "string"
      ? start + parseInt(req.query.limit, 10) - 1
      : undefined;
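  // Worked example of the window above: ?skip=20&limit=10 gives start = 20 and
  // end = 20 + 10 - 1 = 29 (an inclusive index range); with no limit, end stays
  // undefined and everything from start onwards is considered.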
  const sc = await getCrawl(req.params.jobId);
  if (!sc) {

  let status: Exclude<CrawlStatusResponse, ErrorResponse>["status"];
  let doneJobsLength: number;
  let doneJobsOrder: string[];
  let totalCount: number;
  let creditsUsed: number;

  if (sc) {
    if (sc.team_id !== req.auth.team_id) {
      return res.status(403).json({ success: false, error: "Forbidden" });
    }

    let jobIDs = await getCrawlJobs(req.params.jobId);
    let jobStatuses = await Promise.all(
      jobIDs.map(
        async (x) => [x, await getScrapeQueue().getJobState(x)] as const,
      ),
    );

    const teamThrottledJobsSet = await getConcurrencyLimitedJobs(req.auth.team_id);
    const crawlThrottledJobsSet = sc.crawlerOptions?.delay ? await getCrawlConcurrencyLimitedJobs(req.params.jobId) : new Set();
    const throttledJobsSet = new Set([...teamThrottledJobsSet, ...crawlThrottledJobsSet]);

    const validJobStatuses: [string, JobState | "unknown"][] = [];
    const validJobIDs: string[] = [];

    for (const [id, status] of jobStatuses) {
      if (throttledJobsSet.has(id)) {
        validJobStatuses.push([id, "prioritized"]);
        validJobIDs.push(id);
      } else if (
        status !== "failed" &&
        status !== "unknown"
      ) {
        validJobStatuses.push([id, status]);
        validJobIDs.push(id);
      }
    }

    status =
      sc.cancelled
        ? "cancelled"
        : validJobStatuses.every((x) => x[1] === "completed") &&
            (sc.crawlerOptions
              ? await isCrawlKickoffFinished(req.params.jobId)
              : true)
          ? "completed"
          : "scraping";

    // Use validJobIDs instead of jobIDs for further processing
    jobIDs = validJobIDs;
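    // Resolution order of the ternary above: an explicit cancel wins; otherwise
    // the crawl counts as "completed" only when every remaining job is completed
    // and, when crawlerOptions is set, the kickoff job that enqueues URLs has
    // also finished; anything else is still "scraping".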
    doneJobsLength = await getDoneJobsOrderedLength(req.params.jobId);
    doneJobsOrder = await getDoneJobsOrdered(
      req.params.jobId,
      start,
      end ?? -1,
    );

    totalCount = jobIDs.length;

    if (totalCount === 0 && process.env.USE_DB_AUTHENTICATION === "true") {
      const x = await supabase_rr_service
        .from('firecrawl_jobs')
        .select('*', { count: 'exact', head: true })
        .eq("crawl_id", req.params.jobId)
        .eq("success", true)

      totalCount = x.count ?? 0;
    }

    creditsUsed = totalCount * (
      sc.scrapeOptions?.extract
        ? 5
        : 1
    )
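    // Example of the credit math above: 120 counted jobs with extract enabled
    // bill 120 * 5 = 600 credits; without extract, 120 * 1 = 120.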
  } else if (process.env.USE_DB_AUTHENTICATION === "true") {
    const scrapeJobCount = await supabase_rr_service
      .from("firecrawl_jobs")
      .select("*", { count: "exact", head: true })
      .eq("crawl_id", req.params.jobId)
      .eq("team_id", req.auth.team_id)
      .eq("success", true)
      .throwOnError();

    const crawlJobQuery = await supabase_rr_service
      .from("firecrawl_jobs")
      .select("*")
      .eq("job_id", req.params.jobId)
      .limit(1)
      .throwOnError();

    if (!crawlJobQuery.data || crawlJobQuery.data.length === 0) {
      if (scrapeJobCount.count === 0) {
        return res.status(404).json({ success: false, error: "Job not found" });
      } else {
        status = "completed"; // fake completed to cut the losses
      }
    } else {
      status = crawlJobQuery.data[0].success ? "completed" : "failed";
    }

    const crawlJob = crawlJobQuery.data?.[0];

    if (crawlJob && crawlJob.team_id !== req.auth.team_id) {
      return res.status(403).json({ success: false, error: "Forbidden" });
    }

    if (
      crawlJob
      && new Date().valueOf() - new Date(crawlJob.date_added).valueOf() > 24 * 60 * 60 * 1000
    ) {
      return res.status(404).json({ success: false, error: "Job expired" });
    }
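    // The expiry cutoff above is 24 * 60 * 60 * 1000 = 86,400,000 ms, i.e. crawls
    // older than one day are reported as expired rather than reconstructed.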
    doneJobsLength = scrapeJobCount.count!;
    doneJobsOrder = [];

    const step = 1000;
    let i = 0;
    while (true) {
      const rangeStart = start + (i * step);
      let rangeEnd = start + ((i + 1) * step);
      if (end !== undefined) {
        rangeEnd = Math.min(end, rangeEnd);
      }

      const currentJobs = await supabase_rr_service
        .from("firecrawl_jobs")
        .select("job_id")
        .eq("crawl_id", req.params.jobId)
        .eq("team_id", req.acuc.team_id)
        .order("date_added", { ascending: true })
        .range(rangeStart, rangeEnd)
        .throwOnError();

      const rangeLength = rangeEnd - rangeStart;

      const data = currentJobs.data ?? [];

      doneJobsOrder.push(...data.map(x => x.job_id));

      if (data.length < rangeLength) {
        break;
      }

      if (rangeEnd === end) {
        break;
      }

      i++
    }
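    // Worked example of the windowing above, assuming supabase-js semantics
    // where .range(from, to) is inclusive of both ends: with start = 0 and
    // end = 2499, the loop issues .range(0, 1000), .range(1000, 2000) and
    // .range(2000, 2499), stopping either on a short page or when rangeEnd
    // reaches end. Because both bounds are inclusive, consecutive windows share
    // the boundary row, so the job_id at each seam may be pushed twice.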
    totalCount = scrapeJobCount.count ?? 0;
    creditsUsed = crawlJob?.credits_billed ?? totalCount;
  } else {
    return res.status(404).json({ success: false, error: "Job not found" });
  }

  if (sc.team_id !== req.auth.team_id) {
    return res.status(403).json({ success: false, error: "Forbidden" });
  }

  const start =
    typeof req.query.skip === "string" ? parseInt(req.query.skip, 10) : 0;
  const end =
    typeof req.query.limit === "string"
      ? start + parseInt(req.query.limit, 10) - 1
      : undefined;

  let jobIDs = await getCrawlJobs(req.params.jobId);
  let jobStatuses = await Promise.all(
    jobIDs.map(
      async (x) => [x, await getScrapeQueue().getJobState(x)] as const,
    ),
  );

  const teamThrottledJobsSet = await getConcurrencyLimitedJobs(req.auth.team_id);
  const crawlThrottledJobsSet = sc.crawlerOptions?.delay ? await getCrawlConcurrencyLimitedJobs(req.params.jobId) : new Set();
  const throttledJobsSet = new Set([...teamThrottledJobsSet, ...crawlThrottledJobsSet]);

  const validJobStatuses: [string, JobState | "unknown"][] = [];
  const validJobIDs: string[] = [];

  for (const [id, status] of jobStatuses) {
    if (throttledJobsSet.has(id)) {
      validJobStatuses.push([id, "prioritized"]);
      validJobIDs.push(id);
    } else if (
      status !== "failed" &&
      status !== "unknown"
    ) {
      validJobStatuses.push([id, status]);
      validJobIDs.push(id);
    }
  }

  const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] =
    sc.cancelled
      ? "cancelled"
      : validJobStatuses.every((x) => x[1] === "completed") &&
          (sc.crawlerOptions
            ? await isCrawlKickoffFinished(req.params.jobId)
            : true)
        ? "completed"
        : "scraping";

  // Use validJobIDs instead of jobIDs for further processing
  jobIDs = validJobIDs;

  const doneJobsLength = await getDoneJobsOrderedLength(req.params.jobId);
  const doneJobsOrder = await getDoneJobsOrdered(
    req.params.jobId,
    start,
    end ?? -1,
  );

  let doneJobs: PseudoJob<any>[] = [];

  if (end === undefined) {
@@ -265,28 +369,12 @@ export async function crawlStatusController(
        nextURL.searchParams.set("limit", req.query.limit);
      }

  let totalCount = jobIDs.length;

  if (totalCount === 0 && process.env.USE_DB_AUTHENTICATION === "true") {
    const x = await supabase_rr_service
      .from('firecrawl_jobs')
      .select('*', { count: 'exact', head: true })
      .eq("crawl_id", req.params.jobId)
      .eq("success", true)

    totalCount = x.count ?? 0;
  }

  res.status(200).json({
    success: true,
    status,
    completed: doneJobsLength,
    total: totalCount,
    creditsUsed: totalCount * (
      sc.scrapeOptions?.extract
        ? 5
        : 1
    ),
    creditsUsed,
    expiresAt: (await getCrawlExpiry(req.params.jobId)).toISOString(),
    next:
      status !== "scraping" && start + data.length === doneJobsLength // if there's not gonna be any documents after this