From 033e9bbf2971b51a2135142e311685b527abe0eb Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 14 Jan 2025 01:45:50 -0300 Subject: [PATCH] Nick: __experimental_streamSteps --- apps/api/src/controllers/v1/extract-status.ts | 1 + apps/api/src/controllers/v1/extract.ts | 1 + apps/api/src/controllers/v1/types.ts | 1 + apps/api/src/lib/extract/extract-redis.ts | 27 ++++++ .../api/src/lib/extract/extraction-service.ts | 86 ++++++++++++++++++- apps/api/src/lib/extract/url-processor.ts | 4 + apps/api/src/scraper/scrapeURL/lib/mock.ts | 2 +- apps/js-sdk/firecrawl/src/index.ts | 4 +- 8 files changed, 122 insertions(+), 4 deletions(-) diff --git a/apps/api/src/controllers/v1/extract-status.ts b/apps/api/src/controllers/v1/extract-status.ts index 1ee68f8d..6d81e400 100644 --- a/apps/api/src/controllers/v1/extract-status.ts +++ b/apps/api/src/controllers/v1/extract-status.ts @@ -36,5 +36,6 @@ export async function extractStatusController( status: extract.status, error: extract?.error ?? undefined, expiresAt: (await getExtractExpiry(req.params.jobId)).toISOString(), + steps: extract.showSteps ? extract.steps : undefined, }); } diff --git a/apps/api/src/controllers/v1/extract.ts b/apps/api/src/controllers/v1/extract.ts index 4e0ef877..83d74ad8 100644 --- a/apps/api/src/controllers/v1/extract.ts +++ b/apps/api/src/controllers/v1/extract.ts @@ -70,6 +70,7 @@ export async function extractController( plan: req.auth.plan, createdAt: Date.now(), status: "processing", + showSteps: req.body.__experimental_streamSteps, }); if (Sentry.isInitialized()) { diff --git a/apps/api/src/controllers/v1/types.ts b/apps/api/src/controllers/v1/types.ts index 1160d871..14715c9b 100644 --- a/apps/api/src/controllers/v1/types.ts +++ b/apps/api/src/controllers/v1/types.ts @@ -221,6 +221,7 @@ export const extractV1Options = z allowExternalLinks: z.boolean().default(false), origin: z.string().optional().default("api"), urlTrace: z.boolean().default(false), + __experimental_streamSteps: z.boolean().default(false), timeout: z.number().int().positive().finite().safe().default(60000), }) .strict(strictMessage); diff --git a/apps/api/src/lib/extract/extract-redis.ts b/apps/api/src/lib/extract/extract-redis.ts index 658f734a..cb793572 100644 --- a/apps/api/src/lib/extract/extract-redis.ts +++ b/apps/api/src/lib/extract/extract-redis.ts @@ -1,6 +1,25 @@ import { redisConnection } from "../../services/queue-service"; import { logger as _logger } from "../logger"; +export enum ExtractStep { + INITIAL = "initial", + MULTI_ENTITY = "multi-entity", + MULTI_ENTITY_SCRAPE = "multi-entity-scrape", + MULTI_ENTITY_EXTRACT = "multi-entity-extract", + SCRAPE = "scrape", + MAP = "map", + EXTRACT = "extract", + COMPLETE = "complete", +} + +export type ExtractedStep = { + step: ExtractStep; + startedAt: number; + finishedAt: number; + error?: any; + discoveredLinks?: string[]; +}; + export type StoredExtract = { id: string; team_id: string; @@ -8,6 +27,8 @@ export type StoredExtract = { createdAt: number; status: "processing" | "completed" | "failed" | "cancelled"; error?: any; + showSteps?: boolean; + steps?: ExtractedStep[]; }; export async function saveExtract(id: string, extract: StoredExtract) { @@ -27,6 +48,12 @@ export async function updateExtract( ) { const current = await getExtract(id); if (!current) return; + + // Handle steps aggregation + if (extract.steps && current.steps) { + extract.steps = [...current.steps, ...extract.steps]; + } + await redisConnection.set( "extract:" + id, JSON.stringify({ ...current, ...extract }), diff --git a/apps/api/src/lib/extract/extraction-service.ts b/apps/api/src/lib/extract/extraction-service.ts index 3dcee6df..d801395d 100644 --- a/apps/api/src/lib/extract/extraction-service.ts +++ b/apps/api/src/lib/extract/extraction-service.ts @@ -24,7 +24,7 @@ import Ajv from "ajv"; const ajv = new Ajv(); const openai = new OpenAI(); -import { updateExtract } from "./extract-redis"; +import { ExtractStep, updateExtract } from "./extract-redis"; import { deduplicateObjectsArray } from "./helpers/deduplicate-objs-array"; import { mergeNullValObjs } from "./helpers/merge-null-val-objs"; import { CUSTOM_U_TEAMS } from "./config"; @@ -157,6 +157,19 @@ export async function performExtraction( let multiEntityCompletions: completions[] = []; let multiEntityResult: any = {}; let singleAnswerResult: any = {}; + + await updateExtract(extractId, { + status: "processing", + steps: [ + { + step: ExtractStep.INITIAL, + startedAt: Date.now(), + finishedAt: Date.now(), + discoveredLinks: request.urls, + }, + ], + }); + // Process URLs const urlPromises = request.urls.map((url) => processUrl( @@ -188,6 +201,18 @@ export async function performExtraction( }; } + await updateExtract(extractId, { + status: "processing", + steps: [ + { + step: ExtractStep.MAP, + startedAt: Date.now(), + finishedAt: Date.now(), + discoveredLinks: links, + }, + ], + }); + let reqSchema = request.schema; reqSchema = await dereferenceSchema(reqSchema); @@ -209,8 +234,32 @@ export async function performExtraction( const { singleAnswerSchema, multiEntitySchema } = await spreadSchemas(reqSchema, multiEntityKeys) rSchema = singleAnswerSchema; + await updateExtract(extractId, { + status: "processing", + steps: [ + { + step: ExtractStep.MULTI_ENTITY, + startedAt: Date.now(), + finishedAt: Date.now(), + discoveredLinks: [], + }, + ], + }); + const timeout = Math.floor((request.timeout || 40000) * 0.7) || 30000; + + await updateExtract(extractId, { + status: "processing", + steps: [ + { + step: ExtractStep.MULTI_ENTITY_SCRAPE, + startedAt: Date.now(), + finishedAt: Date.now(), + discoveredLinks: [], + }, + ], + }); const scrapePromises = links.map((url) => { if (!docsMap.has(url)) { return scrapeDocument( @@ -298,6 +347,18 @@ export async function performExtraction( }; // console.log("schemaWithConfidence", schemaWithConfidence); + await updateExtract(extractId, { + status: "processing", + steps: [ + { + step: ExtractStep.MULTI_ENTITY_EXTRACT, + startedAt: Date.now(), + finishedAt: Date.now(), + discoveredLinks: [doc.metadata.url || doc.metadata.sourceURL || ""], + }, + ], + }); + const completionPromise = generateOpenAICompletions( logger.child({ method: "extractService/generateOpenAICompletions" }), { @@ -386,6 +447,17 @@ export async function performExtraction( // let rerank = await rerankLinks(links.map((url) => ({ url })), request.prompt ?? JSON.stringify(request.schema), urlTraces); + await updateExtract(extractId, { + status: "processing", + steps: [ + { + step: ExtractStep.SCRAPE, + startedAt: Date.now(), + finishedAt: Date.now(), + discoveredLinks: links, + }, + ], + }); const scrapePromises = links.map((url) => { if (!docsMap.has(url)) { return scrapeDocument( @@ -431,6 +503,18 @@ export async function performExtraction( }; } + await updateExtract(extractId, { + status: "processing", + steps: [ + { + step: ExtractStep.EXTRACT, + startedAt: Date.now(), + finishedAt: Date.now(), + discoveredLinks: [], + }, + ], + }); + // Generate completions singleAnswerCompletions = await generateOpenAICompletions( logger.child({ method: "extractService/generateOpenAICompletions" }), diff --git a/apps/api/src/lib/extract/url-processor.ts b/apps/api/src/lib/extract/url-processor.ts index 0928232d..ccdfb332 100644 --- a/apps/api/src/lib/extract/url-processor.ts +++ b/apps/api/src/lib/extract/url-processor.ts @@ -7,6 +7,8 @@ import { generateBasicCompletion } from "../LLM-extraction"; import { buildRefrasedPrompt } from "./build-prompts"; import { rerankLinksWithLLM } from "./reranker"; import { extractConfig } from "./config"; +import { updateExtract } from "./extract-redis"; +import { ExtractStep } from "./extract-redis"; interface ProcessUrlOptions { url: string; @@ -157,6 +159,8 @@ export async function processUrl( extractConfig.RERANKING.MAX_INITIAL_RANKING_LIMIT, ); + + // Perform reranking using either prompt or schema let searchQuery = ""; if (options.prompt) { diff --git a/apps/api/src/scraper/scrapeURL/lib/mock.ts b/apps/api/src/scraper/scrapeURL/lib/mock.ts index 666f4d9f..e96c8142 100644 --- a/apps/api/src/scraper/scrapeURL/lib/mock.ts +++ b/apps/api/src/scraper/scrapeURL/lib/mock.ts @@ -6,7 +6,7 @@ const saveMocksDirPath = path.join(__dirname, "../mocks/").replace("dist/", ""); const loadMocksDirPath = path.join(__dirname, "../../../__tests__/snips/mocks"); export async function saveMock(options: unknown, result: unknown) { - if (!process.env.FIRECRAWL_SAVE_MOCKS) return; + if (process.env.FIRECRAWL_SAVE_MOCKS !== "true") return; await fs.mkdir(saveMocksDirPath, { recursive: true }); diff --git a/apps/js-sdk/firecrawl/src/index.ts b/apps/js-sdk/firecrawl/src/index.ts index e7e8b65b..0ea3c2ee 100644 --- a/apps/js-sdk/firecrawl/src/index.ts +++ b/apps/js-sdk/firecrawl/src/index.ts @@ -930,12 +930,12 @@ export default class FirecrawlApp { * @returns The response from the extract operation. */ async asyncExtract( - url: string, + urls: string[], params?: ExtractParams, idempotencyKey?: string ): Promise { const headers = this.prepareHeaders(idempotencyKey); - let jsonData: any = { url, ...params }; + let jsonData: any = { urls, ...params }; let jsonSchema: any; try {