From d82f44c93e89cdc54a97d45f15f634b7de355407 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Wed, 16 Apr 2025 03:13:02 -0700 Subject: [PATCH] feat(extract): log failed extracts --- .../api/src/lib/extract/extraction-service.ts | 1622 +++++++++-------- 1 file changed, 832 insertions(+), 790 deletions(-) diff --git a/apps/api/src/lib/extract/extraction-service.ts b/apps/api/src/lib/extract/extraction-service.ts index 0b72f379a..6f14306a2 100644 --- a/apps/api/src/lib/extract/extraction-service.ts +++ b/apps/api/src/lib/extract/extraction-service.ts @@ -36,7 +36,7 @@ import { getCachedDocs, saveCachedDocs } from "./helpers/cached-docs"; import { normalizeUrl } from "../canonical-url"; import { search } from "../../search"; import { buildRephraseToSerpPrompt } from "./build-prompts"; -import fs from "fs/promises"; + interface ExtractServiceOptions { request: ExtractRequest; teamId: string; @@ -109,845 +109,887 @@ export async function performExtraction( teamId, }); - // If no URLs are provided, generate URLs from the prompt - if ((!request.urls || request.urls.length === 0) && request.prompt) { - logger.debug("Generating URLs from prompt...", { - prompt: request.prompt, - }); - const rephrasedPrompt = await generateBasicCompletion( - buildRephraseToSerpPrompt(request.prompt), - ); - let rptxt = rephrasedPrompt?.text.replace('"', "").replace("'", "") || ""; - if (rephrasedPrompt) { - costTracking.otherCallCount++; - costTracking.otherCost += rephrasedPrompt.cost; - costTracking.totalCost += rephrasedPrompt.cost; - } - const searchResults = await search({ - query: rptxt, - num_results: 10, - }); + try { - request.urls = searchResults.map((result) => result.url) as string[]; - } - if (request.urls && request.urls.length === 0) { - logger.error("No search results found", { - query: request.prompt, - }); - return { - success: false, - error: "No search results found", - extractId, - }; - } - - const urls = request.urls || ([] as string[]); - - if ( - request.__experimental_cacheMode == "load" && - request.__experimental_cacheKey && - urls - ) { - logger.debug("Loading cached docs..."); - try { - const cache = await getCachedDocs(urls, request.__experimental_cacheKey); - for (const doc of cache) { - if (doc.metadata.url) { - docsMap.set(normalizeUrl(doc.metadata.url), doc); - } - } - } catch (error) { - logger.error("Error loading cached docs", { error }); - } - } - - // Token tracking - let tokenUsage: TokenUsage[] = []; - - await updateExtract(extractId, { - status: "processing", - steps: [ - { - step: ExtractStep.INITIAL, - startedAt: Date.now(), - finishedAt: Date.now(), - discoveredLinks: request.urls, - }, - ], - }); - - let reqSchema = request.schema; - if (!reqSchema && request.prompt) { - const schemaGenRes = await generateSchemaFromPrompt(request.prompt, logger); - reqSchema = schemaGenRes.extract; - costTracking.otherCallCount++; - costTracking.otherCost += schemaGenRes.cost; - costTracking.totalCost += schemaGenRes.cost; - - logger.debug("Generated request schema.", { - originalSchema: request.schema, - schema: reqSchema, - }); - } - - if (reqSchema) { - reqSchema = await dereferenceSchema(reqSchema); - } - - logger.debug("Transformed schema.", { - originalSchema: request.schema, - schema: reqSchema, - }); - - let rSchema = reqSchema; - - // agent evaluates if the schema or the prompt has an array with big amount of items - // also it checks if the schema any other properties that are not arrays - // if so, it splits the results into 2 types of completions: - 
// 1. the first one is a completion that will extract the array of items - // 2. the second one is multiple completions that will extract the items from the array - let startAnalyze = Date.now(); - const { - isMultiEntity, - multiEntityKeys, - reasoning, - keyIndicators, - tokenUsage: schemaAnalysisTokenUsage, - cost: schemaAnalysisCost, - } = await analyzeSchemaAndPrompt(urls, reqSchema, request.prompt ?? "", logger); - - logger.debug("Analyzed schema.", { - isMultiEntity, - multiEntityKeys, - reasoning, - keyIndicators, - }); - - costTracking.otherCallCount++; - costTracking.otherCost += schemaAnalysisCost; - costTracking.totalCost += schemaAnalysisCost; - - // Track schema analysis tokens - tokenUsage.push(schemaAnalysisTokenUsage); - - let startMap = Date.now(); - let aggMapLinks: string[] = []; - logger.debug("Processing URLs...", { - urlCount: request.urls?.length || 0, - }); - - const urlPromises = urls.map((url) => - processUrl( - { - url, + // If no URLs are provided, generate URLs from the prompt + if ((!request.urls || request.urls.length === 0) && request.prompt) { + logger.debug("Generating URLs from prompt...", { prompt: request.prompt, - teamId, - allowExternalLinks: request.allowExternalLinks, - origin: request.origin, - limit: request.limit, - includeSubdomains: request.includeSubdomains, - schema: request.schema, - log, - isMultiEntity, - reasoning, - multiEntityKeys, - keyIndicators, - }, - urlTraces, - (links: string[]) => { - aggMapLinks.push(...links); - updateExtract(extractId, { - steps: [ - { - step: ExtractStep.MAP, - startedAt: startMap, - finishedAt: Date.now(), - discoveredLinks: aggMapLinks, - }, - ], - }); - }, - logger.child({ module: "extract", method: "processUrl", url }), - costTracking, - ), - ); - - const processedUrls = await Promise.all(urlPromises); - let links = processedUrls.flat().filter((url) => url); - logger.debug("Processed URLs.", { - linkCount: links.length, - }); - - if (links.length === 0) { - links = urls.map(x => x.replace(/\*$/g, "")); - logger.warn("0 links! Doing just the original URLs. 
(without * wildcard)", { - linkCount: links.length, - }); - } - - log["links"] = links; - log["linksLength"] = links.length; - - await updateExtract(extractId, { - status: "processing", - steps: [ - { - step: ExtractStep.MAP_RERANK, - startedAt: startMap, - finishedAt: Date.now(), - discoveredLinks: links, - }, - ], - }); - - if (isMultiEntity && reqSchema) { - log["isMultiEntity"] = true; - logger.debug("=== MULTI-ENTITY ==="); - - const { singleAnswerSchema, multiEntitySchema } = await spreadSchemas( - reqSchema, - multiEntityKeys, - ); - rSchema = singleAnswerSchema; - logger.debug("Spread schemas.", { singleAnswerSchema, multiEntitySchema }); - - await updateExtract(extractId, { - status: "processing", - steps: [ - { - step: ExtractStep.MULTI_ENTITY, - startedAt: startAnalyze, - finishedAt: Date.now(), - discoveredLinks: [], - }, - ], - }); - - const timeout = 60000; - - await updateExtract(extractId, { - status: "processing", - steps: [ - { - step: ExtractStep.MULTI_ENTITY_SCRAPE, - startedAt: startAnalyze, - finishedAt: Date.now(), - discoveredLinks: links, - }, - ], - }); - - logger.debug("Starting multi-entity scrape..."); - let startScrape = Date.now(); - log["docsSizeBeforeMultiEntityScrape"] = docsMap.size; - - const scrapePromises = links.map((url) => { - if (!docsMap.has(normalizeUrl(url))) { - return scrapeDocument( - { - url, - teamId, - origin: request.origin || "api", - timeout, - }, - urlTraces, - logger.child({ - module: "extract", - method: "scrapeDocument", - url, - isMultiEntity: true, - }), - { - ...request.scrapeOptions, - - // Needs to be true for multi-entity to work properly - onlyMainContent: true, - }, - ); - } - return docsMap.get(normalizeUrl(url)); - }); - - let multyEntityDocs = (await Promise.all(scrapePromises)).filter( - (doc): doc is Document => doc !== null, - ); - - log["docsSizeAfterMultiEntityScrape"] = scrapePromises.length; - - logger.debug("Multi-entity scrape finished.", { - docCount: multyEntityDocs.length, - }); - - totalUrlsScraped += multyEntityDocs.length; - - let endScrape = Date.now(); - - await updateExtract(extractId, { - status: "processing", - steps: [ - { - step: ExtractStep.MULTI_ENTITY_SCRAPE, - startedAt: startScrape, - finishedAt: endScrape, - discoveredLinks: links, - }, - ], - }); - - for (const doc of multyEntityDocs) { - if (doc?.metadata?.url) { - docsMap.set(normalizeUrl(doc.metadata.url), doc); - } - } - - logger.debug("Updated docsMap.", { docsMapSize: docsMap.size }); // useful for error probing - - // Process docs in chunks with queue style processing - const chunkSize = 50; - const timeoutCompletion = 45000; // 45 second timeout - const chunks: Document[][] = []; - const extractionResults: { extract: any; url: string }[] = []; - - // Split into chunks - for (let i = 0; i < multyEntityDocs.length; i += chunkSize) { - chunks.push(multyEntityDocs.slice(i, i + chunkSize)); - } - - const sessionIds = chunks.map(() => 'fc-' + crypto.randomUUID()); - await updateExtract(extractId, { - status: "processing", - steps: [ - { - step: ExtractStep.MULTI_ENTITY_AGENT_SCRAPE, - startedAt: Date.now(), - finishedAt: null - }, - ], - sessionIds - }); - - // Process chunks sequentially with timeout - for (let i = 0; i < chunks.length; i++) { - const chunk = chunks[i]; - const sessionId = sessionIds[i]; - const chunkPromises = chunk.map(async (doc) => { - try { - ajv.compile(multiEntitySchema); - - // Wrap in timeout promise - const timeoutPromise = new Promise((resolve) => { - setTimeout(() => resolve(null), timeoutCompletion); - }); - - 
const completionPromise = batchExtractPromise({ - multiEntitySchema, - links, - prompt: request.prompt ?? "", - systemPrompt: request.systemPrompt ?? "", - doc, - useAgent: isAgentExtractModelValid(request.agent?.model), - extractId, - sessionId - }, logger); - - // Race between timeout and completion - const multiEntityCompletion = (await completionPromise) as Awaited< - ReturnType - >; - - // TODO: merge multiEntityCompletion.extract to fit the multiEntitySchema - - // Track multi-entity extraction tokens - if (multiEntityCompletion) { - tokenUsage.push(multiEntityCompletion.totalUsage); - - costTracking.smartScrapeCallCount += multiEntityCompletion.smartScrapeCallCount; - costTracking.smartScrapeCost += multiEntityCompletion.smartScrapeCost; - costTracking.otherCallCount += multiEntityCompletion.otherCallCount; - costTracking.otherCost += multiEntityCompletion.otherCost; - costTracking.totalCost += multiEntityCompletion.smartScrapeCost + multiEntityCompletion.otherCost; - - if (multiEntityCompletion.extract) { - return { - extract: multiEntityCompletion.extract, - url: doc.metadata.url || doc.metadata.sourceURL || "", - }; - } - } - - // console.log(multiEntityCompletion.extract) - // if (!multiEntityCompletion.extract?.is_content_relevant) { - // console.log(`Skipping extraction for ${doc.metadata.url} as content is not relevant`); - // return null; - // } - - // Update token usage in traces - // if (multiEntityCompletion && multiEntityCompletion.numTokens) { - // const totalLength = docs.reduce( - // (sum, doc) => sum + (doc.markdown?.length || 0), - // 0, - // ); - // docs.forEach((doc) => { - // if (doc.metadata?.sourceURL) { - // const trace = urlTraces.find( - // (t) => t.url === doc.metadata.sourceURL, - // ); - // if (trace && trace.contentStats) { - // trace.contentStats.tokensUsed = Math.floor( - // ((doc.markdown?.length || 0) / totalLength) * - // (multiEntityCompletion?.numTokens || 0), - // ); - // } - // } - // }); - // } - - // if (multiEntityCompletion.extract && multiEntityCompletion.extract.extraction_confidence < 3) { - // console.log(`Skipping extraction for ${doc.metadata.url} as confidence is too low (${multiEntityCompletion.extract.extraction_confidence})`); - // return null; - // } - - return null; - } catch (error) { - logger.error(`Failed to process document.`, { - error, - url: doc.metadata.url ?? doc.metadata.sourceURL!, - }); - return null; - } }); - // Wait for current chunk to complete before processing next chunk - const chunkResults = await Promise.all(chunkPromises); - const validResults = chunkResults.filter( - (result): result is { extract: any; url: string } => result !== null, + const rephrasedPrompt = await generateBasicCompletion( + buildRephraseToSerpPrompt(request.prompt), ); - extractionResults.push(...validResults); - // Merge all extracts from valid results into a single array - const extractArrays = validResults.map((r) => - Array.isArray(r.extract) ? 
r.extract : [r.extract], - ); - const mergedExtracts = extractArrays.flat(); - multiEntityCompletions.push(...mergedExtracts); - multiEntityCompletions = multiEntityCompletions.filter((c) => c !== null); - logger.debug("All multi-entity completion chunks finished.", { - completionCount: multiEntityCompletions.length, + let rptxt = rephrasedPrompt?.text.replace('"', "").replace("'", "") || ""; + if (rephrasedPrompt) { + costTracking.otherCallCount++; + costTracking.otherCost += rephrasedPrompt.cost; + costTracking.totalCost += rephrasedPrompt.cost; + } + const searchResults = await search({ + query: rptxt, + num_results: 10, }); - log["multiEntityCompletionsLength"] = multiEntityCompletions.length; + + request.urls = searchResults.map((result) => result.url) as string[]; } - - try { - // Use SourceTracker to handle source tracking - const sourceTracker = new SourceTracker(); - logger.debug("Created SourceTracker instance"); - - // Transform and merge results while preserving sources - try { - sourceTracker.transformResults( - extractionResults, - multiEntitySchema, - false, - ); - logger.debug("Successfully transformed results with sourceTracker"); - } catch (error) { - const errorLog = `[${new Date().toISOString()}] Error in sourceTracker.transformResults: ${JSON.stringify(error, null, 2)}\n`; - await fs.appendFile('logs/extraction-errors.log', errorLog); - logger.error(`Error in sourceTracker.transformResults:`, { error }); - throw error; - } - - try { - multiEntityResult = transformArrayToObject( - multiEntitySchema, - multiEntityCompletions, - ); - logger.debug("Successfully transformed array to object"); - } catch (error) { - const errorLog = `[${new Date().toISOString()}] Error in transformArrayToObject: ${JSON.stringify(error, null, 2)}\n`; - await fs.appendFile('logs/extraction-errors.log', errorLog); - logger.error(`Error in transformArrayToObject:`, { error }); - throw error; - } - - // Track sources before deduplication - try { - sourceTracker.trackPreDeduplicationSources(multiEntityResult); - logger.debug("Successfully tracked pre-deduplication sources"); - } catch (error) { - const errorLog = `[${new Date().toISOString()}] Error in trackPreDeduplicationSources: ${JSON.stringify(error, null, 2)}\n`; - await fs.appendFile('logs/extraction-errors.log', errorLog); - logger.error(`Error in trackPreDeduplicationSources:`, { error }); - throw error; - } - - // Apply deduplication and merge - try { - multiEntityResult = deduplicateObjectsArray(multiEntityResult); - logger.debug("Successfully deduplicated objects array"); - } catch (error) { - const errorLog = `[${new Date().toISOString()}] Error in deduplicateObjectsArray: ${JSON.stringify(error, null, 2)}\n`; - await fs.appendFile('logs/extraction-errors.log', errorLog); - logger.error(`Error in deduplicateObjectsArray:`, { error }); - throw error; - } - - try { - multiEntityResult = mergeNullValObjs(multiEntityResult); - logger.debug("Successfully merged null value objects"); - } catch (error) { - const errorLog = `[${new Date().toISOString()}] Error in mergeNullValObjs: ${JSON.stringify(error, null, 2)}\n`; - await fs.appendFile('logs/extraction-errors.log', errorLog); - logger.error(`Error in mergeNullValObjs:`, { error }); - throw error; - } - - // Map sources to final deduplicated/merged items - try { - const multiEntitySources = sourceTracker.mapSourcesToFinalItems( - multiEntityResult, - multiEntityKeys, - ); - Object.assign(sources, multiEntitySources); - logger.debug("Successfully mapped sources to final items"); - } catch 
(error) { - const errorLog = `[${new Date().toISOString()}] Error in mapSourcesToFinalItems: ${JSON.stringify(error, null, 2)}\n`; - await fs.appendFile('logs/extraction-errors.log', errorLog); - logger.error(`Error in mapSourcesToFinalItems:`, { error }); - throw error; - } - } catch (error) { - const errorLog = `[${new Date().toISOString()}] Failed to transform array to object\nError: ${JSON.stringify(error, null, 2)}\nStack: ${error.stack}\nMultiEntityResult: ${JSON.stringify(multiEntityResult, null, 2)}\nMultiEntityCompletions: ${JSON.stringify(multiEntityCompletions, null, 2)}\nMultiEntitySchema: ${JSON.stringify(multiEntitySchema, null, 2)}\n\n`; - await fs.appendFile('logs/extraction-errors.log', errorLog); - logger.error(`Failed to transform array to object`, { - error, - errorMessage: error.message, - errorStack: error.stack, - multiEntityResult: JSON.stringify(multiEntityResult), - multiEntityCompletions: JSON.stringify(multiEntityCompletions), - multiEntitySchema: JSON.stringify(multiEntitySchema) + if (request.urls && request.urls.length === 0) { + logger.error("No search results found", { + query: request.prompt, + }); + logJob({ + job_id: extractId, + success: false, + message: "No search results found", + num_docs: 1, + docs: [], + time_taken: (new Date().getTime() - Date.now()) / 1000, + team_id: teamId, + mode: "extract", + url: request.urls?.join(", ") || "", + scrapeOptions: request, + origin: request.origin ?? "api", + num_tokens: 0, + tokens_billed: 0, + sources, + cost_tracking: costTracking, }); return { success: false, - error: - "An unexpected error occurred. Please contact help@firecrawl.com for help.", + error: "No search results found", extractId, - urlTrace: urlTraces, - totalUrlsScraped, }; } - } - if ( - rSchema && - Object.keys(rSchema).length > 0 && - rSchema.properties && - Object.keys(rSchema.properties).length > 0 - ) { - log["isSingleEntity"] = true; - logger.debug("=== SINGLE PAGES ===", { - linkCount: links.length, - schema: rSchema, - }); - // Scrape documents - const timeout = 60000; - let singleAnswerDocs: Document[] = []; + const urls = request.urls || ([] as string[]); - // let rerank = await rerankLinks(links.map((url) => ({ url })), request.prompt ?? 
JSON.stringify(request.schema), urlTraces); + if ( + request.__experimental_cacheMode == "load" && + request.__experimental_cacheKey && + urls + ) { + logger.debug("Loading cached docs..."); + try { + const cache = await getCachedDocs(urls, request.__experimental_cacheKey); + for (const doc of cache) { + if (doc.metadata.url) { + docsMap.set(normalizeUrl(doc.metadata.url), doc); + } + } + } catch (error) { + logger.error("Error loading cached docs", { error }); + } + } + + // Token tracking + let tokenUsage: TokenUsage[] = []; await updateExtract(extractId, { status: "processing", steps: [ { - step: ExtractStep.SCRAPE, + step: ExtractStep.INITIAL, startedAt: Date.now(), finishedAt: Date.now(), + discoveredLinks: request.urls, + }, + ], + }); + + let reqSchema = request.schema; + if (!reqSchema && request.prompt) { + const schemaGenRes = await generateSchemaFromPrompt(request.prompt, logger); + reqSchema = schemaGenRes.extract; + costTracking.otherCallCount++; + costTracking.otherCost += schemaGenRes.cost; + costTracking.totalCost += schemaGenRes.cost; + + logger.debug("Generated request schema.", { + originalSchema: request.schema, + schema: reqSchema, + }); + } + + if (reqSchema) { + reqSchema = await dereferenceSchema(reqSchema); + } + + logger.debug("Transformed schema.", { + originalSchema: request.schema, + schema: reqSchema, + }); + + let rSchema = reqSchema; + + // agent evaluates if the schema or the prompt has an array with big amount of items + // also it checks if the schema any other properties that are not arrays + // if so, it splits the results into 2 types of completions: + // 1. the first one is a completion that will extract the array of items + // 2. the second one is multiple completions that will extract the items from the array + let startAnalyze = Date.now(); + const { + isMultiEntity, + multiEntityKeys, + reasoning, + keyIndicators, + tokenUsage: schemaAnalysisTokenUsage, + cost: schemaAnalysisCost, + } = await analyzeSchemaAndPrompt(urls, reqSchema, request.prompt ?? "", logger); + + logger.debug("Analyzed schema.", { + isMultiEntity, + multiEntityKeys, + reasoning, + keyIndicators, + }); + + costTracking.otherCallCount++; + costTracking.otherCost += schemaAnalysisCost; + costTracking.totalCost += schemaAnalysisCost; + + // Track schema analysis tokens + tokenUsage.push(schemaAnalysisTokenUsage); + + let startMap = Date.now(); + let aggMapLinks: string[] = []; + logger.debug("Processing URLs...", { + urlCount: request.urls?.length || 0, + }); + + const urlPromises = urls.map((url) => + processUrl( + { + url, + prompt: request.prompt, + teamId, + allowExternalLinks: request.allowExternalLinks, + origin: request.origin, + limit: request.limit, + includeSubdomains: request.includeSubdomains, + schema: request.schema, + log, + isMultiEntity, + reasoning, + multiEntityKeys, + keyIndicators, + }, + urlTraces, + (links: string[]) => { + aggMapLinks.push(...links); + updateExtract(extractId, { + steps: [ + { + step: ExtractStep.MAP, + startedAt: startMap, + finishedAt: Date.now(), + discoveredLinks: aggMapLinks, + }, + ], + }); + }, + logger.child({ module: "extract", method: "processUrl", url }), + costTracking, + ), + ); + + const processedUrls = await Promise.all(urlPromises); + let links = processedUrls.flat().filter((url) => url); + logger.debug("Processed URLs.", { + linkCount: links.length, + }); + + if (links.length === 0) { + links = urls.map(x => x.replace(/\*$/g, "")); + logger.warn("0 links! Doing just the original URLs. 
(without * wildcard)", { + linkCount: links.length, + }); + } + + log["links"] = links; + log["linksLength"] = links.length; + + await updateExtract(extractId, { + status: "processing", + steps: [ + { + step: ExtractStep.MAP_RERANK, + startedAt: startMap, + finishedAt: Date.now(), discoveredLinks: links, }, ], }); - log["docsSizeBeforeSingleEntityScrape"] = docsMap.size; - const scrapePromises = links.map((url) => { - if (!docsMap.has(normalizeUrl(url))) { - return scrapeDocument( + + if (isMultiEntity && reqSchema) { + log["isMultiEntity"] = true; + logger.debug("=== MULTI-ENTITY ==="); + + const { singleAnswerSchema, multiEntitySchema } = await spreadSchemas( + reqSchema, + multiEntityKeys, + ); + rSchema = singleAnswerSchema; + logger.debug("Spread schemas.", { singleAnswerSchema, multiEntitySchema }); + + await updateExtract(extractId, { + status: "processing", + steps: [ { - url, - teamId, - origin: request.origin || "api", - timeout, + step: ExtractStep.MULTI_ENTITY, + startedAt: startAnalyze, + finishedAt: Date.now(), + discoveredLinks: [], }, - urlTraces, - logger.child({ - module: "extract", - method: "scrapeDocument", - url, - isMultiEntity: false, - }), - request.scrapeOptions, - ); - } - return docsMap.get(normalizeUrl(url)); - }); + ], + }); - try { - const results = await Promise.all(scrapePromises); - log["docsSizeAfterSingleEntityScrape"] = docsMap.size; + const timeout = 60000; - for (const doc of results) { + await updateExtract(extractId, { + status: "processing", + steps: [ + { + step: ExtractStep.MULTI_ENTITY_SCRAPE, + startedAt: startAnalyze, + finishedAt: Date.now(), + discoveredLinks: links, + }, + ], + }); + + logger.debug("Starting multi-entity scrape..."); + let startScrape = Date.now(); + log["docsSizeBeforeMultiEntityScrape"] = docsMap.size; + + const scrapePromises = links.map((url) => { + if (!docsMap.has(normalizeUrl(url))) { + return scrapeDocument( + { + url, + teamId, + origin: request.origin || "api", + timeout, + }, + urlTraces, + logger.child({ + module: "extract", + method: "scrapeDocument", + url, + isMultiEntity: true, + }), + { + ...request.scrapeOptions, + + // Needs to be true for multi-entity to work properly + onlyMainContent: true, + }, + ); + } + return docsMap.get(normalizeUrl(url)); + }); + + let multyEntityDocs = (await Promise.all(scrapePromises)).filter( + (doc): doc is Document => doc !== null, + ); + + log["docsSizeAfterMultiEntityScrape"] = scrapePromises.length; + + logger.debug("Multi-entity scrape finished.", { + docCount: multyEntityDocs.length, + }); + + totalUrlsScraped += multyEntityDocs.length; + + let endScrape = Date.now(); + + await updateExtract(extractId, { + status: "processing", + steps: [ + { + step: ExtractStep.MULTI_ENTITY_SCRAPE, + startedAt: startScrape, + finishedAt: endScrape, + discoveredLinks: links, + }, + ], + }); + + for (const doc of multyEntityDocs) { if (doc?.metadata?.url) { docsMap.set(normalizeUrl(doc.metadata.url), doc); } } + logger.debug("Updated docsMap.", { docsMapSize: docsMap.size }); // useful for error probing - const validResults = results.filter( - (doc): doc is Document => doc !== null, - ); - singleAnswerDocs.push(...validResults); - totalUrlsScraped += validResults.length; + // Process docs in chunks with queue style processing + const chunkSize = 50; + const timeoutCompletion = 45000; // 45 second timeout + const chunks: Document[][] = []; + const extractionResults: { extract: any; url: string }[] = []; - logger.debug("Scrapes finished.", { docCount: validResults.length }); - } catch 
(error) { - return { - success: false, - error: error.message, - extractId, - urlTrace: urlTraces, - totalUrlsScraped, - }; - } + // Split into chunks + for (let i = 0; i < multyEntityDocs.length; i += chunkSize) { + chunks.push(multyEntityDocs.slice(i, i + chunkSize)); + } - if (docsMap.size == 0) { - // All urls are invalid - logger.error("All provided URLs are invalid!"); - return { - success: false, - error: - "All provided URLs are invalid. Please check your input and try again.", - extractId, - urlTrace: request.urlTrace ? urlTraces : undefined, - totalUrlsScraped: 0, - }; - } + const sessionIds = chunks.map(() => 'fc-' + crypto.randomUUID()); + await updateExtract(extractId, { + status: "processing", + steps: [ + { + step: ExtractStep.MULTI_ENTITY_AGENT_SCRAPE, + startedAt: Date.now(), + finishedAt: null + }, + ], + sessionIds + }); - await updateExtract(extractId, { - status: "processing", - steps: [ - { - step: ExtractStep.EXTRACT, - startedAt: Date.now(), - finishedAt: Date.now(), - discoveredLinks: links, - }, - ], - }); + // Process chunks sequentially with timeout + for (let i = 0; i < chunks.length; i++) { + const chunk = chunks[i]; + const sessionId = sessionIds[i]; + const chunkPromises = chunk.map(async (doc) => { + try { + ajv.compile(multiEntitySchema); - // Generate completions - logger.debug("Generating singleAnswer completions..."); - log["singleAnswerDocsLength"] = singleAnswerDocs.length; - let { - extract: completionResult, - tokenUsage: singleAnswerTokenUsage, - sources: singleAnswerSources, - smartScrapeCost: singleAnswerSmartScrapeCost, - otherCost: singleAnswerOtherCost, - smartScrapeCallCount: singleAnswerSmartScrapeCallCount, - otherCallCount: singleAnswerOtherCallCount, - } = await singleAnswerCompletion({ - singleAnswerDocs, - rSchema, - links, - prompt: request.prompt ?? "", - systemPrompt: request.systemPrompt ?? "", - useAgent: isAgentExtractModelValid(request.agent?.model), - extractId, - }); - costTracking.smartScrapeCost += singleAnswerSmartScrapeCost; - costTracking.smartScrapeCallCount += singleAnswerSmartScrapeCallCount; - costTracking.otherCost += singleAnswerOtherCost; - costTracking.otherCallCount += singleAnswerOtherCallCount; - costTracking.totalCost += singleAnswerSmartScrapeCost + singleAnswerOtherCost; - logger.debug("Done generating singleAnswer completions."); + // Wrap in timeout promise + const timeoutPromise = new Promise((resolve) => { + setTimeout(() => resolve(null), timeoutCompletion); + }); - singleAnswerResult = transformArrayToObject(rSchema, completionResult); + const completionPromise = batchExtractPromise({ + multiEntitySchema, + links, + prompt: request.prompt ?? "", + systemPrompt: request.systemPrompt ?? 
"", + doc, + useAgent: isAgentExtractModelValid(request.agent?.model), + extractId, + sessionId + }, logger); - singleAnswerResult = deduplicateObjectsArray(singleAnswerResult); - // Track single answer extraction tokens and sources - if (completionResult) { - tokenUsage.push(singleAnswerTokenUsage); + // Race between timeout and completion + const multiEntityCompletion = (await completionPromise) as Awaited< + ReturnType + >; - // Add sources for top-level properties in single answer - if (rSchema?.properties) { - Object.keys(rSchema.properties).forEach((key) => { - if (completionResult[key] !== undefined) { - sources[key] = - singleAnswerSources || - singleAnswerDocs.map( - (doc) => doc.metadata.url || doc.metadata.sourceURL || "", - ); + // TODO: merge multiEntityCompletion.extract to fit the multiEntitySchema + + // Track multi-entity extraction tokens + if (multiEntityCompletion) { + tokenUsage.push(multiEntityCompletion.totalUsage); + + costTracking.smartScrapeCallCount += multiEntityCompletion.smartScrapeCallCount; + costTracking.smartScrapeCost += multiEntityCompletion.smartScrapeCost; + costTracking.otherCallCount += multiEntityCompletion.otherCallCount; + costTracking.otherCost += multiEntityCompletion.otherCost; + costTracking.totalCost += multiEntityCompletion.smartScrapeCost + multiEntityCompletion.otherCost; + + if (multiEntityCompletion.extract) { + return { + extract: multiEntityCompletion.extract, + url: doc.metadata.url || doc.metadata.sourceURL || "", + }; + } + } + + // console.log(multiEntityCompletion.extract) + // if (!multiEntityCompletion.extract?.is_content_relevant) { + // console.log(`Skipping extraction for ${doc.metadata.url} as content is not relevant`); + // return null; + // } + + // Update token usage in traces + // if (multiEntityCompletion && multiEntityCompletion.numTokens) { + // const totalLength = docs.reduce( + // (sum, doc) => sum + (doc.markdown?.length || 0), + // 0, + // ); + // docs.forEach((doc) => { + // if (doc.metadata?.sourceURL) { + // const trace = urlTraces.find( + // (t) => t.url === doc.metadata.sourceURL, + // ); + // if (trace && trace.contentStats) { + // trace.contentStats.tokensUsed = Math.floor( + // ((doc.markdown?.length || 0) / totalLength) * + // (multiEntityCompletion?.numTokens || 0), + // ); + // } + // } + // }); + // } + + // if (multiEntityCompletion.extract && multiEntityCompletion.extract.extraction_confidence < 3) { + // console.log(`Skipping extraction for ${doc.metadata.url} as confidence is too low (${multiEntityCompletion.extract.extraction_confidence})`); + // return null; + // } + + return null; + } catch (error) { + logger.error(`Failed to process document.`, { + error, + url: doc.metadata.url ?? doc.metadata.sourceURL!, + }); + return null; } }); + // Wait for current chunk to complete before processing next chunk + const chunkResults = await Promise.all(chunkPromises); + const validResults = chunkResults.filter( + (result): result is { extract: any; url: string } => result !== null, + ); + extractionResults.push(...validResults); + // Merge all extracts from valid results into a single array + const extractArrays = validResults.map((r) => + Array.isArray(r.extract) ? 
r.extract : [r.extract], + ); + const mergedExtracts = extractArrays.flat(); + multiEntityCompletions.push(...mergedExtracts); + multiEntityCompletions = multiEntityCompletions.filter((c) => c !== null); + logger.debug("All multi-entity completion chunks finished.", { + completionCount: multiEntityCompletions.length, + }); + log["multiEntityCompletionsLength"] = multiEntityCompletions.length; + } + + try { + // Use SourceTracker to handle source tracking + const sourceTracker = new SourceTracker(); + logger.debug("Created SourceTracker instance"); + + // Transform and merge results while preserving sources + try { + sourceTracker.transformResults( + extractionResults, + multiEntitySchema, + false, + ); + logger.debug("Successfully transformed results with sourceTracker"); + } catch (error) { + logger.error(`Error in sourceTracker.transformResults:`, { error }); + throw error; + } + + try { + multiEntityResult = transformArrayToObject( + multiEntitySchema, + multiEntityCompletions, + ); + logger.debug("Successfully transformed array to object"); + } catch (error) { + logger.error(`Error in transformArrayToObject:`, { error }); + throw error; + } + + // Track sources before deduplication + try { + sourceTracker.trackPreDeduplicationSources(multiEntityResult); + logger.debug("Successfully tracked pre-deduplication sources"); + } catch (error) { + logger.error(`Error in trackPreDeduplicationSources:`, { error }); + throw error; + } + + // Apply deduplication and merge + try { + multiEntityResult = deduplicateObjectsArray(multiEntityResult); + logger.debug("Successfully deduplicated objects array"); + } catch (error) { + logger.error(`Error in deduplicateObjectsArray:`, { error }); + throw error; + } + + try { + multiEntityResult = mergeNullValObjs(multiEntityResult); + logger.debug("Successfully merged null value objects"); + } catch (error) { + logger.error(`Error in mergeNullValObjs:`, { error }); + throw error; + } + + // Map sources to final deduplicated/merged items + try { + const multiEntitySources = sourceTracker.mapSourcesToFinalItems( + multiEntityResult, + multiEntityKeys, + ); + Object.assign(sources, multiEntitySources); + logger.debug("Successfully mapped sources to final items"); + } catch (error) { + logger.error(`Error in mapSourcesToFinalItems:`, { error }); + throw error; + } + } catch (error) { + logger.error(`Failed to transform array to object`, { + error, + errorMessage: error.message, + errorStack: error.stack, + multiEntityResult: JSON.stringify(multiEntityResult), + multiEntityCompletions: JSON.stringify(multiEntityCompletions), + multiEntitySchema: JSON.stringify(multiEntitySchema) + }); + logJob({ + job_id: extractId, + success: false, + message: (error instanceof Error ? error.message : "Failed to transform array to object"), + num_docs: 1, + docs: [], + time_taken: (new Date().getTime() - Date.now()) / 1000, + team_id: teamId, + mode: "extract", + url: request.urls?.join(", ") || "", + scrapeOptions: request, + origin: request.origin ?? "api", + num_tokens: 0, + tokens_billed: 0, + sources, + cost_tracking: costTracking, + }); + return { + success: false, + error: + "An unexpected error occurred. 
Please contact help@firecrawl.com for help.", + extractId, + urlTrace: urlTraces, + totalUrlsScraped, + }; + } + } + if ( + rSchema && + Object.keys(rSchema).length > 0 && + rSchema.properties && + Object.keys(rSchema.properties).length > 0 + ) { + log["isSingleEntity"] = true; + logger.debug("=== SINGLE PAGES ===", { + linkCount: links.length, + schema: rSchema, + }); + + // Scrape documents + const timeout = 60000; + let singleAnswerDocs: Document[] = []; + + // let rerank = await rerankLinks(links.map((url) => ({ url })), request.prompt ?? JSON.stringify(request.schema), urlTraces); + + await updateExtract(extractId, { + status: "processing", + steps: [ + { + step: ExtractStep.SCRAPE, + startedAt: Date.now(), + finishedAt: Date.now(), + discoveredLinks: links, + }, + ], + }); + log["docsSizeBeforeSingleEntityScrape"] = docsMap.size; + const scrapePromises = links.map((url) => { + if (!docsMap.has(normalizeUrl(url))) { + return scrapeDocument( + { + url, + teamId, + origin: request.origin || "api", + timeout, + }, + urlTraces, + logger.child({ + module: "extract", + method: "scrapeDocument", + url, + isMultiEntity: false, + }), + request.scrapeOptions, + ); + } + return docsMap.get(normalizeUrl(url)); + }); + + try { + const results = await Promise.all(scrapePromises); + log["docsSizeAfterSingleEntityScrape"] = docsMap.size; + + for (const doc of results) { + if (doc?.metadata?.url) { + docsMap.set(normalizeUrl(doc.metadata.url), doc); + } + } + logger.debug("Updated docsMap.", { docsMapSize: docsMap.size }); // useful for error probing + + const validResults = results.filter( + (doc): doc is Document => doc !== null, + ); + singleAnswerDocs.push(...validResults); + totalUrlsScraped += validResults.length; + + logger.debug("Scrapes finished.", { docCount: validResults.length }); + } catch (error) { + return { + success: false, + error: error.message, + extractId, + urlTrace: urlTraces, + totalUrlsScraped, + }; + } + + if (docsMap.size == 0) { + // All urls are invalid + logger.error("All provided URLs are invalid!"); + return { + success: false, + error: + "All provided URLs are invalid. Please check your input and try again.", + extractId, + urlTrace: request.urlTrace ? urlTraces : undefined, + totalUrlsScraped: 0, + }; + } + + await updateExtract(extractId, { + status: "processing", + steps: [ + { + step: ExtractStep.EXTRACT, + startedAt: Date.now(), + finishedAt: Date.now(), + discoveredLinks: links, + }, + ], + }); + + // Generate completions + logger.debug("Generating singleAnswer completions..."); + log["singleAnswerDocsLength"] = singleAnswerDocs.length; + let { + extract: completionResult, + tokenUsage: singleAnswerTokenUsage, + sources: singleAnswerSources, + smartScrapeCost: singleAnswerSmartScrapeCost, + otherCost: singleAnswerOtherCost, + smartScrapeCallCount: singleAnswerSmartScrapeCallCount, + otherCallCount: singleAnswerOtherCallCount, + } = await singleAnswerCompletion({ + singleAnswerDocs, + rSchema, + links, + prompt: request.prompt ?? "", + systemPrompt: request.systemPrompt ?? 
"", + useAgent: isAgentExtractModelValid(request.agent?.model), + extractId, + }); + costTracking.smartScrapeCost += singleAnswerSmartScrapeCost; + costTracking.smartScrapeCallCount += singleAnswerSmartScrapeCallCount; + costTracking.otherCost += singleAnswerOtherCost; + costTracking.otherCallCount += singleAnswerOtherCallCount; + costTracking.totalCost += singleAnswerSmartScrapeCost + singleAnswerOtherCost; + logger.debug("Done generating singleAnswer completions."); + + singleAnswerResult = transformArrayToObject(rSchema, completionResult); + + singleAnswerResult = deduplicateObjectsArray(singleAnswerResult); + // Track single answer extraction tokens and sources + if (completionResult) { + tokenUsage.push(singleAnswerTokenUsage); + + // Add sources for top-level properties in single answer + if (rSchema?.properties) { + Object.keys(rSchema.properties).forEach((key) => { + if (completionResult[key] !== undefined) { + sources[key] = + singleAnswerSources || + singleAnswerDocs.map( + (doc) => doc.metadata.url || doc.metadata.sourceURL || "", + ); + } + }); + } + } + + // singleAnswerResult = completionResult; + // singleAnswerCompletions = singleAnswerResult; + + // Update token usage in traces + // if (completions && completions.numTokens) { + // const totalLength = docs.reduce( + // (sum, doc) => sum + (doc.markdown?.length || 0), + // 0, + // ); + // docs.forEach((doc) => { + // if (doc.metadata?.sourceURL) { + // const trace = urlTraces.find((t) => t.url === doc.metadata.sourceURL); + // if (trace && trace.contentStats) { + // trace.contentStats.tokensUsed = Math.floor( + // ((doc.markdown?.length || 0) / totalLength) * + // (completions?.numTokens || 0), + // ); + // } + // } + // }); + // } + } + + log["singleAnswerResult"] = singleAnswerResult; + log["multiEntityResult"] = multiEntityResult; + + let finalResult = reqSchema + ? await mixSchemaObjects( + reqSchema, + singleAnswerResult, + multiEntityResult, + logger.child({ method: "mixSchemaObjects" }), + ) + : singleAnswerResult || multiEntityResult; + + // Tokenize final result to get token count + // let finalResultTokens = 0; + // if (finalResult) { + // const finalResultStr = JSON.stringify(finalResult); + // finalResultTokens = numTokensFromString(finalResultStr, "gpt-4o"); + + // } + // // Deduplicate and validate final result against schema + // if (reqSchema && finalResult && finalResult.length <= extractConfig.DEDUPLICATION.MAX_TOKENS) { + // const schemaValidation = await generateCompletions( + // logger.child({ method: "extractService/validateAndDeduplicate" }), + // { + // mode: "llm", + // systemPrompt: `You are a data validator and deduplicator. Your task is to: + // 1. Remove any duplicate entries in the data extracted by merging that into a single object according to the provided shcema + // 2. Ensure all data matches the provided schema + // 3. Keep only the highest quality and most complete entries when duplicates are found. + + // Do not change anything else. If data is null keep it null. 
If the schema is not provided, return the data as is.`, + // prompt: `Please validate and merge the duplicate entries in this data according to the schema provided:\n + + // + + // ${JSON.stringify(finalResult)} + + // + + // + + // ${JSON.stringify(reqSchema)} + + // + // `, + // schema: reqSchema, + // }, + // undefined, + // undefined, + // true, + // "gpt-4o" + // ); + // console.log("schemaValidation", schemaValidation); + + // console.log("schemaValidation", finalResult); + + // if (schemaValidation?.extract) { + // tokenUsage.push(schemaValidation.totalUsage); + // finalResult = schemaValidation.extract; + // } + // } + + const totalTokensUsed = tokenUsage.reduce((a, b) => a + b.totalTokens, 0); + const llmUsage = estimateTotalCost(tokenUsage); + let tokensToBill = calculateFinalResultCost(finalResult); + + if (CUSTOM_U_TEAMS.includes(teamId)) { + tokensToBill = 1; + } + + // Bill team for usage + billTeam(teamId, subId, tokensToBill, logger, true).catch((error) => { + logger.error( + `Failed to bill team ${teamId} for ${tokensToBill} tokens: ${error}`, + ); + }); + + // Log job with token usage and sources + logJob({ + job_id: extractId, + success: true, + message: "Extract completed", + num_docs: 1, + docs: finalResult ?? {}, + time_taken: (new Date().getTime() - Date.now()) / 1000, + team_id: teamId, + mode: "extract", + url: request.urls?.join(", ") || "", + scrapeOptions: request, + origin: request.origin ?? "api", + num_tokens: totalTokensUsed, + tokens_billed: tokensToBill, + sources, + cost_tracking: costTracking, + }).then(() => { + updateExtract(extractId, { + status: "completed", + llmUsage, + sources, + costTracking, + }).catch((error) => { + logger.error( + `Failed to update extract ${extractId} status to completed: ${error}`, + ); + }); + }); + + logger.debug("Done!"); + + if ( + request.__experimental_cacheMode == "save" && + request.__experimental_cacheKey + ) { + logger.debug("Saving cached docs..."); + try { + await saveCachedDocs( + [...docsMap.values()], + request.__experimental_cacheKey, + ); + } catch (error) { + logger.error("Error saving cached docs", { error }); } } - // singleAnswerResult = completionResult; - // singleAnswerCompletions = singleAnswerResult; + // fs.writeFile( + // `logs/${request.urls?.[0].replaceAll("https://", "").replaceAll("http://", "").replaceAll("/", "-").replaceAll(".", "-")}-extract-${extractId}.json`, + // JSON.stringify(log, null, 2), + // ); - // Update token usage in traces - // if (completions && completions.numTokens) { - // const totalLength = docs.reduce( - // (sum, doc) => sum + (doc.markdown?.length || 0), - // 0, - // ); - // docs.forEach((doc) => { - // if (doc.metadata?.sourceURL) { - // const trace = urlTraces.find((t) => t.url === doc.metadata.sourceURL); - // if (trace && trace.contentStats) { - // trace.contentStats.tokensUsed = Math.floor( - // ((doc.markdown?.length || 0) / totalLength) * - // (completions?.numTokens || 0), - // ); - // } - // } - // }); - // } - } - - log["singleAnswerResult"] = singleAnswerResult; - log["multiEntityResult"] = multiEntityResult; - - let finalResult = reqSchema - ? 
await mixSchemaObjects( - reqSchema, - singleAnswerResult, - multiEntityResult, - logger.child({ method: "mixSchemaObjects" }), - ) - : singleAnswerResult || multiEntityResult; - - // Tokenize final result to get token count - // let finalResultTokens = 0; - // if (finalResult) { - // const finalResultStr = JSON.stringify(finalResult); - // finalResultTokens = numTokensFromString(finalResultStr, "gpt-4o"); - - // } - // // Deduplicate and validate final result against schema - // if (reqSchema && finalResult && finalResult.length <= extractConfig.DEDUPLICATION.MAX_TOKENS) { - // const schemaValidation = await generateCompletions( - // logger.child({ method: "extractService/validateAndDeduplicate" }), - // { - // mode: "llm", - // systemPrompt: `You are a data validator and deduplicator. Your task is to: - // 1. Remove any duplicate entries in the data extracted by merging that into a single object according to the provided shcema - // 2. Ensure all data matches the provided schema - // 3. Keep only the highest quality and most complete entries when duplicates are found. - - // Do not change anything else. If data is null keep it null. If the schema is not provided, return the data as is.`, - // prompt: `Please validate and merge the duplicate entries in this data according to the schema provided:\n - - // - - // ${JSON.stringify(finalResult)} - - // - - // - - // ${JSON.stringify(reqSchema)} - - // - // `, - // schema: reqSchema, - // }, - // undefined, - // undefined, - // true, - // "gpt-4o" - // ); - // console.log("schemaValidation", schemaValidation); - - // console.log("schemaValidation", finalResult); - - // if (schemaValidation?.extract) { - // tokenUsage.push(schemaValidation.totalUsage); - // finalResult = schemaValidation.extract; - // } - // } - - const totalTokensUsed = tokenUsage.reduce((a, b) => a + b.totalTokens, 0); - const llmUsage = estimateTotalCost(tokenUsage); - let tokensToBill = calculateFinalResultCost(finalResult); - - if (CUSTOM_U_TEAMS.includes(teamId)) { - tokensToBill = 1; - } - - // Bill team for usage - billTeam(teamId, subId, tokensToBill, logger, true).catch((error) => { - logger.error( - `Failed to bill team ${teamId} for ${tokensToBill} tokens: ${error}`, - ); - }); - - // Log job with token usage and sources - logJob({ - job_id: extractId, - success: true, - message: "Extract completed", - num_docs: 1, - docs: finalResult ?? {}, - time_taken: (new Date().getTime() - Date.now()) / 1000, - team_id: teamId, - mode: "extract", - url: request.urls?.join(", ") || "", - scrapeOptions: request, - origin: request.origin ?? "api", - num_tokens: totalTokensUsed, - tokens_billed: tokensToBill, - sources, - cost_tracking: costTracking, - }).then(() => { - updateExtract(extractId, { - status: "completed", + return { + success: true, + data: finalResult ?? {}, + extractId, + warning: undefined, + urlTrace: request.urlTrace ? urlTraces : undefined, llmUsage, + totalUrlsScraped, sources, - costTracking, - }).catch((error) => { - logger.error( - `Failed to update extract ${extractId} status to completed: ${error}`, - ); + }; + } catch (error) { + await logJob({ + job_id: extractId, + success: false, + message: (error instanceof Error ? error.message : typeof error === "string" ? error : "An unexpected error occurred"), + num_docs: 1, + docs: [], + time_taken: (new Date().getTime() - Date.now()) / 1000, + team_id: teamId, + mode: "extract", + url: request.urls?.join(", ") || "", + scrapeOptions: request, + origin: request.origin ?? 
"api", + num_tokens: 0, + tokens_billed: 0, + sources, + cost_tracking: costTracking, }); - }); - - logger.debug("Done!"); - - if ( - request.__experimental_cacheMode == "save" && - request.__experimental_cacheKey - ) { - logger.debug("Saving cached docs..."); - try { - await saveCachedDocs( - [...docsMap.values()], - request.__experimental_cacheKey, - ); - } catch (error) { - logger.error("Error saving cached docs", { error }); - } + throw error; } - - // fs.writeFile( - // `logs/${request.urls?.[0].replaceAll("https://", "").replaceAll("http://", "").replaceAll("/", "-").replaceAll(".", "-")}-extract-${extractId}.json`, - // JSON.stringify(log, null, 2), - // ); - - return { - success: true, - data: finalResult ?? {}, - extractId, - warning: undefined, - urlTrace: request.urlTrace ? urlTraces : undefined, - llmUsage, - totalUrlsScraped, - sources, - }; }