Mirror of https://github.com/mendableai/firecrawl.git (synced 2025-12-27 23:24:08 +00:00)

Concurrency limit refactor + maxConcurrency parameter (FIR-2191) (#1643)

This commit is contained in:
parent a8e3c29664
commit f8983fffb7

1  .github/workflows/test-server.yml  (vendored)
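The headline change is a per-request maxConcurrency parameter on /v1/crawl and /v1/batch/scrape, plus a refactor of the Redis-backed concurrency queue. A minimal client-side sketch of the new parameter follows; the base URL and response handling are assumptions for illustration, modeled on the test helpers added in this commit.

```ts
// Hedged sketch: start a crawl capped at 5 concurrent scrapes, regardless of the
// team-wide concurrency limit. TEST_URL mirrors the snips test setup.
const API_URL = "http://127.0.0.1:3002";

async function startCrawlWithCap(apiKey: string): Promise<string> {
  const res = await fetch(`${API_URL}/v1/crawl`, {
    method: "POST",
    headers: {
      "Authorization": `Bearer ${apiKey}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      url: "https://firecrawl.dev",
      limit: 15,
      // New in this commit: cap this crawl's concurrency. The controller clamps
      // the value to the account's concurrency limit (req.acuc.concurrency).
      maxConcurrency: 5,
    }),
  });
  const body = await res.json();
  if (!body.success) throw new Error(body.error);
  return body.id; // crawl ID, polled later via GET /v1/crawl/:id
}
```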
@@ -25,6 +25,7 @@ env:
  INDEX_SUPABASE_ANON_TOKEN: ${{ secrets.INDEX_SUPABASE_ANON_TOKEN }}
  INDEX_SUPABASE_URL: ${{ secrets.INDEX_SUPABASE_URL }}
  TEST_API_KEY: ${{ secrets.TEST_API_KEY }}
  TEST_API_KEY_CONCURRENCY: ${{ secrets.TEST_API_KEY_CONCURRENCY }}
  FIRE_ENGINE_BETA_URL: ${{ secrets.FIRE_ENGINE_BETA_URL }}
  USE_DB_AUTHENTICATION: true
  SERPER_API_KEY: ${{ secrets.SERPER_API_KEY }}
@ -1,209 +0,0 @@
|
||||
import { redisConnection } from "../services/queue-service";
|
||||
import {
|
||||
cleanOldConcurrencyLimitEntries,
|
||||
getConcurrencyLimitActiveJobs,
|
||||
pushConcurrencyLimitActiveJob,
|
||||
removeConcurrencyLimitActiveJob,
|
||||
takeConcurrencyLimitedJob,
|
||||
pushConcurrencyLimitedJob,
|
||||
getConcurrencyQueueJobsCount,
|
||||
ConcurrencyLimitedJob,
|
||||
} from "../lib/concurrency-limit";
|
||||
|
||||
// Mock Redis client
|
||||
jest.mock("../services/queue-service", () => ({
|
||||
redisConnection: {
|
||||
zremrangebyscore: jest.fn(),
|
||||
zrangebyscore: jest.fn(),
|
||||
zadd: jest.fn(),
|
||||
zrem: jest.fn(),
|
||||
zmpop: jest.fn(),
|
||||
zcard: jest.fn(),
|
||||
},
|
||||
}));
|
||||
|
||||
describe("Concurrency Limit", () => {
|
||||
const mockTeamId = "test-team-id";
|
||||
const mockJobId = "test-job-id";
|
||||
const mockNow = 1000000;
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
describe("cleanOldConcurrencyLimitEntries", () => {
|
||||
it("should remove entries older than current timestamp", async () => {
|
||||
await cleanOldConcurrencyLimitEntries(mockTeamId, mockNow);
|
||||
|
||||
expect(redisConnection.zremrangebyscore).toHaveBeenCalledWith(
|
||||
"concurrency-limiter:test-team-id",
|
||||
-Infinity,
|
||||
mockNow
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("getConcurrencyLimitActiveJobs", () => {
|
||||
it("should return active jobs after given timestamp", async () => {
|
||||
const mockActiveJobs = ["job1", "job2"];
|
||||
(redisConnection.zrangebyscore as jest.Mock).mockResolvedValue(mockActiveJobs);
|
||||
|
||||
const result = await getConcurrencyLimitActiveJobs(mockTeamId, mockNow);
|
||||
|
||||
expect(result).toEqual(mockActiveJobs);
|
||||
expect(redisConnection.zrangebyscore).toHaveBeenCalledWith(
|
||||
"concurrency-limiter:test-team-id",
|
||||
mockNow,
|
||||
Infinity
|
||||
);
|
||||
});
|
||||
|
||||
it("should return empty array when no active jobs", async () => {
|
||||
(redisConnection.zrangebyscore as jest.Mock).mockResolvedValue([]);
|
||||
|
||||
const result = await getConcurrencyLimitActiveJobs(mockTeamId, mockNow);
|
||||
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("pushConcurrencyLimitActiveJob", () => {
|
||||
it("should add job with expiration timestamp", async () => {
|
||||
await pushConcurrencyLimitActiveJob(mockTeamId, mockJobId, 2 * 60 * 1000, mockNow);
|
||||
|
||||
expect(redisConnection.zadd).toHaveBeenCalledWith(
|
||||
"concurrency-limiter:test-team-id",
|
||||
mockNow + 2 * 60 * 1000, // stalledJobTimeoutMs
|
||||
mockJobId
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("removeConcurrencyLimitActiveJob", () => {
|
||||
it("should remove job from active jobs", async () => {
|
||||
await removeConcurrencyLimitActiveJob(mockTeamId, mockJobId);
|
||||
|
||||
expect(redisConnection.zrem).toHaveBeenCalledWith(
|
||||
"concurrency-limiter:test-team-id",
|
||||
mockJobId
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Queue Operations", () => {
|
||||
const mockJob: ConcurrencyLimitedJob = {
|
||||
id: mockJobId,
|
||||
data: { test: "data" },
|
||||
opts: {},
|
||||
priority: 1,
|
||||
};
|
||||
|
||||
describe("takeConcurrencyLimitedJob", () => {
|
||||
it("should return null when queue is empty", async () => {
|
||||
(redisConnection.zmpop as jest.Mock).mockResolvedValue(null);
|
||||
|
||||
const result = await takeConcurrencyLimitedJob(mockTeamId);
|
||||
|
||||
expect(result).toBeNull();
|
||||
});
|
||||
|
||||
it("should return and remove the highest priority job", async () => {
|
||||
(redisConnection.zmpop as jest.Mock).mockResolvedValue([
|
||||
"key",
|
||||
[[JSON.stringify(mockJob)]],
|
||||
]);
|
||||
|
||||
const result = await takeConcurrencyLimitedJob(mockTeamId);
|
||||
|
||||
expect(result).toEqual(mockJob);
|
||||
expect(redisConnection.zmpop).toHaveBeenCalledWith(
|
||||
1,
|
||||
"concurrency-limit-queue:test-team-id",
|
||||
"MIN"
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("pushConcurrencyLimitedJob", () => {
|
||||
it("should add job to queue with priority", async () => {
|
||||
await pushConcurrencyLimitedJob(mockTeamId, mockJob, 30000);
|
||||
|
||||
expect(redisConnection.zadd).toHaveBeenCalledWith(
|
||||
"concurrency-limit-queue:test-team-id",
|
||||
mockJob.priority,
|
||||
JSON.stringify(mockJob)
|
||||
);
|
||||
});
|
||||
|
||||
it("should use default priority 1 when not specified", async () => {
|
||||
const jobWithoutPriority = { ...mockJob };
|
||||
delete jobWithoutPriority.priority;
|
||||
|
||||
await pushConcurrencyLimitedJob(mockTeamId, jobWithoutPriority, 30000);
|
||||
|
||||
expect(redisConnection.zadd).toHaveBeenCalledWith(
|
||||
"concurrency-limit-queue:test-team-id",
|
||||
1,
|
||||
JSON.stringify(jobWithoutPriority)
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("getConcurrencyQueueJobsCount", () => {
|
||||
it("should return the number of jobs in queue", async () => {
|
||||
const mockCount = 5;
|
||||
(redisConnection.zcard as jest.Mock).mockResolvedValue(mockCount);
|
||||
|
||||
const result = await getConcurrencyQueueJobsCount(mockTeamId);
|
||||
|
||||
expect(result).toBe(mockCount);
|
||||
expect(redisConnection.zcard).toHaveBeenCalledWith(
|
||||
"concurrency-limit-queue:test-team-id"
|
||||
);
|
||||
});
|
||||
|
||||
it("should return 0 for empty queue", async () => {
|
||||
(redisConnection.zcard as jest.Mock).mockResolvedValue(0);
|
||||
|
||||
const result = await getConcurrencyQueueJobsCount(mockTeamId);
|
||||
|
||||
expect(result).toBe(0);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("Integration Scenarios", () => {
|
||||
it("should handle complete job lifecycle", async () => {
|
||||
const mockJob: ConcurrencyLimitedJob = {
|
||||
id: "lifecycle-test",
|
||||
data: { test: "lifecycle" },
|
||||
opts: {},
|
||||
};
|
||||
|
||||
// Push job to queue
|
||||
await pushConcurrencyLimitedJob(mockTeamId, mockJob, 30000);
|
||||
expect(redisConnection.zadd).toHaveBeenCalled();
|
||||
|
||||
// Take job from queue
|
||||
(redisConnection.zmpop as jest.Mock).mockResolvedValue([
|
||||
"key",
|
||||
[[JSON.stringify(mockJob)]],
|
||||
]);
|
||||
const takenJob = await takeConcurrencyLimitedJob(mockTeamId);
|
||||
expect(takenJob).toEqual(mockJob);
|
||||
|
||||
// Add to active jobs
|
||||
await pushConcurrencyLimitActiveJob(mockTeamId, mockJob.id, 2 * 60 * 1000, mockNow);
|
||||
expect(redisConnection.zadd).toHaveBeenCalled();
|
||||
|
||||
// Verify active jobs
|
||||
(redisConnection.zrangebyscore as jest.Mock).mockResolvedValue([mockJob.id]);
|
||||
const activeJobs = await getConcurrencyLimitActiveJobs(mockTeamId, mockNow);
|
||||
expect(activeJobs).toContain(mockJob.id);
|
||||
|
||||
// Remove from active jobs
|
||||
await removeConcurrencyLimitActiveJob(mockTeamId, mockJob.id);
|
||||
expect(redisConnection.zrem).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
});
|
||||
106  apps/api/src/__tests__/snips/concurrency.test.ts  (Normal file)
@ -0,0 +1,106 @@
|
||||
import { batchScrapeWithConcurrencyTracking, concurrencyCheck, crawlWithConcurrencyTracking, defaultIdentity, Identity } from "./lib";
|
||||
|
||||
const concurrencyIdentity: Identity = {
|
||||
apiKey: process.env.TEST_API_KEY_CONCURRENCY ?? process.env.TEST_API_KEY!,
|
||||
}
|
||||
|
||||
if (!process.env.TEST_SUITE_SELF_HOSTED) {
|
||||
let accountConcurrencyLimit = 2;
|
||||
|
||||
beforeAll(async () => {
|
||||
const { maxConcurrency } = await concurrencyCheck(concurrencyIdentity);
|
||||
accountConcurrencyLimit = maxConcurrency;
|
||||
|
||||
console.log("Account concurrency limit:", accountConcurrencyLimit);
|
||||
|
||||
if (accountConcurrencyLimit > 20) {
|
||||
console.warn("Your account's concurrency limit (" + accountConcurrencyLimit + ") is likely too high, which will cause these tests to fail. Please set up TEST_API_KEY_CONCURRENCY with an API key that has a lower concurrency limit.");
|
||||
}
|
||||
}, 10000);
|
||||
|
||||
describe("Concurrency queue and limit", () => {
|
||||
it("crawl utilizes full concurrency limit and doesn't go over", async () => {
|
||||
const limit = accountConcurrencyLimit * 2;
|
||||
|
||||
const { crawl, concurrencies } = await crawlWithConcurrencyTracking({
|
||||
url: "https://firecrawl.dev",
|
||||
limit,
|
||||
}, concurrencyIdentity);
|
||||
|
||||
expect(Math.max(...concurrencies)).toBe(accountConcurrencyLimit);
|
||||
expect(crawl.completed).toBe(limit);
|
||||
}, 600000);
|
||||
|
||||
it("crawl handles maxConcurrency properly", async () => {
|
||||
const { crawl, concurrencies } = await crawlWithConcurrencyTracking({
|
||||
url: "https://firecrawl.dev",
|
||||
limit: 15,
|
||||
maxConcurrency: 5,
|
||||
}, concurrencyIdentity);
|
||||
|
||||
expect(Math.max(...concurrencies)).toBe(5);
|
||||
expect(crawl.completed).toBe(15);
|
||||
}, 600000);
|
||||
|
||||
it("crawl maxConcurrency stacks properly", async () => {
|
||||
const [{ crawl: crawl1, concurrencies: concurrencies1 }, { crawl: crawl2, concurrencies: concurrencies2 }] = await Promise.all([
|
||||
crawlWithConcurrencyTracking({
|
||||
url: "https://firecrawl.dev",
|
||||
limit: 15,
|
||||
maxConcurrency: 5,
|
||||
}, concurrencyIdentity),
|
||||
crawlWithConcurrencyTracking({
|
||||
url: "https://firecrawl.dev",
|
||||
limit: 15,
|
||||
maxConcurrency: 5,
|
||||
}, concurrencyIdentity),
|
||||
]);
|
||||
|
||||
expect(Math.max(...concurrencies1, ...concurrencies2)).toBe(10);
|
||||
expect(crawl1.completed).toBe(15);
|
||||
expect(crawl2.completed).toBe(15);
|
||||
}, 1200000);
|
||||
|
||||
it("batch scrape utilizes full concurrency limit and doesn't go over", async () => {
|
||||
const limit = accountConcurrencyLimit * 2;
|
||||
|
||||
const { batchScrape, concurrencies } = await batchScrapeWithConcurrencyTracking({
|
||||
urls: Array(limit).fill(0).map(_ => `https://firecrawl.dev`),
|
||||
}, concurrencyIdentity);
|
||||
|
||||
expect(Math.max(...concurrencies)).toBe(accountConcurrencyLimit);
|
||||
expect(batchScrape.completed).toBe(limit);
|
||||
}, 600000);
|
||||
|
||||
it("batch scrape handles maxConcurrency properly", async () => {
|
||||
const { batchScrape, concurrencies } = await batchScrapeWithConcurrencyTracking({
|
||||
urls: Array(15).fill(0).map(_ => `https://firecrawl.dev`),
|
||||
maxConcurrency: 5,
|
||||
}, concurrencyIdentity);
|
||||
|
||||
expect(Math.max(...concurrencies)).toBe(5);
|
||||
expect(batchScrape.completed).toBe(15);
|
||||
}, 600000);
|
||||
|
||||
it("batch scrape maxConcurrency stacks properly", async () => {
|
||||
const [{ batchScrape: batchScrape1, concurrencies: concurrencies1 }, { batchScrape: batchScrape2, concurrencies: concurrencies2 }] = await Promise.all([
|
||||
batchScrapeWithConcurrencyTracking({
|
||||
urls: Array(15).fill(0).map(_ => `https://firecrawl.dev`),
|
||||
maxConcurrency: 5,
|
||||
}, concurrencyIdentity),
|
||||
batchScrapeWithConcurrencyTracking({
|
||||
urls: Array(15).fill(0).map(_ => `https://firecrawl.dev`),
|
||||
maxConcurrency: 5,
|
||||
}, concurrencyIdentity),
|
||||
]);
|
||||
|
||||
expect(Math.max(...concurrencies1, ...concurrencies2)).toBe(10);
|
||||
expect(batchScrape1.completed).toBe(15);
|
||||
expect(batchScrape2.completed).toBe(15);
|
||||
}, 1200000);
|
||||
});
|
||||
} else {
|
||||
it("stubbed", () => {
|
||||
expect(true).toBe(true);
|
||||
});
|
||||
}
|
||||
@ -10,14 +10,22 @@ import request from "supertest";
|
||||
|
||||
const TEST_URL = "http://127.0.0.1:3002";
|
||||
|
||||
export type Identity = {
|
||||
apiKey: string;
|
||||
}
|
||||
|
||||
export const defaultIdentity: Identity = {
|
||||
apiKey: process.env.TEST_API_KEY!,
|
||||
};
|
||||
|
||||
// =========================================
|
||||
// Scrape API
|
||||
// =========================================
|
||||
|
||||
async function scrapeRaw(body: ScrapeRequestInput) {
|
||||
async function scrapeRaw(body: ScrapeRequestInput, identity = defaultIdentity) {
|
||||
return await request(TEST_URL)
|
||||
.post("/v1/scrape")
|
||||
.set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
|
||||
.set("Authorization", `Bearer ${identity.apiKey}`)
|
||||
.set("Content-Type", "application/json")
|
||||
.send(body);
|
||||
}
|
||||
@ -34,8 +42,8 @@ function expectScrapeToFail(response: Awaited<ReturnType<typeof scrapeRaw>>) {
|
||||
expect(typeof response.body.error).toBe("string");
|
||||
}
|
||||
|
||||
export async function scrape(body: ScrapeRequestInput): Promise<Document> {
|
||||
const raw = await scrapeRaw(body);
|
||||
export async function scrape(body: ScrapeRequestInput, identity = defaultIdentity): Promise<Document> {
|
||||
const raw = await scrapeRaw(body, identity);
|
||||
expectScrapeToSucceed(raw);
|
||||
if (body.proxy === "stealth") {
|
||||
expect(raw.body.data.metadata.proxyUsed).toBe("stealth");
|
||||
@ -45,24 +53,24 @@ export async function scrape(body: ScrapeRequestInput): Promise<Document> {
|
||||
return raw.body.data;
|
||||
}
|
||||
|
||||
export async function scrapeWithFailure(body: ScrapeRequestInput): Promise<{
|
||||
export async function scrapeWithFailure(body: ScrapeRequestInput, identity = defaultIdentity): Promise<{
|
||||
success: false;
|
||||
error: string;
|
||||
}> {
|
||||
const raw = await scrapeRaw(body);
|
||||
const raw = await scrapeRaw(body, identity);
|
||||
expectScrapeToFail(raw);
|
||||
return raw.body;
|
||||
}
|
||||
|
||||
export async function scrapeStatusRaw(jobId: string) {
|
||||
export async function scrapeStatusRaw(jobId: string, identity = defaultIdentity) {
|
||||
return await request(TEST_URL)
|
||||
.get("/v1/scrape/" + encodeURIComponent(jobId))
|
||||
.set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
|
||||
.set("Authorization", `Bearer ${identity.apiKey}`)
|
||||
.send();
|
||||
}
|
||||
|
||||
export async function scrapeStatus(jobId: string): Promise<Document> {
|
||||
const raw = await scrapeStatusRaw(jobId);
|
||||
export async function scrapeStatus(jobId: string, identity = defaultIdentity): Promise<Document> {
|
||||
const raw = await scrapeStatusRaw(jobId, identity);
|
||||
expect(raw.statusCode).toBe(200);
|
||||
expect(raw.body.success).toBe(true);
|
||||
expect(typeof raw.body.data).toBe("object");
|
||||
@ -75,30 +83,30 @@ export async function scrapeStatus(jobId: string): Promise<Document> {
|
||||
// Crawl API
|
||||
// =========================================
|
||||
|
||||
async function crawlStart(body: CrawlRequestInput) {
|
||||
async function crawlStart(body: CrawlRequestInput, identity = defaultIdentity) {
|
||||
return await request(TEST_URL)
|
||||
.post("/v1/crawl")
|
||||
.set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
|
||||
.set("Authorization", `Bearer ${identity.apiKey}`)
|
||||
.set("Content-Type", "application/json")
|
||||
.send(body);
|
||||
}
|
||||
|
||||
async function crawlStatus(id: string) {
|
||||
async function crawlStatus(id: string, identity = defaultIdentity) {
|
||||
return await request(TEST_URL)
|
||||
.get("/v1/crawl/" + encodeURIComponent(id))
|
||||
.set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
|
||||
.set("Authorization", `Bearer ${identity.apiKey}`)
|
||||
.send();
|
||||
}
|
||||
|
||||
async function crawlOngoingRaw() {
|
||||
async function crawlOngoingRaw(identity = defaultIdentity) {
|
||||
return await request(TEST_URL)
|
||||
.get("/v1/crawl/ongoing")
|
||||
.set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
|
||||
.set("Authorization", `Bearer ${identity.apiKey}`)
|
||||
.send();
|
||||
}
|
||||
|
||||
export async function crawlOngoing(): Promise<Exclude<OngoingCrawlsResponse, ErrorResponse>> {
|
||||
const res = await crawlOngoingRaw();
|
||||
export async function crawlOngoing(identity = defaultIdentity): Promise<Exclude<OngoingCrawlsResponse, ErrorResponse>> {
|
||||
const res = await crawlOngoingRaw(identity);
|
||||
expect(res.statusCode).toBe(200);
|
||||
expect(res.body.success).toBe(true);
|
||||
return res.body;
|
||||
@ -120,17 +128,17 @@ function expectCrawlToSucceed(response: Awaited<ReturnType<typeof crawlStatus>>)
|
||||
expect(response.body.data.length).toBeGreaterThan(0);
|
||||
}
|
||||
|
||||
export async function asyncCrawl(body: CrawlRequestInput): Promise<Exclude<CrawlResponse, ErrorResponse>> {
|
||||
const cs = await crawlStart(body);
|
||||
export async function asyncCrawl(body: CrawlRequestInput, identity = defaultIdentity): Promise<Exclude<CrawlResponse, ErrorResponse>> {
|
||||
const cs = await crawlStart(body, identity);
|
||||
expectCrawlStartToSucceed(cs);
|
||||
return cs.body;
|
||||
}
|
||||
|
||||
export async function asyncCrawlWaitForFinish(id: string): Promise<Exclude<CrawlStatusResponse, ErrorResponse>> {
|
||||
export async function asyncCrawlWaitForFinish(id: string, identity = defaultIdentity): Promise<Exclude<CrawlStatusResponse, ErrorResponse>> {
|
||||
let x;
|
||||
|
||||
do {
|
||||
x = await crawlStatus(id);
|
||||
x = await crawlStatus(id, identity);
|
||||
expect(x.statusCode).toBe(200);
|
||||
expect(typeof x.body.status).toBe("string");
|
||||
} while (x.body.status === "scraping");
|
||||
@ -139,14 +147,14 @@ export async function asyncCrawlWaitForFinish(id: string): Promise<Exclude<Crawl
|
||||
return x.body;
|
||||
}
|
||||
|
||||
export async function crawl(body: CrawlRequestInput): Promise<Exclude<CrawlStatusResponse, ErrorResponse>> {
|
||||
const cs = await crawlStart(body);
|
||||
export async function crawl(body: CrawlRequestInput, identity = defaultIdentity): Promise<Exclude<CrawlStatusResponse, ErrorResponse>> {
|
||||
const cs = await crawlStart(body, identity);
|
||||
expectCrawlStartToSucceed(cs);
|
||||
|
||||
let x;
|
||||
|
||||
do {
|
||||
x = await crawlStatus(cs.body.id);
|
||||
x = await crawlStatus(cs.body.id, identity);
|
||||
expect(x.statusCode).toBe(200);
|
||||
expect(typeof x.body.status).toBe("string");
|
||||
} while (x.body.status === "scraping");
|
||||
@ -159,18 +167,18 @@ export async function crawl(body: CrawlRequestInput): Promise<Exclude<CrawlStatu
|
||||
// Batch Scrape API
|
||||
// =========================================
|
||||
|
||||
async function batchScrapeStart(body: BatchScrapeRequestInput) {
|
||||
async function batchScrapeStart(body: BatchScrapeRequestInput, identity = defaultIdentity) {
|
||||
return await request(TEST_URL)
|
||||
.post("/v1/batch/scrape")
|
||||
.set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
|
||||
.set("Authorization", `Bearer ${identity.apiKey}`)
|
||||
.set("Content-Type", "application/json")
|
||||
.send(body);
|
||||
}
|
||||
|
||||
async function batchScrapeStatus(id: string) {
|
||||
async function batchScrapeStatus(id: string, identity = defaultIdentity) {
|
||||
return await request(TEST_URL)
|
||||
.get("/v1/batch/scrape/" + encodeURIComponent(id))
|
||||
.set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
|
||||
.set("Authorization", `Bearer ${identity.apiKey}`)
|
||||
.send();
|
||||
}
|
||||
|
||||
@ -190,14 +198,14 @@ function expectBatchScrapeToSucceed(response: Awaited<ReturnType<typeof batchScr
|
||||
expect(response.body.data.length).toBeGreaterThan(0);
|
||||
}
|
||||
|
||||
export async function batchScrape(body: BatchScrapeRequestInput): Promise<Exclude<CrawlStatusResponse, ErrorResponse>> {
|
||||
const bss = await batchScrapeStart(body);
|
||||
export async function batchScrape(body: BatchScrapeRequestInput, identity = defaultIdentity): Promise<Exclude<CrawlStatusResponse, ErrorResponse>> {
|
||||
const bss = await batchScrapeStart(body, identity);
|
||||
expectBatchScrapeStartToSucceed(bss);
|
||||
|
||||
let x;
|
||||
|
||||
do {
|
||||
x = await batchScrapeStatus(bss.body.id);
|
||||
x = await batchScrapeStatus(bss.body.id, identity);
|
||||
expect(x.statusCode).toBe(200);
|
||||
expect(typeof x.body.status).toBe("string");
|
||||
} while (x.body.status === "scraping");
|
||||
@ -210,10 +218,10 @@ export async function batchScrape(body: BatchScrapeRequestInput): Promise<Exclud
|
||||
// Map API
|
||||
// =========================================
|
||||
|
||||
export async function map(body: MapRequestInput) {
|
||||
export async function map(body: MapRequestInput, identity = defaultIdentity) {
|
||||
return await request(TEST_URL)
|
||||
.post("/v1/map")
|
||||
.set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
|
||||
.set("Authorization", `Bearer ${identity.apiKey}`)
|
||||
.set("Content-Type", "application/json")
|
||||
.send(body);
|
||||
}
|
||||
@ -229,18 +237,18 @@ export function expectMapToSucceed(response: Awaited<ReturnType<typeof map>>) {
|
||||
// Extract API
|
||||
// =========================================
|
||||
|
||||
async function extractStart(body: ExtractRequestInput) {
|
||||
async function extractStart(body: ExtractRequestInput, identity = defaultIdentity) {
|
||||
return await request(TEST_URL)
|
||||
.post("/v1/extract")
|
||||
.set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
|
||||
.set("Authorization", `Bearer ${identity.apiKey}`)
|
||||
.set("Content-Type", "application/json")
|
||||
.send(body);
|
||||
}
|
||||
|
||||
async function extractStatus(id: string) {
|
||||
async function extractStatus(id: string, identity = defaultIdentity) {
|
||||
return await request(TEST_URL)
|
||||
.get("/v1/extract/" + encodeURIComponent(id))
|
||||
.set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
|
||||
.set("Authorization", `Bearer ${identity.apiKey}`)
|
||||
.send();
|
||||
}
|
||||
|
||||
@ -259,14 +267,14 @@ function expectExtractToSucceed(response: Awaited<ReturnType<typeof extractStatu
|
||||
expect(response.body).toHaveProperty("data");
|
||||
}
|
||||
|
||||
export async function extract(body: ExtractRequestInput): Promise<ExtractResponse> {
|
||||
const es = await extractStart(body);
|
||||
export async function extract(body: ExtractRequestInput, identity = defaultIdentity): Promise<ExtractResponse> {
|
||||
const es = await extractStart(body, identity);
|
||||
expectExtractStartToSucceed(es);
|
||||
|
||||
let x;
|
||||
|
||||
do {
|
||||
x = await extractStatus(es.body.id);
|
||||
x = await extractStatus(es.body.id, identity);
|
||||
expect(x.statusCode).toBe(200);
|
||||
expect(typeof x.body.status).toBe("string");
|
||||
} while (x.body.status === "processing");
|
||||
@ -279,10 +287,10 @@ export async function extract(body: ExtractRequestInput): Promise<ExtractRespons
|
||||
// Search API
|
||||
// =========================================
|
||||
|
||||
async function searchRaw(body: SearchRequestInput) {
|
||||
async function searchRaw(body: SearchRequestInput, identity = defaultIdentity) {
|
||||
return await request(TEST_URL)
|
||||
.post("/v1/search")
|
||||
.set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
|
||||
.set("Authorization", `Bearer ${identity.apiKey}`)
|
||||
.set("Content-Type", "application/json")
|
||||
.send(body);
|
||||
}
|
||||
@ -295,8 +303,8 @@ function expectSearchToSucceed(response: Awaited<ReturnType<typeof searchRaw>>)
|
||||
expect(response.body.data.length).toBeGreaterThan(0);
|
||||
}
|
||||
|
||||
export async function search(body: SearchRequestInput): Promise<Document[]> {
|
||||
const raw = await searchRaw(body);
|
||||
export async function search(body: SearchRequestInput, identity = defaultIdentity): Promise<Document[]> {
|
||||
const raw = await searchRaw(body, identity);
|
||||
expectSearchToSucceed(raw);
|
||||
return raw.body.data;
|
||||
}
|
||||
@ -305,10 +313,10 @@ export async function search(body: SearchRequestInput): Promise<Document[]> {
|
||||
// Billing API
|
||||
// =========================================
|
||||
|
||||
export async function creditUsage(): Promise<{ remaining_credits: number }> {
|
||||
export async function creditUsage(identity = defaultIdentity): Promise<{ remaining_credits: number }> {
|
||||
const req = (await request(TEST_URL)
|
||||
.get("/v1/team/credit-usage")
|
||||
.set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
|
||||
.set("Authorization", `Bearer ${identity.apiKey}`)
|
||||
.set("Content-Type", "application/json"));
|
||||
|
||||
if (req.status !== 200) {
|
||||
@ -318,13 +326,74 @@ export async function creditUsage(): Promise<{ remaining_credits: number }> {
|
||||
return req.body.data;
|
||||
}
|
||||
|
||||
export async function tokenUsage(): Promise<{ remaining_tokens: number }> {
|
||||
export async function tokenUsage(identity = defaultIdentity): Promise<{ remaining_tokens: number }> {
|
||||
return (await request(TEST_URL)
|
||||
.get("/v1/team/token-usage")
|
||||
.set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
|
||||
.set("Authorization", `Bearer ${identity.apiKey}`)
|
||||
.set("Content-Type", "application/json")).body.data;
|
||||
}
|
||||
|
||||
// =========================================
|
||||
// Concurrency API
|
||||
// =========================================
|
||||
|
||||
export async function concurrencyCheck(identity = defaultIdentity): Promise<{ concurrency: number, maxConcurrency: number }> {
|
||||
const x = (await request(TEST_URL)
|
||||
.get("/v1/concurrency-check")
|
||||
.set("Authorization", `Bearer ${identity.apiKey}`)
|
||||
.set("Content-Type", "application/json"));
|
||||
|
||||
expect(x.statusCode).toBe(200);
|
||||
expect(x.body.success).toBe(true);
|
||||
return x.body;
|
||||
}
|
||||
|
||||
export async function crawlWithConcurrencyTracking(body: CrawlRequestInput, identity = defaultIdentity): Promise<{
|
||||
crawl: Exclude<CrawlStatusResponse, ErrorResponse>;
|
||||
concurrencies: number[];
|
||||
}> {
|
||||
const cs = await crawlStart(body, identity);
|
||||
expectCrawlStartToSucceed(cs);
|
||||
|
||||
let x, concurrencies: number[] = [];
|
||||
|
||||
do {
|
||||
x = await crawlStatus(cs.body.id, identity);
|
||||
expect(x.statusCode).toBe(200);
|
||||
expect(typeof x.body.status).toBe("string");
|
||||
concurrencies.push((await concurrencyCheck(identity)).concurrency);
|
||||
} while (x.body.status === "scraping");
|
||||
|
||||
expectCrawlToSucceed(x);
|
||||
return {
|
||||
crawl: x.body,
|
||||
concurrencies,
|
||||
};
|
||||
}
|
||||
|
||||
export async function batchScrapeWithConcurrencyTracking(body: BatchScrapeRequestInput, identity = defaultIdentity): Promise<{
|
||||
batchScrape: Exclude<CrawlStatusResponse, ErrorResponse>;
|
||||
concurrencies: number[];
|
||||
}> {
|
||||
const cs = await batchScrapeStart(body, identity);
|
||||
expectBatchScrapeStartToSucceed(cs);
|
||||
|
||||
let x, concurrencies: number[] = [];
|
||||
|
||||
do {
|
||||
x = await batchScrapeStatus(cs.body.id, identity);
|
||||
expect(x.statusCode).toBe(200);
|
||||
expect(typeof x.body.status).toBe("string");
|
||||
concurrencies.push((await concurrencyCheck(identity)).concurrency);
|
||||
} while (x.body.status === "scraping");
|
||||
|
||||
expectBatchScrapeToSucceed(x);
|
||||
return {
|
||||
batchScrape: x.body,
|
||||
concurrencies,
|
||||
};
|
||||
}
|
||||
|
||||
// =========================================
|
||||
// =========================================
|
||||
|
||||
@ -338,18 +407,18 @@ async function deepResearchStart(body: {
|
||||
formats?: string[];
|
||||
topic?: string;
|
||||
jsonOptions?: any;
|
||||
}) {
|
||||
}, identity = defaultIdentity) {
|
||||
return await request(TEST_URL)
|
||||
.post("/v1/deep-research")
|
||||
.set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
|
||||
.set("Authorization", `Bearer ${identity.apiKey}`)
|
||||
.set("Content-Type", "application/json")
|
||||
.send(body);
|
||||
}
|
||||
|
||||
async function deepResearchStatus(id: string) {
|
||||
async function deepResearchStatus(id: string, identity = defaultIdentity) {
|
||||
return await request(TEST_URL)
|
||||
.get("/v1/deep-research/" + encodeURIComponent(id))
|
||||
.set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
|
||||
.set("Authorization", `Bearer ${identity.apiKey}`)
|
||||
.send();
|
||||
}
|
||||
|
||||
@ -369,14 +438,14 @@ export async function deepResearch(body: {
|
||||
formats?: string[];
|
||||
topic?: string;
|
||||
jsonOptions?: any;
|
||||
}) {
|
||||
const ds = await deepResearchStart(body);
|
||||
}, identity = defaultIdentity) {
|
||||
const ds = await deepResearchStart(body, identity);
|
||||
expectDeepResearchStartToSucceed(ds);
|
||||
|
||||
let x;
|
||||
|
||||
do {
|
||||
x = await deepResearchStatus(ds.body.id);
|
||||
x = await deepResearchStatus(ds.body.id, identity);
|
||||
expect(x.statusCode).toBe(200);
|
||||
expect(typeof x.body.status).toBe("string");
|
||||
} while (x.body.status === "processing");
|
||||
|
||||
@ -469,42 +469,6 @@ describe("Scrape tests", () => {
|
||||
|
||||
expect(response.metadata.cacheState).toBe("hit");
|
||||
}, 147000);
|
||||
|
||||
it.concurrent("works properly on pages returning errors", async () => {
|
||||
const id = crypto.randomUUID();
|
||||
const url = "https://httpstat.us/404?testId=" + id;
|
||||
|
||||
await scrape({
|
||||
url,
|
||||
timeout: 60000,
|
||||
});
|
||||
|
||||
const response1 = await scrape({
|
||||
url,
|
||||
timeout: 60000,
|
||||
maxAge: 120000,
|
||||
});
|
||||
|
||||
expect(response1.metadata.cacheState).toBe("miss");
|
||||
|
||||
const response2 = await scrape({
|
||||
url,
|
||||
timeout: 60000,
|
||||
maxAge: 180000,
|
||||
});
|
||||
|
||||
expect(response2.metadata.cacheState).toBe("miss");
|
||||
|
||||
await new Promise(resolve => setTimeout(resolve, 17000));
|
||||
|
||||
const response3 = await scrape({
|
||||
url,
|
||||
timeout: 60000,
|
||||
maxAge: 240000,
|
||||
});
|
||||
|
||||
expect(response3.metadata.cacheState).toBe("hit");
|
||||
}, 284000);
|
||||
});
|
||||
|
||||
describe("Change Tracking format", () => {
|
||||
|
||||
@@ -102,6 +102,7 @@ export async function batchScrapeController(
    }, // NOTE: smart wait disabled for batch scrapes to ensure contentful scrape, speed does not matter
    team_id: req.auth.team_id,
    createdAt: Date.now(),
    maxConcurrency: req.body.maxConcurrency,
  };

  if (!req.body.appendToId) {
@@ -22,7 +22,7 @@ import { configDotenv } from "dotenv";
import type { Job, JobState, Queue } from "bullmq";
import { logger } from "../../lib/logger";
import { supabase_rr_service, supabase_service } from "../../services/supabase";
import { getConcurrencyLimitedJobs, getCrawlConcurrencyLimitedJobs } from "../../lib/concurrency-limit";
import { getConcurrencyLimitedJobs, getCrawlConcurrencyLimitActiveJobs } from "../../lib/concurrency-limit";
import { getJobFromGCS } from "../../lib/gcs-jobs";
configDotenv();

@@ -162,9 +162,8 @@ export async function crawlStatusController(
    ),
  );

  const teamThrottledJobsSet = await getConcurrencyLimitedJobs(req.auth.team_id);
  const crawlThrottledJobsSet = sc.crawlerOptions?.delay ? await getCrawlConcurrencyLimitedJobs(req.params.jobId) : new Set();
  const throttledJobsSet = new Set([...teamThrottledJobsSet, ...crawlThrottledJobsSet]);
  const throttledJobsSet = new Set(await getConcurrencyLimitedJobs(req.auth.team_id));
  const activeJobsSet = new Set(await getCrawlConcurrencyLimitActiveJobs(req.params.jobId));

  const validJobStatuses: [string, JobState | "unknown"][] = [];
  const validJobIDs: string[] = [];
@@ -173,6 +172,9 @@
    if (throttledJobsSet.has(id)) {
      validJobStatuses.push([id, "prioritized"]);
      validJobIDs.push(id);
    } else if (status === "unknown" && activeJobsSet.has(id)) {
      validJobStatuses.push([id, "active"]);
      validJobIDs.push(id);
    } else if (
      status !== "failed" &&
      status !== "unknown"
@@ -87,6 +87,7 @@ export async function crawlController(
    }, // NOTE: smart wait disabled for crawls to ensure contentful scrape, speed does not matter
    team_id: req.auth.team_id,
    createdAt: Date.now(),
    maxConcurrency: req.body.maxConcurrency !== undefined ? Math.min(req.body.maxConcurrency, req.acuc.concurrency) : undefined,
  };

  const crawler = crawlToCrawler(id, sc, req.acuc?.flags ?? null);
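The crawl controller stores the cap on the StoredCrawl only after clamping it to the account's concurrency, so a request can tighten but never raise the team limit. A small sketch of that rule (the helper name is an assumption; only the expression above is in the diff):

```ts
// effectiveMaxConcurrency mirrors the clamping applied in crawlController:
// requested cap, bounded by the account-level concurrency (req.acuc.concurrency).
function effectiveMaxConcurrency(
  requested: number | undefined,
  accountConcurrency: number,
): number | undefined {
  return requested !== undefined
    ? Math.min(requested, accountConcurrency)
    : undefined; // undefined = no crawl-level cap, only the team limit applies
}

// effectiveMaxConcurrency(5, 20)        -> 5   (request honored)
// effectiveMaxConcurrency(50, 20)       -> 20  (clamped to the account limit)
// effectiveMaxConcurrency(undefined, 20) -> undefined
```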
@@ -584,6 +584,7 @@ export const batchScrapeRequestSchema = baseScrapeOptions
    webhook: webhookSchema.optional(),
    appendToId: z.string().uuid().optional(),
    ignoreInvalidURLs: z.boolean().default(false),
    maxConcurrency: z.number().positive().int().optional(),
  })
  .strict(strictMessage)
  .refine(extractRefine, extractRefineOpts)
@@ -646,6 +647,7 @@ export const crawlRequestSchema = crawlerOptions
    scrapeOptions: baseScrapeOptions.default({}),
    webhook: webhookSchema.optional(),
    limit: z.number().default(10000),
    maxConcurrency: z.number().positive().int().optional(),
  })
  .strict(strictMessage)
  .refine((x) => extractRefine(x.scrapeOptions), extractRefineOpts)
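Both request schemas validate the new field the same way: an optional positive integer. A small zod sketch of what that validator accepts and rejects; the standalone schema below is only illustrative, the real field lives inside the request schemas above.

```ts
import { z } from "zod";

// Stand-in for the maxConcurrency field added to both request schemas.
const maxConcurrencySchema = z.number().positive().int().optional();

maxConcurrencySchema.parse(5);         // ok
maxConcurrencySchema.parse(undefined); // ok - the parameter is optional
// maxConcurrencySchema.parse(0);      // throws: must be positive
// maxConcurrencySchema.parse(2.5);    // throws: must be an integer
```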
@@ -1,12 +1,16 @@
import { RateLimiterMode } from "../types";
import { redisEvictConnection } from "../services/redis";
import type { JobsOptions } from "bullmq";
import type { Job, JobsOptions } from "bullmq";
import { getACUCTeam } from "../controllers/auth";
import { getCrawl, StoredCrawl } from "./crawl-redis";
import { getScrapeQueue } from "../services/queue-service";
import { logger } from "./logger";

const constructKey = (team_id: string) => "concurrency-limiter:" + team_id;
const constructQueueKey = (team_id: string) =>
  "concurrency-limit-queue:" + team_id;

const constructCrawlKey = (crawl_id: string) => "crawl-concurrency-limiter:" + crawl_id;
const constructCrawlQueueKey = (crawl_id: string) => "crawl-concurrency-limit-queue:" + crawl_id;

export async function cleanOldConcurrencyLimitEntries(
  team_id: string,
@@ -65,14 +69,33 @@ export async function takeConcurrencyLimitedJob(
  return JSON.parse(res[1][0][0]);
}

async function takeConcurrencyLimitedJobAndTimeout(
  team_id: string,
): Promise<{
  job: ConcurrencyLimitedJob;
  timeout: number;
} | null> {
  await redisEvictConnection.zremrangebyscore(constructQueueKey(team_id), -Infinity, Date.now());
  const res = await redisEvictConnection.zmpop(1, constructQueueKey(team_id), "MIN");
  if (res === null || res === undefined) {
    return null;
  }

  return {
    job: JSON.parse(res[1][0][0]),
    timeout: res[1][0][1] === "inf" ? Infinity : parseFloat(res[1][0][1]),
  };
}

export async function pushConcurrencyLimitedJob(
  team_id: string,
  job: ConcurrencyLimitedJob,
  timeout: number,
  now: number = Date.now(),
) {
  await redisEvictConnection.zadd(
    constructQueueKey(team_id),
    Date.now() + timeout,
    now + timeout,
    JSON.stringify(job),
  );
}
@ -126,34 +149,124 @@ export async function removeCrawlConcurrencyLimitActiveJob(
|
||||
await redisEvictConnection.zrem(constructCrawlKey(crawl_id), id);
|
||||
}
|
||||
|
||||
export async function takeCrawlConcurrencyLimitedJob(
|
||||
crawl_id: string,
|
||||
): Promise<ConcurrencyLimitedJob | null> {
|
||||
const res = await redisEvictConnection.zmpop(1, constructCrawlQueueKey(crawl_id), "MIN");
|
||||
if (res === null || res === undefined) {
|
||||
return null;
|
||||
/**
|
||||
* Grabs the next job from the team's concurrency limit queue. Handles crawl concurrency limits.
|
||||
*
|
||||
* This function may only be called once the outer code has verified that the team has not reached its concurrency limit.
|
||||
*
|
||||
* @param teamId
|
||||
* @returns A job that can be run, or null if there are no more jobs to run.
|
||||
*/
|
||||
async function getNextConcurrentJob(teamId: string): Promise<{
|
||||
job: ConcurrencyLimitedJob;
|
||||
timeout: number;
|
||||
} | null> {
|
||||
let ignoredJobs: {
|
||||
job: ConcurrencyLimitedJob;
|
||||
timeout: number;
|
||||
}[] = [];
|
||||
|
||||
let finalJob: {
|
||||
job: ConcurrencyLimitedJob;
|
||||
timeout: number;
|
||||
} | null = null;
|
||||
|
||||
const crawlCache = new Map<string, StoredCrawl>();
|
||||
|
||||
while (finalJob === null) {
|
||||
const res = await takeConcurrencyLimitedJobAndTimeout(teamId);
|
||||
if (res === null) {
|
||||
break;
|
||||
}
|
||||
|
||||
// If the job is associated with a crawl ID, we need to check if the crawl has a max concurrency limit
|
||||
if (res.job.data.crawl_id) {
|
||||
const sc = crawlCache.get(res.job.data.crawl_id) ?? await getCrawl(res.job.data.crawl_id);
|
||||
if (sc !== null) {
|
||||
crawlCache.set(res.job.data.crawl_id, sc);
|
||||
}
|
||||
|
||||
const maxCrawlConcurrency = sc === null
|
||||
? null
|
||||
: (typeof sc.crawlerOptions?.delay === "number")
|
||||
? 1
|
||||
: sc.maxConcurrency ?? null;
|
||||
|
||||
if (maxCrawlConcurrency !== null) {
|
||||
// If the crawl has a max concurrency limit, we need to check if the crawl has reached the limit
|
||||
const currentActiveConcurrency = (await getCrawlConcurrencyLimitActiveJobs(res.job.data.crawl_id)).length;
|
||||
if (currentActiveConcurrency < maxCrawlConcurrency) {
|
||||
// If we're under the max concurrency limit, we can run the job
|
||||
finalJob = res;
|
||||
} else {
|
||||
// If we're at the max concurrency limit, we need to ignore the job
|
||||
ignoredJobs.push({
|
||||
job: res.job,
|
||||
timeout: res.timeout,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
// If the crawl has no max concurrency limit, we can run the job
|
||||
finalJob = res;
|
||||
}
|
||||
} else {
|
||||
// If the job is not associated with a crawl ID, we can run the job
|
||||
finalJob = res;
|
||||
}
|
||||
}
|
||||
return JSON.parse(res[1][0][0]);
|
||||
|
||||
for (const ignoredJob of ignoredJobs) {
|
||||
await pushConcurrencyLimitedJob(teamId, ignoredJob.job, ignoredJob.timeout);
|
||||
}
|
||||
|
||||
return finalJob;
|
||||
}
|
||||
|
||||
export async function pushCrawlConcurrencyLimitedJob(
|
||||
crawl_id: string,
|
||||
job: ConcurrencyLimitedJob,
|
||||
) {
|
||||
await redisEvictConnection.zadd(
|
||||
constructCrawlQueueKey(crawl_id),
|
||||
job.priority ?? 1,
|
||||
JSON.stringify(job),
|
||||
);
|
||||
}
|
||||
/**
|
||||
* Called when a job associated with a concurrency queue is done.
|
||||
*
|
||||
* @param job The BullMQ job that is done.
|
||||
*/
|
||||
export async function concurrentJobDone(job: Job) {
|
||||
if (job.id && job.data && job.data.team_id) {
|
||||
await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
|
||||
await cleanOldConcurrencyLimitEntries(job.data.team_id);
|
||||
|
||||
export async function getCrawlConcurrencyLimitedJobs(
|
||||
crawl_id: string,
|
||||
) {
|
||||
return new Set((await redisEvictConnection.zrange(constructCrawlQueueKey(crawl_id), 0, -1)).map(x => JSON.parse(x).id));
|
||||
}
|
||||
if (job.data.crawl_id) {
|
||||
await removeCrawlConcurrencyLimitActiveJob(job.data.crawl_id, job.id);
|
||||
await cleanOldCrawlConcurrencyLimitEntries(job.data.crawl_id);
|
||||
}
|
||||
|
||||
export async function getCrawlConcurrencyQueueJobsCount(crawl_id: string): Promise<number> {
|
||||
const count = await redisEvictConnection.zcard(constructCrawlQueueKey(crawl_id));
|
||||
return count;
|
||||
const maxTeamConcurrency = (await getACUCTeam(job.data.team_id, false, true, job.data.is_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
|
||||
const currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(job.data.team_id)).length;
|
||||
|
||||
if (currentActiveConcurrency < maxTeamConcurrency) {
|
||||
const nextJob = await getNextConcurrentJob(job.data.team_id);
|
||||
if (nextJob !== null) {
|
||||
await pushConcurrencyLimitActiveJob(job.data.team_id, nextJob.job.id, 60 * 1000);
|
||||
|
||||
if (nextJob.job.data.crawl_id) {
|
||||
await pushCrawlConcurrencyLimitActiveJob(nextJob.job.data.crawl_id, nextJob.job.id, 60 * 1000);
|
||||
|
||||
const sc = await getCrawl(nextJob.job.data.crawl_id);
|
||||
if (sc !== null && typeof sc.crawlerOptions?.delay === "number") {
|
||||
await new Promise(resolve => setTimeout(resolve, sc.crawlerOptions.delay * 1000));
|
||||
}
|
||||
}
|
||||
|
||||
(await getScrapeQueue()).add(
|
||||
nextJob.job.id,
|
||||
{
|
||||
...nextJob.job.data,
|
||||
concurrencyLimitHit: true,
|
||||
},
|
||||
{
|
||||
...nextJob.job.opts,
|
||||
jobId: nextJob.job.id,
|
||||
priority: nextJob.job.priority,
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
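concurrentJobDone releases the finished job's active slot and, if the team is still under its limit, promotes the next queued job (respecting per-crawl caps and delays) into BullMQ. A hedged sketch of how the queue worker is expected to call it; the actual wiring lives in queue-worker.ts, and the handler name here is an assumption.

```ts
import type { Job } from "bullmq";
import { concurrentJobDone } from "../lib/concurrency-limit";

async function processScrapeJob(job: Job): Promise<void> {
  try {
    // ... run the scrape/crawl step ...
  } finally {
    // Always release the concurrency slot, even on failure, so the next
    // queued job for this team (and crawl, if capped) can be promoted.
    await concurrentJobDone(job);
  }
}
```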
@@ -14,6 +14,7 @@ export type StoredCrawl = {
  robots?: string;
  cancelled?: boolean;
  createdAt: number;
  maxConcurrency?: number;
};

export async function saveCrawl(id: string, crawl: StoredCrawl) {
@@ -4,14 +4,12 @@ import { NotificationType, RateLimiterMode, WebScraperOptions } from "../types";
import * as Sentry from "@sentry/node";
import {
  cleanOldConcurrencyLimitEntries,
  cleanOldCrawlConcurrencyLimitEntries,
  getConcurrencyLimitActiveJobs,
  getConcurrencyQueueJobsCount,
  getCrawlConcurrencyQueueJobsCount,
  getCrawlConcurrencyLimitActiveJobs,
  pushConcurrencyLimitActiveJob,
  pushConcurrencyLimitedJob,
  pushCrawlConcurrencyLimitActiveJob,
  pushCrawlConcurrencyLimitedJob,
} from "../lib/concurrency-limit";
import { logger } from "../lib/logger";
import { sendNotificationWithCustomDays } from './notification/email_notification';
@@ -19,7 +17,7 @@ import { shouldSendConcurrencyLimitNotification } from './notification/notificat
import { getACUC, getACUCTeam } from "../controllers/auth";
import { getJobFromGCS } from "../lib/gcs-jobs";
import { Document } from "../controllers/v1/types";
import type { Logger } from "winston";
import { getCrawl } from "../lib/crawl-redis";
/**
|
||||
* Checks if a job is a crawl or batch scrape based on its options
|
||||
@ -50,25 +48,6 @@ async function _addScrapeJobToConcurrencyQueue(
|
||||
}, webScraperOptions.crawl_id ? Infinity :(webScraperOptions.scrapeOptions?.timeout ?? (60 * 1000)));
|
||||
}
|
||||
|
||||
async function _addCrawlScrapeJobToConcurrencyQueue(
|
||||
webScraperOptions: any,
|
||||
options: any,
|
||||
jobId: string,
|
||||
jobPriority: number,
|
||||
) {
|
||||
await pushCrawlConcurrencyLimitedJob(webScraperOptions.crawl_id, {
|
||||
id: jobId,
|
||||
data: webScraperOptions,
|
||||
opts: {
|
||||
...options,
|
||||
priority: jobPriority,
|
||||
jobId: jobId,
|
||||
},
|
||||
priority: jobPriority,
|
||||
});
|
||||
// NEVER ADD THESE TO BULLMQ!!! THEY ARE ADDED IN QUEUE-WORKER!!! SHOOOOO!!! - mogery
|
||||
}
|
||||
|
||||
export async function _addScrapeJobToBullMQ(
|
||||
webScraperOptions: any,
|
||||
options: any,
|
||||
@ -79,10 +58,13 @@ export async function _addScrapeJobToBullMQ(
|
||||
webScraperOptions &&
|
||||
webScraperOptions.team_id
|
||||
) {
|
||||
if (webScraperOptions.crawl_id && webScraperOptions.crawlerOptions?.delay) {
|
||||
await pushCrawlConcurrencyLimitActiveJob(webScraperOptions.crawl_id, jobId, 60 * 1000);
|
||||
} else {
|
||||
await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId, 60 * 1000); // 60s default timeout
|
||||
await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId, 60 * 1000); // 60s default timeout
|
||||
|
||||
if (webScraperOptions.crawl_id) {
|
||||
const sc = await getCrawl(webScraperOptions.crawl_id);
|
||||
if (webScraperOptions.crawlerOptions?.delay || sc?.maxConcurrency) {
|
||||
await pushCrawlConcurrencyLimitActiveJob(webScraperOptions.crawl_id, jobId, 60 * 1000);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -100,49 +82,56 @@ async function addScrapeJobRaw(
|
||||
jobPriority: number,
|
||||
directToBullMQ: boolean = false,
|
||||
) {
|
||||
const hasCrawlDelay = webScraperOptions.crawl_id && webScraperOptions.crawlerOptions?.delay;
|
||||
|
||||
if (hasCrawlDelay) {
|
||||
await _addCrawlScrapeJobToConcurrencyQueue(
|
||||
webScraperOptions,
|
||||
options,
|
||||
jobId,
|
||||
jobPriority
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let concurrencyLimited = false;
|
||||
let concurrencyLimited: "yes" | "yes-crawl" | "no" | null = null;
|
||||
let currentActiveConcurrency = 0;
|
||||
let maxConcurrency = 0;
|
||||
|
||||
if (
|
||||
webScraperOptions &&
|
||||
webScraperOptions.team_id
|
||||
) {
|
||||
const now = Date.now();
|
||||
maxConcurrency = (await getACUCTeam(webScraperOptions.team_id, false, true, webScraperOptions.is_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
|
||||
cleanOldConcurrencyLimitEntries(webScraperOptions.team_id, now);
|
||||
currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(webScraperOptions.team_id, now)).length;
|
||||
concurrencyLimited = currentActiveConcurrency >= maxConcurrency;
|
||||
if (directToBullMQ) {
|
||||
concurrencyLimited = "no";
|
||||
} else {
|
||||
if (webScraperOptions.crawl_id) {
|
||||
const crawl = await getCrawl(webScraperOptions.crawl_id);
|
||||
const concurrencyLimit = !crawl
|
||||
? null
|
||||
: crawl.crawlerOptions?.delay === undefined && crawl.maxConcurrency === undefined
|
||||
? null
|
||||
: crawl.maxConcurrency ?? 1;
|
||||
|
||||
if (concurrencyLimit !== null) {
|
||||
const crawlConcurrency = (await getCrawlConcurrencyLimitActiveJobs(webScraperOptions.crawl_id)).length;
|
||||
const freeSlots = Math.max(concurrencyLimit - crawlConcurrency, 0);
|
||||
if (freeSlots === 0) {
|
||||
concurrencyLimited = "yes-crawl";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (concurrencyLimited === null) {
|
||||
const now = Date.now();
|
||||
const maxConcurrency = (await getACUCTeam(webScraperOptions.team_id, false, true, webScraperOptions.is_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
|
||||
await cleanOldConcurrencyLimitEntries(webScraperOptions.team_id, now);
|
||||
const currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(webScraperOptions.team_id, now)).length;
|
||||
concurrencyLimited = currentActiveConcurrency >= maxConcurrency ? "yes" : "no";
|
||||
}
|
||||
}
|
||||
|
||||
const concurrencyQueueJobs = await getConcurrencyQueueJobsCount(webScraperOptions.team_id);
|
||||
if (concurrencyLimited === "yes" || concurrencyLimited === "yes-crawl") {
|
||||
if (concurrencyLimited === "yes") {
|
||||
// Detect if they hit their concurrent limit
|
||||
// If above by 2x, send them an email
|
||||
// No need to 2x as if there are more than the max concurrency in the concurrency queue, it is already 2x
|
||||
const concurrencyQueueJobs = await getConcurrencyQueueJobsCount(webScraperOptions.team_id);
|
||||
if(concurrencyQueueJobs > maxConcurrency) {
|
||||
// logger.info("Concurrency limited 2x (single) - ", "Concurrency queue jobs: ", concurrencyQueueJobs, "Max concurrency: ", maxConcurrency, "Team ID: ", webScraperOptions.team_id);
|
||||
|
||||
if (concurrencyLimited && !directToBullMQ) {
|
||||
// Detect if they hit their concurrent limit
|
||||
// If above by 2x, send them an email
|
||||
// No need to 2x as if there are more than the max concurrency in the concurrency queue, it is already 2x
|
||||
if(concurrencyQueueJobs > maxConcurrency) {
|
||||
// logger.info("Concurrency limited 2x (single) - ", "Concurrency queue jobs: ", concurrencyQueueJobs, "Max concurrency: ", maxConcurrency, "Team ID: ", webScraperOptions.team_id);
|
||||
|
||||
// Only send notification if it's not a crawl or batch scrape
|
||||
const shouldSendNotification = await shouldSendConcurrencyLimitNotification(webScraperOptions.team_id);
|
||||
if (shouldSendNotification) {
|
||||
sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
|
||||
logger.error("Error sending notification (concurrency limit reached)", { error });
|
||||
});
|
||||
}
|
||||
// Only send notification if it's not a crawl or batch scrape
|
||||
const shouldSendNotification = await shouldSendConcurrencyLimitNotification(webScraperOptions.team_id);
|
||||
if (shouldSendNotification) {
|
||||
sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
|
||||
logger.error("Error sending notification (concurrency limit reached)", { error });
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
webScraperOptions.concurrencyLimited = true;
|
||||
@ -210,140 +199,189 @@ export async function addScrapeJobs(
|
||||
) {
|
||||
if (jobs.length === 0) return true;
|
||||
|
||||
const addToCCQ = jobs.filter(job => job.data.crawlerOptions?.delay);
|
||||
const dontAddToCCQ = jobs.filter(job => !job.data.crawlerOptions?.delay);
|
||||
const jobsByTeam = new Map<string, {
|
||||
data: WebScraperOptions;
|
||||
opts: {
|
||||
jobId: string;
|
||||
priority: number;
|
||||
};
|
||||
}[]>();
|
||||
|
||||
let countCanBeDirectlyAdded = Infinity;
|
||||
let currentActiveConcurrency = 0;
|
||||
let maxConcurrency = 0;
|
||||
for (const job of jobs) {
|
||||
if (!jobsByTeam.has(job.data.team_id)) {
|
||||
jobsByTeam.set(job.data.team_id, []);
|
||||
}
|
||||
jobsByTeam.get(job.data.team_id)!.push(job);
|
||||
}
|
||||
|
||||
for (const [teamId, teamJobs] of jobsByTeam) {
|
||||
// == Buckets for jobs ==
|
||||
let jobsForcedToCQ: {
|
||||
data: WebScraperOptions;
|
||||
opts: {
|
||||
jobId: string;
|
||||
priority: number;
|
||||
};
|
||||
}[] = [];
|
||||
|
||||
let jobsPotentiallyInCQ: {
|
||||
data: WebScraperOptions;
|
||||
opts: {
|
||||
jobId: string;
|
||||
priority: number;
|
||||
};
|
||||
}[] = [];
|
||||
|
||||
// == Select jobs by crawl ID ==
|
||||
const jobsByCrawlID = new Map<string, {
|
||||
data: WebScraperOptions;
|
||||
opts: {
|
||||
jobId: string;
|
||||
priority: number;
|
||||
};
|
||||
}[]>();
|
||||
|
||||
const jobsWithoutCrawlID: {
|
||||
data: WebScraperOptions;
|
||||
opts: {
|
||||
jobId: string;
|
||||
priority: number;
|
||||
};
|
||||
}[] = [];
|
||||
|
||||
for (const job of teamJobs) {
|
||||
if (job.data.crawl_id) {
|
||||
if (!jobsByCrawlID.has(job.data.crawl_id)) {
|
||||
jobsByCrawlID.set(job.data.crawl_id, []);
|
||||
}
|
||||
jobsByCrawlID.get(job.data.crawl_id)!.push(job);
|
||||
} else {
|
||||
jobsWithoutCrawlID.push(job);
|
||||
}
|
||||
}
|
||||
|
||||
// == Select jobs by crawl ID ==
|
||||
for (const [crawlID, crawlJobs] of jobsByCrawlID) {
|
||||
const crawl = await getCrawl(crawlID);
|
||||
const concurrencyLimit = !crawl
|
||||
? null
|
||||
: crawl.crawlerOptions?.delay === undefined && crawl.maxConcurrency === undefined
|
||||
? null
|
||||
: crawl.maxConcurrency ?? 1;
|
||||
|
||||
|
||||
if (concurrencyLimit === null) {
|
||||
// All jobs may be in the CQ depending on the global team concurrency limit
|
||||
jobsPotentiallyInCQ.push(...crawlJobs);
|
||||
} else {
|
||||
const crawlConcurrency = (await getCrawlConcurrencyLimitActiveJobs(crawlID)).length;
|
||||
const freeSlots = Math.max(concurrencyLimit - crawlConcurrency, 0);
|
||||
|
||||
// The first n jobs may be in the CQ depending on the global team concurrency limit
|
||||
jobsPotentiallyInCQ.push(...crawlJobs.slice(0, freeSlots));
|
||||
|
||||
// Every job after that must be in the CQ, as the crawl concurrency limit has been reached
|
||||
jobsForcedToCQ.push(...crawlJobs.slice(freeSlots));
|
||||
}
|
||||
}
|
||||
|
||||
// All jobs without a crawl ID may be in the CQ depending on the global team concurrency limit
|
||||
jobsPotentiallyInCQ.push(...jobsWithoutCrawlID);
|
||||
|
||||
if (dontAddToCCQ[0] && dontAddToCCQ[0].data && dontAddToCCQ[0].data.team_id) {
|
||||
const now = Date.now();
|
||||
maxConcurrency = (await getACUCTeam(dontAddToCCQ[0].data.team_id, false, true, dontAddToCCQ[0].data.from_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
|
||||
cleanOldConcurrencyLimitEntries(dontAddToCCQ[0].data.team_id, now);
|
||||
const maxConcurrency = (await getACUCTeam(teamId, false, true, jobs[0].data.from_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
|
||||
await cleanOldConcurrencyLimitEntries(teamId, now);
|
||||
|
||||
currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(dontAddToCCQ[0].data.team_id, now)).length;
|
||||
const currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(teamId, now)).length;

  countCanBeDirectlyAdded = Math.max(
const countCanBeDirectlyAdded = Math.max(
    maxConcurrency - currentActiveConcurrency,
    0,
  );
}

const addToBull = dontAddToCCQ.slice(0, countCanBeDirectlyAdded);
const addToCQ = dontAddToCCQ.slice(countCanBeDirectlyAdded);
const addToBull = jobsPotentiallyInCQ.slice(0, countCanBeDirectlyAdded);
const addToCQ = jobsPotentiallyInCQ.slice(countCanBeDirectlyAdded).concat(jobsForcedToCQ);

// equals 2x the max concurrency
if(addToCQ.length > maxConcurrency) {
  // logger.info(`Concurrency limited 2x (multiple) - Concurrency queue jobs: ${addToCQ.length} Max concurrency: ${maxConcurrency} Team ID: ${jobs[0].data.team_id}`);
  // Only send notification if it's not a crawl or batch scrape
  if (!isCrawlOrBatchScrape(dontAddToCCQ[0].data)) {
    const shouldSendNotification = await shouldSendConcurrencyLimitNotification(dontAddToCCQ[0].data.team_id);
    if (shouldSendNotification) {
      sendNotificationWithCustomDays(dontAddToCCQ[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
        logger.error("Error sending notification (concurrency limit reached)", { error });
      });
// equals 2x the max concurrency
if((jobsPotentiallyInCQ.length - countCanBeDirectlyAdded) > maxConcurrency) {
  // logger.info(`Concurrency limited 2x (multiple) - Concurrency queue jobs: ${addToCQ.length} Max concurrency: ${maxConcurrency} Team ID: ${jobs[0].data.team_id}`);
  // Only send notification if it's not a crawl or batch scrape
  if (!isCrawlOrBatchScrape(jobs[0].data)) {
    const shouldSendNotification = await shouldSendConcurrencyLimitNotification(jobs[0].data.team_id);
    if (shouldSendNotification) {
      sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
        logger.error("Error sending notification (concurrency limit reached)", { error });
      });
    }
  }
}

await Promise.all(
  addToCQ.map(async (job) => {
    const size = JSON.stringify(job.data).length;
    return await Sentry.startSpan(
      {
        name: "Add scrape job",
        op: "queue.publish",
        attributes: {
          "messaging.message.id": job.opts.jobId,
          "messaging.destination.name": getScrapeQueue().name,
          "messaging.message.body.size": size,
        },
      },
      async (span) => {
        const jobData = {
          ...job.data,
          sentry: {
            trace: Sentry.spanToTraceHeader(span),
            baggage: Sentry.spanToBaggageHeader(span),
            size,
          },
        };

        await _addScrapeJobToConcurrencyQueue(
          jobData,
          job.opts,
          job.opts.jobId,
          job.opts.priority,
        );
      },
    );
  }),
);

await Promise.all(
  addToBull.map(async (job) => {
    const size = JSON.stringify(job.data).length;
    return await Sentry.startSpan(
      {
        name: "Add scrape job",
        op: "queue.publish",
        attributes: {
          "messaging.message.id": job.opts.jobId,
          "messaging.destination.name": getScrapeQueue().name,
          "messaging.message.body.size": size,
        },
      },
      async (span) => {
        await _addScrapeJobToBullMQ(
          {
            ...job.data,
            sentry: {
              trace: Sentry.spanToTraceHeader(span),
              baggage: Sentry.spanToBaggageHeader(span),
              size,
            },
          },
          job.opts,
          job.opts.jobId,
          job.opts.priority,
        );
      },
    );
  }),
);
}

await Promise.all(
  addToCCQ.map(async (job) => {
    const size = JSON.stringify(job.data).length;
    return await Sentry.startSpan(
      {
        name: "Add scrape job",
        op: "queue.publish",
        attributes: {
          "messaging.message.id": job.opts.jobId,
          "messaging.destination.name": getScrapeQueue().name,
          "messaging.message.body.size": size,
        },
      },
      async (span) => {
        await _addCrawlScrapeJobToConcurrencyQueue(
          {
            ...job.data,
            sentry: {
              trace: Sentry.spanToTraceHeader(span),
              baggage: Sentry.spanToBaggageHeader(span),
              size,
            },
          },
          job.opts,
          job.opts.jobId,
          job.opts.priority,
        );
      },
    );
  }),
);

await Promise.all(
  addToCQ.map(async (job) => {
    const size = JSON.stringify(job.data).length;
    return await Sentry.startSpan(
      {
        name: "Add scrape job",
        op: "queue.publish",
        attributes: {
          "messaging.message.id": job.opts.jobId,
          "messaging.destination.name": getScrapeQueue().name,
          "messaging.message.body.size": size,
        },
      },
      async (span) => {
        const jobData = {
          ...job.data,
          sentry: {
            trace: Sentry.spanToTraceHeader(span),
            baggage: Sentry.spanToBaggageHeader(span),
            size,
          },
        };

        await _addScrapeJobToConcurrencyQueue(
          jobData,
          job.opts,
          job.opts.jobId,
          job.opts.priority,
        );
      },
    );
  }),
);

await Promise.all(
  addToBull.map(async (job) => {
    const size = JSON.stringify(job.data).length;
    return await Sentry.startSpan(
      {
        name: "Add scrape job",
        op: "queue.publish",
        attributes: {
          "messaging.message.id": job.opts.jobId,
          "messaging.destination.name": getScrapeQueue().name,
          "messaging.message.body.size": size,
        },
      },
      async (span) => {
        await _addScrapeJobToBullMQ(
          {
            ...job.data,
            sentry: {
              trace: Sentry.spanToTraceHeader(span),
              baggage: Sentry.spanToBaggageHeader(span),
              size,
            },
          },
          job.opts,
          job.opts.jobId,
          job.opts.priority,
        );
      },
    );
  }),
);
}

export function waitForJob(
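
Editor's note on the hunk above: the add-jobs path first works out how many of the incoming jobs can start immediately without pushing the team past its concurrency limit, then sends that many straight to BullMQ and parks the remainder, together with any jobs already forced into the queue (jobsForcedToCQ), in the team's concurrency queue. A minimal sketch of that split, using hypothetical names (splitJobs, activeCount) rather than the service's real helpers:

// Illustrative sketch only -- not the PR's code; the names below are hypothetical.
function splitJobs<T>(jobs: T[], maxConcurrency: number, activeCount: number) {
  // Slots left before the team hits its concurrency ceiling.
  const budget = Math.max(maxConcurrency - activeCount, 0);
  return {
    enqueueNow: jobs.slice(0, budget), // added to BullMQ immediately
    deferred: jobs.slice(budget),      // parked in the concurrency queue
  };
}

// Example: limit 5, 3 jobs already active, 6 new jobs -> 2 start now, 4 wait.
const { enqueueNow, deferred } = splitJobs(["a", "b", "c", "d", "e", "f"], 5, 3);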

@ -50,15 +50,8 @@ import { getJobs } from "..//controllers/v1/crawl-status";
import { configDotenv } from "dotenv";
import { scrapeOptions } from "../controllers/v1/types";
import {
  cleanOldConcurrencyLimitEntries,
  cleanOldCrawlConcurrencyLimitEntries,
  getConcurrencyLimitActiveJobs,
  concurrentJobDone,
  pushConcurrencyLimitActiveJob,
  pushCrawlConcurrencyLimitActiveJob,
  removeConcurrencyLimitActiveJob,
  removeCrawlConcurrencyLimitActiveJob,
  takeConcurrencyLimitedJob,
  takeCrawlConcurrencyLimitedJob,
} from "../lib/concurrency-limit";
import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist";
import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings";
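
The crawl-scoped helpers imported above (pushCrawlConcurrencyLimitActiveJob, takeCrawlConcurrencyLimitedJob, and friends) mirror the team-level limiter but key on a crawl ID. The sketch below shows the general pattern such a limiter tends to follow, a Redis sorted set scored by an expiry timestamp; it is an editor's illustration only, and the key format and signatures are assumptions, not the actual module.

// Assumed shape of a crawl-scoped limiter (illustration only).
import Redis from "ioredis";

const redis = new Redis(process.env.REDIS_URL ?? "redis://localhost:6379");
const activeKey = (crawlId: string) => `crawl-concurrency-limiter:${crawlId}`; // assumed key format

// Mark a job active until `timeoutMs` from now; the score doubles as a stall deadline.
async function pushCrawlActiveJob(crawlId: string, jobId: string, timeoutMs: number) {
  await redis.zadd(activeKey(crawlId), Date.now() + timeoutMs, jobId);
}

// Free the slot and purge entries whose deadline has already passed.
async function removeCrawlActiveJob(crawlId: string, jobId: string) {
  await redis.zrem(activeKey(crawlId), jobId);
  await redis.zremrangebyscore(activeKey(crawlId), "-inf", Date.now());
}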
@ -803,67 +796,7 @@ const workerFun = async (
      runningJobs.delete(job.id);
    }

    if (job.id && job.data.crawl_id && job.data.crawlerOptions?.delay) {
      await removeCrawlConcurrencyLimitActiveJob(job.data.crawl_id, job.id);
      cleanOldCrawlConcurrencyLimitEntries(job.data.crawl_id);

      const delayInSeconds = job.data.crawlerOptions.delay;
      const delayInMs = delayInSeconds * 1000;

      await new Promise(resolve => setTimeout(resolve, delayInMs));

      const nextCrawlJob = await takeCrawlConcurrencyLimitedJob(job.data.crawl_id);
      if (nextCrawlJob !== null) {
        await pushCrawlConcurrencyLimitActiveJob(job.data.crawl_id, nextCrawlJob.id, 60 * 1000);

        await queue.add(
          nextCrawlJob.id,
          {
            ...nextCrawlJob.data,
          },
          {
            ...nextCrawlJob.opts,
            jobId: nextCrawlJob.id,
            priority: nextCrawlJob.priority,
          },
        );
      }
    }

    if (job.id && job.data && job.data.team_id) {
      const maxConcurrency = (await getACUCTeam(job.data.team_id, false, true, job.data.is_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;

      await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
      await cleanOldConcurrencyLimitEntries(job.data.team_id);

      // Check if we're under the concurrency limit before adding a new job
      const currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(job.data.team_id)).length;
      const concurrencyLimited = currentActiveConcurrency >= maxConcurrency;

      if (!concurrencyLimited) {
        const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);
        if (nextJob !== null) {
          await pushConcurrencyLimitActiveJob(
            job.data.team_id,
            nextJob.id,
            60 * 1000,
          ); // 60s initial timeout

          await queue.add(
            nextJob.id,
            {
              ...nextJob.data,
              concurrencyLimitHit: true,
            },
            {
              ...nextJob.opts,
              jobId: nextJob.id,
              priority: nextJob.priority,
            },
          );
        }
      }
    }
    await concurrentJobDone(job);
  }

  if (job.data && job.data.sentry && Sentry.isInitialized()) {
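
To summarize the worker-side half shown above: when a scrape finishes, the worker releases its concurrency slot, prunes stale entries, and, if the team is now under its limit, pops the next queued job and re-adds it to BullMQ with a fresh 60-second stall timeout. A condensed restatement follows (editor's sketch; the queue wiring and the team-limit lookup are stand-ins, not the actual worker code):

// Editor's condensed sketch of the completion-time handoff.
import type { Queue } from "bullmq";
import {
  cleanOldConcurrencyLimitEntries,
  getConcurrencyLimitActiveJobs,
  pushConcurrencyLimitActiveJob,
  removeConcurrencyLimitActiveJob,
  takeConcurrencyLimitedJob,
} from "../lib/concurrency-limit";

declare const queue: Queue;                                     // stand-in for the scrape queue
declare function getTeamLimit(teamId: string): Promise<number>; // stand-in for getACUCTeam(...).concurrency

async function onScrapeFinished(teamId: string, jobId: string) {
  await removeConcurrencyLimitActiveJob(teamId, jobId); // free this job's slot
  await cleanOldConcurrencyLimitEntries(teamId);        // drop entries past their deadline

  const limit = await getTeamLimit(teamId);
  const active = (await getConcurrencyLimitActiveJobs(teamId)).length;
  if (active >= limit) return;                          // still saturated, nothing to promote

  const next = await takeConcurrencyLimitedJob(teamId); // pop the next waiting job, if any
  if (next === null) return;

  await pushConcurrencyLimitActiveJob(teamId, next.id, 60 * 1000); // 60s initial timeout
  await queue.add(
    next.id,
    { ...next.data, concurrencyLimitHit: true },
    { ...next.opts, jobId: next.id, priority: next.priority },
  );
}

A parallel branch in the hunk does the same per crawl ID when crawlerOptions.delay is set, sleeping for the configured delay before promoting the next job from that crawl's queue.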

@ -1,6 +1,6 @@
{
  "name": "@mendable/firecrawl-js",
  "version": "1.25.5",
  "version": "1.25.6",
  "description": "JavaScript SDK for Firecrawl API",
  "main": "dist/index.js",
  "types": "dist/index.d.ts",

@ -226,6 +226,7 @@ export interface CrawlParams {
   * If not provided, the crawler may use the robots.txt crawl delay if available.
   */
  delay?: number;
  maxConcurrency?: number;
}

/**
@ -1012,9 +1013,10 @@ export default class FirecrawlApp {
    idempotencyKey?: string,
    webhook?: CrawlParams["webhook"],
    ignoreInvalidURLs?: boolean,
    maxConcurrency?: number,
  ): Promise<BatchScrapeStatusResponse | ErrorResponse> {
    const headers = this.prepareHeaders(idempotencyKey);
    let jsonData: any = { urls, webhook, ignoreInvalidURLs, ...params, origin: `js-sdk@${this.version}` };
    let jsonData: any = { urls, webhook, ignoreInvalidURLs, maxConcurrency, ...params, origin: `js-sdk@${this.version}` };
    if (jsonData?.extract?.schema) {
      let schema = jsonData.extract.schema;
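
From the JS SDK's point of view the change is just a new optional field: CrawlParams now accepts maxConcurrency, and the batch-scrape path forwards it in the request body. A usage sketch (editor's example; the limit of 5 and the URL are placeholders, and the per-job cap is presumably applied on top of the team-wide limit enforced server-side):

import FirecrawlApp from "@mendable/firecrawl-js";

const app = new FirecrawlApp({ apiKey: "fc-YOUR-API-KEY" });

// Cap this crawl at 5 concurrent scrapes.
const crawl = await app.crawlUrl("https://example.com", {
  maxConcurrency: 5,
});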

@ -263,6 +263,7 @@ class CrawlParams(pydantic.BaseModel):
    ignoreQueryParameters: Optional[bool] = None
    regexOnFullURL: Optional[bool] = None
    delay: Optional[int] = None # Delay in seconds between scrapes
    maxConcurrency: Optional[int] = None

class CrawlResponse(pydantic.BaseModel):
    """Response from crawling operations."""
@ -694,6 +695,7 @@ class FirecrawlApp:
        ignore_query_parameters: Optional[bool] = None,
        regex_on_full_url: Optional[bool] = None,
        delay: Optional[int] = None,
        max_concurrency: Optional[int] = None,
        poll_interval: Optional[int] = 2,
        idempotency_key: Optional[str] = None,
        **kwargs
@ -717,6 +719,7 @@ class FirecrawlApp:
            ignore_query_parameters (Optional[bool]): Ignore URL parameters
            regex_on_full_url (Optional[bool]): Apply regex to full URLs
            delay (Optional[int]): Delay in seconds between scrapes
            max_concurrency (Optional[int]): Maximum number of concurrent scrapes
            poll_interval (Optional[int]): Seconds between status checks (default: 2)
            idempotency_key (Optional[str]): Unique key to prevent duplicate requests
            **kwargs: Additional parameters to pass to the API
@ -764,7 +767,9 @@ class FirecrawlApp:
            crawl_params['regexOnFullURL'] = regex_on_full_url
        if delay is not None:
            crawl_params['delay'] = delay

        if max_concurrency is not None:
            crawl_params['maxConcurrency'] = max_concurrency

        # Add any additional kwargs
        crawl_params.update(kwargs)

@ -826,6 +831,8 @@ class FirecrawlApp:
            deduplicate_similar_urls (Optional[bool]): Remove similar URLs
            ignore_query_parameters (Optional[bool]): Ignore URL parameters
            regex_on_full_url (Optional[bool]): Apply regex to full URLs
            delay (Optional[int]): Delay in seconds between scrapes
            max_concurrency (Optional[int]): Maximum number of concurrent scrapes
            idempotency_key (Optional[str]): Unique key to prevent duplicate requests
            **kwargs: Additional parameters to pass to the API

@ -873,7 +880,9 @@ class FirecrawlApp:
            crawl_params['regexOnFullURL'] = regex_on_full_url
        if delay is not None:
            crawl_params['delay'] = delay

        if max_concurrency is not None:
            crawl_params['maxConcurrency'] = max_concurrency

        # Add any additional kwargs
        crawl_params.update(kwargs)

@ -1049,6 +1058,8 @@ class FirecrawlApp:
        deduplicate_similar_urls: Optional[bool] = None,
        ignore_query_parameters: Optional[bool] = None,
        regex_on_full_url: Optional[bool] = None,
        delay: Optional[int] = None,
        max_concurrency: Optional[int] = None,
        idempotency_key: Optional[str] = None,
        **kwargs
    ) -> 'CrawlWatcher':
@ -1070,6 +1081,8 @@ class FirecrawlApp:
            deduplicate_similar_urls (Optional[bool]): Remove similar URLs
            ignore_query_parameters (Optional[bool]): Ignore URL parameters
            regex_on_full_url (Optional[bool]): Apply regex to full URLs
            delay (Optional[int]): Delay in seconds between scrapes
            max_concurrency (Optional[int]): Maximum number of concurrent scrapes
            idempotency_key (Optional[str]): Unique key to prevent duplicate requests
            **kwargs: Additional parameters to pass to the API

@ -1094,6 +1107,8 @@ class FirecrawlApp:
            deduplicate_similar_urls=deduplicate_similar_urls,
            ignore_query_parameters=ignore_query_parameters,
            regex_on_full_url=regex_on_full_url,
            delay=delay,
            max_concurrency=max_concurrency,
            idempotency_key=idempotency_key,
            **kwargs
        )
@ -1210,6 +1225,7 @@ class FirecrawlApp:
        actions: Optional[List[Union[WaitAction, ScreenshotAction, ClickAction, WriteAction, PressAction, ScrollAction, ScrapeAction, ExecuteJavascriptAction]]] = None,
        agent: Optional[AgentOptions] = None,
        poll_interval: Optional[int] = 2,
        max_concurrency: Optional[int] = None,
        idempotency_key: Optional[str] = None,
        **kwargs
    ) -> BatchScrapeStatusResponse:
@ -1235,6 +1251,7 @@ class FirecrawlApp:
            json_options (Optional[JsonConfig]): JSON extraction config
            actions (Optional[List[Union]]): Actions to perform
            agent (Optional[AgentOptions]): Agent configuration
            max_concurrency (Optional[int]): Maximum number of concurrent scrapes
            poll_interval (Optional[int]): Seconds between status checks (default: 2)
            idempotency_key (Optional[str]): Unique key to prevent duplicate requests
            **kwargs: Additional parameters to pass to the API
@ -1294,7 +1311,9 @@ class FirecrawlApp:
            scrape_params['actions'] = [action.dict(exclude_none=True) for action in actions]
        if agent is not None:
            scrape_params['agent'] = agent.dict(exclude_none=True)

        if max_concurrency is not None:
            scrape_params['maxConcurrency'] = max_concurrency

        # Add any additional kwargs
        scrape_params.update(kwargs)

@ -1343,6 +1362,7 @@ class FirecrawlApp:
        json_options: Optional[JsonConfig] = None,
        actions: Optional[List[Union[WaitAction, ScreenshotAction, ClickAction, WriteAction, PressAction, ScrollAction, ScrapeAction, ExecuteJavascriptAction]]] = None,
        agent: Optional[AgentOptions] = None,
        max_concurrency: Optional[int] = None,
        idempotency_key: Optional[str] = None,
        **kwargs
    ) -> BatchScrapeResponse:
@ -1368,6 +1388,7 @@ class FirecrawlApp:
            json_options (Optional[JsonConfig]): JSON extraction config
            actions (Optional[List[Union]]): Actions to perform
            agent (Optional[AgentOptions]): Agent configuration
            max_concurrency (Optional[int]): Maximum number of concurrent scrapes
            idempotency_key (Optional[str]): Unique key to prevent duplicate requests
            **kwargs: Additional parameters to pass to the API

@ -1427,7 +1448,9 @@ class FirecrawlApp:
            scrape_params['actions'] = [action.dict(exclude_none=True) for action in actions]
        if agent is not None:
            scrape_params['agent'] = agent.dict(exclude_none=True)

        if max_concurrency is not None:
            scrape_params['maxConcurrency'] = max_concurrency

        # Add any additional kwargs
        scrape_params.update(kwargs)

@ -1475,6 +1498,7 @@ class FirecrawlApp:
        json_options: Optional[JsonConfig] = None,
        actions: Optional[List[Union[WaitAction, ScreenshotAction, ClickAction, WriteAction, PressAction, ScrollAction, ScrapeAction, ExecuteJavascriptAction]]] = None,
        agent: Optional[AgentOptions] = None,
        max_concurrency: Optional[int] = None,
        idempotency_key: Optional[str] = None,
        **kwargs
    ) -> 'CrawlWatcher':
@ -1500,6 +1524,7 @@ class FirecrawlApp:
            json_options (Optional[JsonConfig]): JSON extraction config
            actions (Optional[List[Union]]): Actions to perform
            agent (Optional[AgentOptions]): Agent configuration
            max_concurrency (Optional[int]): Maximum number of concurrent scrapes
            idempotency_key (Optional[str]): Unique key to prevent duplicate requests
            **kwargs: Additional parameters to pass to the API

@ -1555,7 +1580,9 @@ class FirecrawlApp:
            scrape_params['actions'] = [action.dict(exclude_none=True) for action in actions]
        if agent is not None:
            scrape_params['agent'] = agent.dict(exclude_none=True)

        if max_concurrency is not None:
            scrape_params['maxConcurrency'] = max_concurrency

        # Add any additional kwargs
        scrape_params.update(kwargs)
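
The Python SDK mirrors the same addition as a snake_case keyword argument that is mapped onto the API's maxConcurrency field. A usage sketch in Python (parameter names taken from the signatures above; the URL and limits are placeholders):

# Editor's usage sketch for the Python SDK additions above.
from firecrawl import FirecrawlApp

app = FirecrawlApp(api_key="fc-YOUR-API-KEY")

# Crawl with at most 5 concurrent scrapes and a 1-second delay between them.
status = app.crawl_url(
    "https://example.com",
    delay=1,
    max_concurrency=5,
    poll_interval=2,
)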