feat(scraper): runpod v2 parallel testing (#1636)

* feat(scraper): runpod v2 parallel testing

* fix catch
This commit is contained in:
Thomas Kosmas 2025-06-05 20:28:01 +03:00 committed by GitHub
parent b2e0f657bd
commit 4bf64d2c01
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -7,14 +7,22 @@ import * as Sentry from "@sentry/node";
import escapeHtml from "escape-html";
import PdfParse from "pdf-parse";
import { downloadFile, fetchFileToBuffer } from "../utils/downloadFile";
import { PDFAntibotError, PDFInsufficientTimeError, RemoveFeatureError, TimeoutError } from "../../error";
import {
PDFAntibotError,
PDFInsufficientTimeError,
RemoveFeatureError,
TimeoutError,
} from "../../error";
import { readFile, unlink } from "node:fs/promises";
import path from "node:path";
import type { Response } from "undici";
import { getPdfResultFromCache, savePdfResultToCache } from "../../../../lib/gcs-pdf-cache";
import {
getPdfResultFromCache,
savePdfResultToCache,
} from "../../../../lib/gcs-pdf-cache";
import { getPageCount } from "../../../../lib/pdf-parser";
type PDFProcessorResult = { html: string; markdown?: string; };
type PDFProcessorResult = { html: string; markdown?: string };
const MAX_FILE_SIZE = 19 * 1024 * 1024; // 19MB
const MILLISECONDS_PER_PAGE = 150;
@ -33,7 +41,7 @@ async function scrapePDFWithRunPodMU(
try {
const cachedResult = await getPdfResultFromCache(base64Content);
if (cachedResult) {
meta.logger.info("Using cached RunPod MU result for PDF", {
tempFilePath,
@ -47,7 +55,9 @@ async function scrapePDFWithRunPodMU(
});
}
const timeout = timeToRun ? timeToRun - (Date.now() - preCacheCheckStartTime) : undefined;
const timeout = timeToRun
? timeToRun - (Date.now() - preCacheCheckStartTime)
: undefined;
if (timeout && timeout < 0) {
throw new TimeoutError("MU PDF parser already timed out before call");
}
@ -75,21 +85,60 @@ async function scrapePDFWithRunPodMU(
schema: z.object({
id: z.string(),
status: z.string(),
output: z.object({
markdown: z.string(),
}).optional(),
output: z
.object({
markdown: z.string(),
})
.optional(),
}),
mock: meta.mock,
abort,
});
//this is just so we can test in parallel and compare results
robustFetch({
url:
"https://api.runpod.ai/v2/" + process.env.RUNPOD_MUV2_POD_ID + "/runsync",
method: "POST",
headers: {
Authorization: `Bearer ${process.env.RUNPOD_MU_API_KEY}`,
},
body: {
input: {
file_content: base64Content,
filename: path.basename(tempFilePath) + ".pdf",
timeout,
created_at: Date.now(),
},
},
logger: meta.logger.child({
method: "scrapePDFWithRunPodMU/runsync/robustFetch",
}),
schema: z.object({
id: z.string(),
status: z.string(),
output: z
.object({
markdown: z.string(),
})
.optional(),
}),
mock: meta.mock,
abort,
}).catch(error => {
meta.logger.warn("Error scraping PDF with RunPod MU V2", {
error,
tempFilePath,
});
});
let status: string = podStart.status;
let result: { markdown: string } | undefined = podStart.output;
if (status === "IN_QUEUE" || status === "IN_PROGRESS") {
do {
abort?.throwIfAborted();
await new Promise(resolve => setTimeout(resolve, 2500));
await new Promise((resolve) => setTimeout(resolve, 2500));
abort?.throwIfAborted();
const podStatus = await robustFetch({
url: `https://api.runpod.ai/v2/${process.env.RUNPOD_MU_POD_ID}/status/${podStart.id}`,
@ -102,9 +151,11 @@ async function scrapePDFWithRunPodMU(
}),
schema: z.object({
status: z.string(),
output: z.object({
markdown: z.string(),
}).optional(),
output: z
.object({
markdown: z.string(),
})
.optional(),
}),
mock: meta.mock,
abort,
@ -159,14 +210,16 @@ export async function scrapePDF(
timeToRun: number | undefined,
): Promise<EngineScrapeResult> {
const startTime = Date.now();
if (!meta.options.parsePDF) {
if (meta.pdfPrefetch !== undefined && meta.pdfPrefetch !== null) {
const content = (await readFile(meta.pdfPrefetch.filePath)).toString("base64");
const content = (await readFile(meta.pdfPrefetch.filePath)).toString(
"base64",
);
return {
url: meta.pdfPrefetch.url ?? meta.url,
statusCode: meta.pdfPrefetch.status,
html: content,
markdown: content,
};
@ -174,40 +227,47 @@ export async function scrapePDF(
const file = await fetchFileToBuffer(meta.url, {
headers: meta.options.headers,
});
const ct = file.response.headers.get("Content-Type");
if (ct && !ct.includes("application/pdf")) { // if downloaded file wasn't a PDF
if (ct && !ct.includes("application/pdf")) {
// if downloaded file wasn't a PDF
throw new PDFAntibotError();
}
const content = file.buffer.toString("base64");
return {
url: file.response.url,
statusCode: file.response.status,
html: content,
markdown: content,
};
}
}
const { response, tempFilePath } = (meta.pdfPrefetch !== undefined && meta.pdfPrefetch !== null)
? { response: meta.pdfPrefetch, tempFilePath: meta.pdfPrefetch.filePath }
: await downloadFile(meta.id, meta.url, {
headers: meta.options.headers,
});
if ((response as any).headers) { // if downloadFile was used
const { response, tempFilePath } =
meta.pdfPrefetch !== undefined && meta.pdfPrefetch !== null
? { response: meta.pdfPrefetch, tempFilePath: meta.pdfPrefetch.filePath }
: await downloadFile(meta.id, meta.url, {
headers: meta.options.headers,
});
if ((response as any).headers) {
// if downloadFile was used
const r: Response = response as any;
const ct = r.headers.get("Content-Type");
if (ct && !ct.includes("application/pdf")) { // if downloaded file wasn't a PDF
if (ct && !ct.includes("application/pdf")) {
// if downloaded file wasn't a PDF
throw new PDFAntibotError();
}
}
const pageCount = await getPageCount(tempFilePath);
if (pageCount * MILLISECONDS_PER_PAGE > (timeToRun ?? Infinity)) {
throw new PDFInsufficientTimeError(pageCount, pageCount * MILLISECONDS_PER_PAGE + 5000);
throw new PDFInsufficientTimeError(
pageCount,
pageCount * MILLISECONDS_PER_PAGE + 5000,
);
}
let result: PDFProcessorResult | null = null;
@ -229,20 +289,26 @@ export async function scrapePDF(
}),
},
tempFilePath,
timeToRun ? (timeToRun - (Date.now() - startTime)) : undefined,
timeToRun ? timeToRun - (Date.now() - startTime) : undefined,
base64Content,
);
} catch (error) {
if (
error instanceof RemoveFeatureError
|| error instanceof TimeoutError
error instanceof RemoveFeatureError ||
error instanceof TimeoutError
) {
throw error;
} else if (
(error instanceof Error && error.name === "TimeoutError")
|| (error instanceof Error && error.message === "Request failed" && error.cause && error.cause instanceof Error && error.cause.name === "TimeoutError")
(error instanceof Error && error.name === "TimeoutError") ||
(error instanceof Error &&
error.message === "Request failed" &&
error.cause &&
error.cause instanceof Error &&
error.cause.name === "TimeoutError")
) {
throw new TimeoutError("PDF parsing timed out, please increase the timeout parameter in your scrape request");
throw new TimeoutError(
"PDF parsing timed out, please increase the timeout parameter in your scrape request",
);
}
meta.logger.warn(
"RunPod MU failed to parse PDF (could be due to timeout) -- falling back to parse-pdf",