Mirror of https://github.com/mendableai/firecrawl.git (synced 2025-12-04 19:13:44 +00:00)
fixes, more extensive testing

This commit is contained in:
parent c6232e6f46
commit 2b367fcd78

.github/workflows/test-server.yml (vendored, 1 line changed)
@@ -25,6 +25,7 @@ env:
  INDEX_SUPABASE_ANON_TOKEN: ${{ secrets.INDEX_SUPABASE_ANON_TOKEN }}
  INDEX_SUPABASE_URL: ${{ secrets.INDEX_SUPABASE_URL }}
  TEST_API_KEY: ${{ secrets.TEST_API_KEY }}
+  TEST_API_KEY_CONCURRENCY: ${{ secrets.TEST_API_KEY_CONCURRENCY }}
  FIRE_ENGINE_BETA_URL: ${{ secrets.FIRE_ENGINE_BETA_URL }}
  USE_DB_AUTHENTICATION: true
  SERPER_API_KEY: ${{ secrets.SERPER_API_KEY }}
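Note: the concurrency test suite later in this commit reads the new secret with a fallback, so runs without a dedicated low-concurrency key still authenticate. This is how it is consumed there (Identity is the helper type added to the snips test library below):

import { Identity } from "./lib";

// Prefer the dedicated low-concurrency key, fall back to the regular test key.
const concurrencyIdentity: Identity = {
  apiKey: process.env.TEST_API_KEY_CONCURRENCY ?? process.env.TEST_API_KEY!,
};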
The old mock-based unit test for ../lib/concurrency-limit is deleted in full:

@@ -1,209 +0,0 @@
-import { redisConnection } from "../services/queue-service";
-import {
-  cleanOldConcurrencyLimitEntries,
-  getConcurrencyLimitActiveJobs,
-  pushConcurrencyLimitActiveJob,
-  removeConcurrencyLimitActiveJob,
-  takeConcurrencyLimitedJob,
-  pushConcurrencyLimitedJob,
-  getConcurrencyQueueJobsCount,
-  ConcurrencyLimitedJob,
-} from "../lib/concurrency-limit";
-
-// Mock Redis client
-jest.mock("../services/queue-service", () => ({
-  redisConnection: {
-    zremrangebyscore: jest.fn(),
-    zrangebyscore: jest.fn(),
-    zadd: jest.fn(),
-    zrem: jest.fn(),
-    zmpop: jest.fn(),
-    zcard: jest.fn(),
-  },
-}));
-
-describe("Concurrency Limit", () => {
-  const mockTeamId = "test-team-id";
-  const mockJobId = "test-job-id";
-  const mockNow = 1000000;
-
-  beforeEach(() => {
-    jest.clearAllMocks();
-  });
-
-  describe("cleanOldConcurrencyLimitEntries", () => {
-    it("should remove entries older than current timestamp", async () => {
-      await cleanOldConcurrencyLimitEntries(mockTeamId, mockNow);
-
-      expect(redisConnection.zremrangebyscore).toHaveBeenCalledWith(
-        "concurrency-limiter:test-team-id",
-        -Infinity,
-        mockNow
-      );
-    });
-  });
-
-  describe("getConcurrencyLimitActiveJobs", () => {
-    it("should return active jobs after given timestamp", async () => {
-      const mockActiveJobs = ["job1", "job2"];
-      (redisConnection.zrangebyscore as jest.Mock).mockResolvedValue(mockActiveJobs);
-
-      const result = await getConcurrencyLimitActiveJobs(mockTeamId, mockNow);
-
-      expect(result).toEqual(mockActiveJobs);
-      expect(redisConnection.zrangebyscore).toHaveBeenCalledWith(
-        "concurrency-limiter:test-team-id",
-        mockNow,
-        Infinity
-      );
-    });
-
-    it("should return empty array when no active jobs", async () => {
-      (redisConnection.zrangebyscore as jest.Mock).mockResolvedValue([]);
-
-      const result = await getConcurrencyLimitActiveJobs(mockTeamId, mockNow);
-
-      expect(result).toEqual([]);
-    });
-  });
-
-  describe("pushConcurrencyLimitActiveJob", () => {
-    it("should add job with expiration timestamp", async () => {
-      await pushConcurrencyLimitActiveJob(mockTeamId, mockJobId, 2 * 60 * 1000, mockNow);
-
-      expect(redisConnection.zadd).toHaveBeenCalledWith(
-        "concurrency-limiter:test-team-id",
-        mockNow + 2 * 60 * 1000, // stalledJobTimeoutMs
-        mockJobId
-      );
-    });
-  });
-
-  describe("removeConcurrencyLimitActiveJob", () => {
-    it("should remove job from active jobs", async () => {
-      await removeConcurrencyLimitActiveJob(mockTeamId, mockJobId);
-
-      expect(redisConnection.zrem).toHaveBeenCalledWith(
-        "concurrency-limiter:test-team-id",
-        mockJobId
-      );
-    });
-  });
-
-  describe("Queue Operations", () => {
-    const mockJob: ConcurrencyLimitedJob = {
-      id: mockJobId,
-      data: { test: "data" },
-      opts: {},
-      priority: 1,
-    };
-
-    describe("takeConcurrencyLimitedJob", () => {
-      it("should return null when queue is empty", async () => {
-        (redisConnection.zmpop as jest.Mock).mockResolvedValue(null);
-
-        const result = await takeConcurrencyLimitedJob(mockTeamId);
-
-        expect(result).toBeNull();
-      });
-
-      it("should return and remove the highest priority job", async () => {
-        (redisConnection.zmpop as jest.Mock).mockResolvedValue([
-          "key",
-          [[JSON.stringify(mockJob)]],
-        ]);
-
-        const result = await takeConcurrencyLimitedJob(mockTeamId);
-
-        expect(result).toEqual(mockJob);
-        expect(redisConnection.zmpop).toHaveBeenCalledWith(
-          1,
-          "concurrency-limit-queue:test-team-id",
-          "MIN"
-        );
-      });
-    });
-
-    describe("pushConcurrencyLimitedJob", () => {
-      it("should add job to queue with priority", async () => {
-        await pushConcurrencyLimitedJob(mockTeamId, mockJob, 30000);
-
-        expect(redisConnection.zadd).toHaveBeenCalledWith(
-          "concurrency-limit-queue:test-team-id",
-          mockJob.priority,
-          JSON.stringify(mockJob)
-        );
-      });
-
-      it("should use default priority 1 when not specified", async () => {
-        const jobWithoutPriority = { ...mockJob };
-        delete jobWithoutPriority.priority;
-
-        await pushConcurrencyLimitedJob(mockTeamId, jobWithoutPriority, 30000);
-
-        expect(redisConnection.zadd).toHaveBeenCalledWith(
-          "concurrency-limit-queue:test-team-id",
-          1,
-          JSON.stringify(jobWithoutPriority)
-        );
-      });
-    });
-
-    describe("getConcurrencyQueueJobsCount", () => {
-      it("should return the number of jobs in queue", async () => {
-        const mockCount = 5;
-        (redisConnection.zcard as jest.Mock).mockResolvedValue(mockCount);
-
-        const result = await getConcurrencyQueueJobsCount(mockTeamId);
-
-        expect(result).toBe(mockCount);
-        expect(redisConnection.zcard).toHaveBeenCalledWith(
-          "concurrency-limit-queue:test-team-id"
-        );
-      });
-
-      it("should return 0 for empty queue", async () => {
-        (redisConnection.zcard as jest.Mock).mockResolvedValue(0);
-
-        const result = await getConcurrencyQueueJobsCount(mockTeamId);
-
-        expect(result).toBe(0);
-      });
-    });
-  });
-
-  describe("Integration Scenarios", () => {
-    it("should handle complete job lifecycle", async () => {
-      const mockJob: ConcurrencyLimitedJob = {
-        id: "lifecycle-test",
-        data: { test: "lifecycle" },
-        opts: {},
-      };
-
-      // Push job to queue
-      await pushConcurrencyLimitedJob(mockTeamId, mockJob, 30000);
-      expect(redisConnection.zadd).toHaveBeenCalled();
-
-      // Take job from queue
-      (redisConnection.zmpop as jest.Mock).mockResolvedValue([
-        "key",
-        [[JSON.stringify(mockJob)]],
-      ]);
-      const takenJob = await takeConcurrencyLimitedJob(mockTeamId);
-      expect(takenJob).toEqual(mockJob);
-
-      // Add to active jobs
-      await pushConcurrencyLimitActiveJob(mockTeamId, mockJob.id, 2 * 60 * 1000, mockNow);
-      expect(redisConnection.zadd).toHaveBeenCalled();
-
-      // Verify active jobs
-      (redisConnection.zrangebyscore as jest.Mock).mockResolvedValue([mockJob.id]);
-      const activeJobs = await getConcurrencyLimitActiveJobs(mockTeamId, mockNow);
-      expect(activeJobs).toContain(mockJob.id);
-
-      // Remove from active jobs
-      await removeConcurrencyLimitActiveJob(mockTeamId, mockJob.id);
-      expect(redisConnection.zrem).toHaveBeenCalled();
-    });
-  });
-});
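The deleted tests pinned down the Redis contract of the queue helpers. For reference, here is a minimal sketch of what the mocked expectations imply takeConcurrencyLimitedJob does. The helper names, key format, and zmpop arguments come from the test above; the body is an illustration under the assumption of an ioredis-style client whose zmpop resolves to null or to [key, [[member, ...]]], not the repository's actual implementation:

import { redisConnection } from "../services/queue-service";

export type ConcurrencyLimitedJob = {
  id: string;
  data: unknown;
  opts: Record<string, unknown>;
  priority?: number;
};

// Sketch: pop the lowest-score (highest-priority) queued job for a team.
export async function takeConcurrencyLimitedJob(
  teamId: string,
): Promise<ConcurrencyLimitedJob | null> {
  // ZMPOP numkeys key MIN -> null when empty, else [key, [[member, ...]]]
  const res = await redisConnection.zmpop(1, "concurrency-limit-queue:" + teamId, "MIN");
  if (res === null) return null;
  return JSON.parse(res[1][0][0]); // first popped member is the serialized job
}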
The end-to-end concurrency suite is rewritten. The new version authenticates with TEST_API_KEY_CONCURRENCY (falling back to TEST_API_KEY), skips itself on self-hosted runs, asserts completed page counts, and adds batch-scrape coverage:

@@ -1,30 +1,106 @@
-import { concurrencyCheck, crawlWithConcurrencyTracking } from "./lib";
-
-let accountConcurrencyLimit = 2;
-
-beforeAll(async () => {
-  const { maxConcurrency } = await concurrencyCheck();
-  accountConcurrencyLimit = maxConcurrency;
-  console.log("Account concurrency limit:", accountConcurrencyLimit);
-}, 10000);
-
-describe("Concurrency queue and limit", () => {
-  it("crawl utilizes full concurrency limit and doesn't go over", async () => {
-    const { crawl, concurrencies } = await crawlWithConcurrencyTracking({
-      url: "https://firecrawl.dev",
-      limit: accountConcurrencyLimit * 2,
-    });
-
-    expect(Math.max(...concurrencies)).toBe(accountConcurrencyLimit);
-  }, 600000);
-
-  it("crawl handles maxConcurrency properly", async () => {
-    const { crawl, concurrencies } = await crawlWithConcurrencyTracking({
-      url: "https://firecrawl.dev",
-      limit: 15,
-      maxConcurrency: 5,
-    });
-
-    expect(Math.max(...concurrencies)).toBe(5);
-  }, 600000);
-});
+import { batchScrapeWithConcurrencyTracking, concurrencyCheck, crawlWithConcurrencyTracking, defaultIdentity, Identity } from "./lib";
+
+const concurrencyIdentity: Identity = {
+  apiKey: process.env.TEST_API_KEY_CONCURRENCY ?? process.env.TEST_API_KEY!,
+}
+
+if (!process.env.TEST_SUITE_SELF_HOSTED) {
+  let accountConcurrencyLimit = 2;
+
+  beforeAll(async () => {
+    const { maxConcurrency } = await concurrencyCheck(concurrencyIdentity);
+    accountConcurrencyLimit = maxConcurrency;
+
+    console.log("Account concurrency limit:", accountConcurrencyLimit);
+
+    if (accountConcurrencyLimit > 20) {
+      console.warn("Your account's concurrency limit (" + accountConcurrencyLimit + ") is likely too high, which will cause these tests to fail. Please set up TEST_API_KEY_CONCURRENCY with an API key that has a lower concurrency limit.");
+    }
+  }, 10000);
+
+  describe("Concurrency queue and limit", () => {
+    it("crawl utilizes full concurrency limit and doesn't go over", async () => {
+      const limit = accountConcurrencyLimit * 2;
+
+      const { crawl, concurrencies } = await crawlWithConcurrencyTracking({
+        url: "https://firecrawl.dev",
+        limit,
+      }, concurrencyIdentity);
+
+      expect(Math.max(...concurrencies)).toBe(accountConcurrencyLimit);
+      expect(crawl.completed).toBe(limit);
+    }, 600000);
+
+    it("crawl handles maxConcurrency properly", async () => {
+      const { crawl, concurrencies } = await crawlWithConcurrencyTracking({
+        url: "https://firecrawl.dev",
+        limit: 15,
+        maxConcurrency: 5,
+      }, concurrencyIdentity);
+
+      expect(Math.max(...concurrencies)).toBe(5);
+      expect(crawl.completed).toBe(15);
+    }, 600000);
+
+    it("crawl maxConcurrency stacks properly", async () => {
+      const [{ crawl: crawl1, concurrencies: concurrencies1 }, { crawl: crawl2, concurrencies: concurrencies2 }] = await Promise.all([
+        crawlWithConcurrencyTracking({
+          url: "https://firecrawl.dev",
+          limit: 15,
+          maxConcurrency: 5,
+        }, concurrencyIdentity),
+        crawlWithConcurrencyTracking({
+          url: "https://firecrawl.dev",
+          limit: 15,
+          maxConcurrency: 5,
+        }, concurrencyIdentity),
+      ]);
+
+      expect(Math.max(...concurrencies1, ...concurrencies2)).toBe(10);
+      expect(crawl1.completed).toBe(15);
+      expect(crawl2.completed).toBe(15);
+    }, 1200000);
+
+    it("batch scrape utilizes full concurrency limit and doesn't go over", async () => {
+      const limit = accountConcurrencyLimit * 2;
+
+      const { batchScrape, concurrencies } = await batchScrapeWithConcurrencyTracking({
+        urls: Array(limit).fill(0).map(_ => `https://firecrawl.dev`),
+      }, concurrencyIdentity);
+
+      expect(Math.max(...concurrencies)).toBe(accountConcurrencyLimit);
+      expect(batchScrape.completed).toBe(limit);
+    }, 600000);
+
+    it("batch scrape handles maxConcurrency properly", async () => {
+      const { batchScrape, concurrencies } = await batchScrapeWithConcurrencyTracking({
+        urls: Array(15).fill(0).map(_ => `https://firecrawl.dev`),
+        maxConcurrency: 5,
+      }, concurrencyIdentity);
+
+      expect(Math.max(...concurrencies)).toBe(5);
+      expect(batchScrape.completed).toBe(15);
+    }, 600000);
+
+    it("batch scrape maxConcurrency stacks properly", async () => {
+      const [{ batchScrape: batchScrape1, concurrencies: concurrencies1 }, { batchScrape: batchScrape2, concurrencies: concurrencies2 }] = await Promise.all([
+        batchScrapeWithConcurrencyTracking({
+          urls: Array(15).fill(0).map(_ => `https://firecrawl.dev`),
+          maxConcurrency: 5,
+        }, concurrencyIdentity),
+        batchScrapeWithConcurrencyTracking({
+          urls: Array(15).fill(0).map(_ => `https://firecrawl.dev`),
+          maxConcurrency: 5,
+        }, concurrencyIdentity),
+      ]);
+
+      expect(Math.max(...concurrencies1, ...concurrencies2)).toBe(10);
+      expect(batchScrape1.completed).toBe(15);
+      expect(batchScrape2.completed).toBe(15);
+    }, 1200000);
+  });
+} else {
+  it("stubbed", () => {
+    expect(true).toBe(true);
+  });
+}
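Worth spelling out how the "stacks properly" assertion works: the concurrencies array is sampled from the team-wide /v1/concurrency-check endpoint while each crawl polls, so every sample already counts jobs from both crawls, and the pooled peak should hit 5 + 5 = 10. A small illustration with made-up sample values:

// Made-up team-wide samples taken while two maxConcurrency=5 crawls run:
const concurrencies1 = [3, 8, 10, 9]; // sampled during crawl 1's poll loop
const concurrencies2 = [4, 10, 7, 2]; // sampled during crawl 2's poll loop

// The pooled peak equals the sum of the per-crawl caps (5 + 5):
console.assert(Math.max(...concurrencies1, ...concurrencies2) === 10);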
The shared test helpers (lib.ts, imported above as "./lib") gain an Identity type; every helper now accepts an optional identity argument that defaults to TEST_API_KEY:

@@ -10,14 +10,22 @@ import request from "supertest";

const TEST_URL = "http://127.0.0.1:3002";

+export type Identity = {
+  apiKey: string;
+}
+
+export const defaultIdentity: Identity = {
+  apiKey: process.env.TEST_API_KEY!,
+};
+
// =========================================
// Scrape API
// =========================================

-async function scrapeRaw(body: ScrapeRequestInput) {
+async function scrapeRaw(body: ScrapeRequestInput, identity = defaultIdentity) {
  return await request(TEST_URL)
    .post("/v1/scrape")
-    .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
+    .set("Authorization", `Bearer ${identity.apiKey}`)
    .set("Content-Type", "application/json")
    .send(body);
}
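Because the new parameter defaults to defaultIdentity, existing call sites stay unchanged while the concurrency suite can swap in its dedicated key. A quick usage sketch (the scrape and crawl helpers and the Identity type are from this file; the URL and key choice are illustrative):

import { crawl, scrape, type Identity } from "./lib";

const altIdentity: Identity = { apiKey: process.env.TEST_API_KEY_CONCURRENCY! };

async function demo() {
  // Omit the identity to use the default TEST_API_KEY...
  const doc = await scrape({ url: "https://firecrawl.dev" });
  // ...or pass one explicitly to act as a different team.
  const status = await crawl({ url: "https://firecrawl.dev", limit: 5 }, altIdentity);
  return { doc, status };
}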
@@ -34,8 +42,8 @@ function expectScrapeToFail(response: Awaited<ReturnType<typeof scrapeRaw>>) {
  expect(typeof response.body.error).toBe("string");
}

-export async function scrape(body: ScrapeRequestInput): Promise<Document> {
-  const raw = await scrapeRaw(body);
+export async function scrape(body: ScrapeRequestInput, identity = defaultIdentity): Promise<Document> {
+  const raw = await scrapeRaw(body, identity);
  expectScrapeToSucceed(raw);
  if (body.proxy === "stealth") {
    expect(raw.body.data.metadata.proxyUsed).toBe("stealth");
@@ -45,24 +53,24 @@ export async function scrape(body: ScrapeRequestInput): Promise<Document> {
  return raw.body.data;
}

-export async function scrapeWithFailure(body: ScrapeRequestInput): Promise<{
+export async function scrapeWithFailure(body: ScrapeRequestInput, identity = defaultIdentity): Promise<{
  success: false;
  error: string;
}> {
-  const raw = await scrapeRaw(body);
+  const raw = await scrapeRaw(body, identity);
  expectScrapeToFail(raw);
  return raw.body;
}

-export async function scrapeStatusRaw(jobId: string) {
+export async function scrapeStatusRaw(jobId: string, identity = defaultIdentity) {
  return await request(TEST_URL)
    .get("/v1/scrape/" + encodeURIComponent(jobId))
-    .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
+    .set("Authorization", `Bearer ${identity.apiKey}`)
    .send();
}

-export async function scrapeStatus(jobId: string): Promise<Document> {
-  const raw = await scrapeStatusRaw(jobId);
+export async function scrapeStatus(jobId: string, identity = defaultIdentity): Promise<Document> {
+  const raw = await scrapeStatusRaw(jobId, identity);
  expect(raw.statusCode).toBe(200);
  expect(raw.body.success).toBe(true);
  expect(typeof raw.body.data).toBe("object");
@@ -75,30 +83,30 @@ export async function scrapeStatus(jobId: string): Promise<Document> {
// Crawl API
// =========================================

-async function crawlStart(body: CrawlRequestInput) {
+async function crawlStart(body: CrawlRequestInput, identity = defaultIdentity) {
  return await request(TEST_URL)
    .post("/v1/crawl")
-    .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
+    .set("Authorization", `Bearer ${identity.apiKey}`)
    .set("Content-Type", "application/json")
    .send(body);
}

-async function crawlStatus(id: string) {
+async function crawlStatus(id: string, identity = defaultIdentity) {
  return await request(TEST_URL)
    .get("/v1/crawl/" + encodeURIComponent(id))
-    .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
+    .set("Authorization", `Bearer ${identity.apiKey}`)
    .send();
}

-async function crawlOngoingRaw() {
+async function crawlOngoingRaw(identity = defaultIdentity) {
  return await request(TEST_URL)
    .get("/v1/crawl/ongoing")
-    .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
+    .set("Authorization", `Bearer ${identity.apiKey}`)
    .send();
}

-export async function crawlOngoing(): Promise<Exclude<OngoingCrawlsResponse, ErrorResponse>> {
-  const res = await crawlOngoingRaw();
+export async function crawlOngoing(identity = defaultIdentity): Promise<Exclude<OngoingCrawlsResponse, ErrorResponse>> {
+  const res = await crawlOngoingRaw(identity);
  expect(res.statusCode).toBe(200);
  expect(res.body.success).toBe(true);
  return res.body;
@@ -120,17 +128,17 @@ function expectCrawlToSucceed(response: Awaited<ReturnType<typeof crawlStatus>>) {
  expect(response.body.data.length).toBeGreaterThan(0);
}

-export async function asyncCrawl(body: CrawlRequestInput): Promise<Exclude<CrawlResponse, ErrorResponse>> {
-  const cs = await crawlStart(body);
+export async function asyncCrawl(body: CrawlRequestInput, identity = defaultIdentity): Promise<Exclude<CrawlResponse, ErrorResponse>> {
+  const cs = await crawlStart(body, identity);
  expectCrawlStartToSucceed(cs);
  return cs.body;
}

-export async function asyncCrawlWaitForFinish(id: string): Promise<Exclude<CrawlStatusResponse, ErrorResponse>> {
+export async function asyncCrawlWaitForFinish(id: string, identity = defaultIdentity): Promise<Exclude<CrawlStatusResponse, ErrorResponse>> {
  let x;

  do {
-    x = await crawlStatus(id);
+    x = await crawlStatus(id, identity);
    expect(x.statusCode).toBe(200);
    expect(typeof x.body.status).toBe("string");
  } while (x.body.status === "scraping");
@@ -139,14 +147,14 @@ export async function asyncCrawlWaitForFinish(id: string): Promise<Exclude<CrawlStatusResponse, ErrorResponse>> {
  return x.body;
}

-export async function crawl(body: CrawlRequestInput): Promise<Exclude<CrawlStatusResponse, ErrorResponse>> {
-  const cs = await crawlStart(body);
+export async function crawl(body: CrawlRequestInput, identity = defaultIdentity): Promise<Exclude<CrawlStatusResponse, ErrorResponse>> {
+  const cs = await crawlStart(body, identity);
  expectCrawlStartToSucceed(cs);

  let x;

  do {
-    x = await crawlStatus(cs.body.id);
+    x = await crawlStatus(cs.body.id, identity);
    expect(x.statusCode).toBe(200);
    expect(typeof x.body.status).toBe("string");
  } while (x.body.status === "scraping");
@@ -159,18 +167,18 @@ export async function crawl(body: CrawlRequestInput): Promise<Exclude<CrawlStatusResponse, ErrorResponse>> {
// Batch Scrape API
// =========================================

-async function batchScrapeStart(body: BatchScrapeRequestInput) {
+async function batchScrapeStart(body: BatchScrapeRequestInput, identity = defaultIdentity) {
  return await request(TEST_URL)
    .post("/v1/batch/scrape")
-    .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
+    .set("Authorization", `Bearer ${identity.apiKey}`)
    .set("Content-Type", "application/json")
    .send(body);
}

-async function batchScrapeStatus(id: string) {
+async function batchScrapeStatus(id: string, identity = defaultIdentity) {
  return await request(TEST_URL)
    .get("/v1/batch/scrape/" + encodeURIComponent(id))
-    .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
+    .set("Authorization", `Bearer ${identity.apiKey}`)
    .send();
}

@@ -190,14 +198,14 @@ function expectBatchScrapeToSucceed(response: Awaited<ReturnType<typeof batchScrapeStatus>>) {
  expect(response.body.data.length).toBeGreaterThan(0);
}

-export async function batchScrape(body: BatchScrapeRequestInput): Promise<Exclude<CrawlStatusResponse, ErrorResponse>> {
-  const bss = await batchScrapeStart(body);
+export async function batchScrape(body: BatchScrapeRequestInput, identity = defaultIdentity): Promise<Exclude<CrawlStatusResponse, ErrorResponse>> {
+  const bss = await batchScrapeStart(body, identity);
  expectBatchScrapeStartToSucceed(bss);

  let x;

  do {
-    x = await batchScrapeStatus(bss.body.id);
+    x = await batchScrapeStatus(bss.body.id, identity);
    expect(x.statusCode).toBe(200);
    expect(typeof x.body.status).toBe("string");
  } while (x.body.status === "scraping");
@@ -210,10 +218,10 @@ export async function batchScrape(body: BatchScrapeRequestInput): Promise<Exclude<CrawlStatusResponse, ErrorResponse>> {
// Map API
// =========================================

-export async function map(body: MapRequestInput) {
+export async function map(body: MapRequestInput, identity = defaultIdentity) {
  return await request(TEST_URL)
    .post("/v1/map")
-    .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
+    .set("Authorization", `Bearer ${identity.apiKey}`)
    .set("Content-Type", "application/json")
    .send(body);
}
@@ -229,18 +237,18 @@ export function expectMapToSucceed(response: Awaited<ReturnType<typeof map>>) {
// Extract API
// =========================================

-async function extractStart(body: ExtractRequestInput) {
+async function extractStart(body: ExtractRequestInput, identity = defaultIdentity) {
  return await request(TEST_URL)
    .post("/v1/extract")
-    .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
+    .set("Authorization", `Bearer ${identity.apiKey}`)
    .set("Content-Type", "application/json")
    .send(body);
}

-async function extractStatus(id: string) {
+async function extractStatus(id: string, identity = defaultIdentity) {
  return await request(TEST_URL)
    .get("/v1/extract/" + encodeURIComponent(id))
-    .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
+    .set("Authorization", `Bearer ${identity.apiKey}`)
    .send();
}

@@ -259,14 +267,14 @@ function expectExtractToSucceed(response: Awaited<ReturnType<typeof extractStatus>>) {
  expect(response.body).toHaveProperty("data");
}

-export async function extract(body: ExtractRequestInput): Promise<ExtractResponse> {
-  const es = await extractStart(body);
+export async function extract(body: ExtractRequestInput, identity = defaultIdentity): Promise<ExtractResponse> {
+  const es = await extractStart(body, identity);
  expectExtractStartToSucceed(es);

  let x;

  do {
-    x = await extractStatus(es.body.id);
+    x = await extractStatus(es.body.id, identity);
    expect(x.statusCode).toBe(200);
    expect(typeof x.body.status).toBe("string");
  } while (x.body.status === "processing");
@@ -279,10 +287,10 @@ export async function extract(body: ExtractRequestInput): Promise<ExtractResponse> {
// Search API
// =========================================

-async function searchRaw(body: SearchRequestInput) {
+async function searchRaw(body: SearchRequestInput, identity = defaultIdentity) {
  return await request(TEST_URL)
    .post("/v1/search")
-    .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
+    .set("Authorization", `Bearer ${identity.apiKey}`)
    .set("Content-Type", "application/json")
    .send(body);
}
@@ -295,8 +303,8 @@ function expectSearchToSucceed(response: Awaited<ReturnType<typeof searchRaw>>) {
  expect(response.body.data.length).toBeGreaterThan(0);
}

-export async function search(body: SearchRequestInput): Promise<Document[]> {
-  const raw = await searchRaw(body);
+export async function search(body: SearchRequestInput, identity = defaultIdentity): Promise<Document[]> {
+  const raw = await searchRaw(body, identity);
  expectSearchToSucceed(raw);
  return raw.body.data;
}
@@ -305,10 +313,10 @@ export async function search(body: SearchRequestInput): Promise<Document[]> {
// Billing API
// =========================================

-export async function creditUsage(): Promise<{ remaining_credits: number }> {
+export async function creditUsage(identity = defaultIdentity): Promise<{ remaining_credits: number }> {
  const req = (await request(TEST_URL)
    .get("/v1/team/credit-usage")
-    .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
+    .set("Authorization", `Bearer ${identity.apiKey}`)
    .set("Content-Type", "application/json"));

  if (req.status !== 200) {
@@ -318,10 +326,10 @@ export async function creditUsage(): Promise<{ remaining_credits: number }> {
  return req.body.data;
}

-export async function tokenUsage(): Promise<{ remaining_tokens: number }> {
+export async function tokenUsage(identity = defaultIdentity): Promise<{ remaining_tokens: number }> {
  return (await request(TEST_URL)
    .get("/v1/team/token-usage")
-    .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
+    .set("Authorization", `Bearer ${identity.apiKey}`)
    .set("Content-Type", "application/json")).body.data;
}

@@ -329,10 +337,10 @@ export async function tokenUsage(): Promise<{ remaining_tokens: number }> {
// Concurrency API
// =========================================

-export async function concurrencyCheck(): Promise<{ concurrency: number, maxConcurrency: number }> {
+export async function concurrencyCheck(identity = defaultIdentity): Promise<{ concurrency: number, maxConcurrency: number }> {
  const x = (await request(TEST_URL)
    .get("/v1/concurrency-check")
-    .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
+    .set("Authorization", `Bearer ${identity.apiKey}`)
    .set("Content-Type", "application/json"));

  expect(x.statusCode).toBe(200);
@@ -340,20 +348,20 @@ export async function concurrencyCheck(): Promise<{ concurrency: number, maxConcurrency: number }> {
  return x.body;
}

-export async function crawlWithConcurrencyTracking(body: CrawlRequestInput): Promise<{
+export async function crawlWithConcurrencyTracking(body: CrawlRequestInput, identity = defaultIdentity): Promise<{
  crawl: Exclude<CrawlStatusResponse, ErrorResponse>;
  concurrencies: number[];
}> {
-  const cs = await crawlStart(body);
+  const cs = await crawlStart(body, identity);
  expectCrawlStartToSucceed(cs);

  let x, concurrencies: number[] = [];

  do {
-    x = await crawlStatus(cs.body.id);
+    x = await crawlStatus(cs.body.id, identity);
    expect(x.statusCode).toBe(200);
    expect(typeof x.body.status).toBe("string");
-    concurrencies.push((await concurrencyCheck()).concurrency);
+    concurrencies.push((await concurrencyCheck(identity)).concurrency);
  } while (x.body.status === "scraping");

  expectCrawlToSucceed(x);
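crawlWithConcurrencyTracking samples /v1/concurrency-check once per status poll, so the returned concurrencies array is a coarse time series of the team's active-job count. A sketch of consuming it outside the assertions (helper names are from this file; the URL and limit are illustrative):

import { crawlWithConcurrencyTracking, defaultIdentity } from "./lib";

async function peakConcurrencyDuringCrawl(): Promise<number> {
  const { crawl, concurrencies } = await crawlWithConcurrencyTracking(
    { url: "https://firecrawl.dev", limit: 10 },
    defaultIdentity,
  );
  // crawl is the final status payload; concurrencies holds one sample per poll.
  console.log(`completed ${crawl.completed} pages`);
  return Math.max(...concurrencies);
}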
@@ -363,6 +371,29 @@ export async function crawlWithConcurrencyTracking(body: CrawlRequestInput): Promise<{
  };
}

+export async function batchScrapeWithConcurrencyTracking(body: BatchScrapeRequestInput, identity = defaultIdentity): Promise<{
+  batchScrape: Exclude<CrawlStatusResponse, ErrorResponse>;
+  concurrencies: number[];
+}> {
+  const cs = await batchScrapeStart(body, identity);
+  expectBatchScrapeStartToSucceed(cs);
+
+  let x, concurrencies: number[] = [];
+
+  do {
+    x = await batchScrapeStatus(cs.body.id, identity);
+    expect(x.statusCode).toBe(200);
+    expect(typeof x.body.status).toBe("string");
+    concurrencies.push((await concurrencyCheck(identity)).concurrency);
+  } while (x.body.status === "scraping");
+
+  expectBatchScrapeToSucceed(x);
+  return {
+    batchScrape: x.body,
+    concurrencies,
+  };
+}
+
// =========================================
// =========================================
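crawlWithConcurrencyTracking and the new batchScrapeWithConcurrencyTracking differ only in which start/status/expect helpers they call. A hypothetical refactor, not part of this commit, could factor out the shared poll-and-sample loop:

// Hypothetical shared helper (illustrative only): poll any job-status
// endpoint until it leaves "scraping", sampling team concurrency each turn.
async function trackWhileScraping<T extends { body: { status: string } }>(
  poll: () => Promise<T>,
  sampleConcurrency: () => Promise<number>,
): Promise<{ final: T; concurrencies: number[] }> {
  const concurrencies: number[] = [];
  let x: T;
  do {
    x = await poll();
    concurrencies.push(await sampleConcurrency());
  } while (x.body.status === "scraping");
  return { final: x, concurrencies };
}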
@@ -376,18 +407,18 @@ async function deepResearchStart(body: {
  formats?: string[];
  topic?: string;
  jsonOptions?: any;
-}) {
+}, identity = defaultIdentity) {
  return await request(TEST_URL)
    .post("/v1/deep-research")
-    .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
+    .set("Authorization", `Bearer ${identity.apiKey}`)
    .set("Content-Type", "application/json")
    .send(body);
}

-async function deepResearchStatus(id: string) {
+async function deepResearchStatus(id: string, identity = defaultIdentity) {
  return await request(TEST_URL)
    .get("/v1/deep-research/" + encodeURIComponent(id))
-    .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
+    .set("Authorization", `Bearer ${identity.apiKey}`)
    .send();
}

@@ -407,14 +438,14 @@ export async function deepResearch(body: {
  formats?: string[];
  topic?: string;
  jsonOptions?: any;
-}) {
-  const ds = await deepResearchStart(body);
+}, identity = defaultIdentity) {
+  const ds = await deepResearchStart(body, identity);
  expectDeepResearchStartToSucceed(ds);

  let x;

  do {
-    x = await deepResearchStatus(ds.body.id);
+    x = await deepResearchStatus(ds.body.id, identity);
    expect(x.statusCode).toBe(200);
    expect(typeof x.body.status).toBe("string");
  } while (x.body.status === "processing");
In the concurrency limiter module (the ../lib/concurrency-limit that the deleted unit test covered), two imports are added:

@@ -2,8 +2,9 @@ import { RateLimiterMode } from "../types";
import { redisEvictConnection } from "../services/redis";
import type { Job, JobsOptions } from "bullmq";
import { getACUCTeam } from "../controllers/auth";
-import { getCrawl } from "./crawl-redis";
+import { getCrawl, StoredCrawl } from "./crawl-redis";
import { getScrapeQueue } from "../services/queue-service";
+import { logger } from "./logger";

const constructKey = (team_id: string) => "concurrency-limiter:" + team_id;
const constructQueueKey = (team_id: string) =>
@@ -170,6 +171,10 @@ async function getNextConcurrentJob(teamId: string): Promise<{
    timeout: number;
  } | null = null;

+  const crawlCache = new Map<string, StoredCrawl>();
+
+  const debugMaxTeamConcurrency = (await getConcurrencyLimitActiveJobs(teamId)).length;
+
  while (finalJob === null) {
    const res = await takeConcurrencyLimitedJobAndTimeout(teamId);
    if (res === null) {
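The cache is scoped to a single getNextConcurrentJob call, so it can only go stale within one scheduling pass, presumably a deliberate trade of slight staleness for fewer Redis round-trips when many queued jobs belong to the same crawl. The pattern in isolation (a generic sketch, not the repository's code):

// Per-call memoization sketch: fetch each key at most once per pass.
async function cachedGet<V>(
  cache: Map<string, V>,
  key: string,
  fetch: (key: string) => Promise<V | null>,
): Promise<V | null> {
  const hit = cache.get(key);
  if (hit !== undefined) return hit;
  const value = await fetch(key);
  if (value !== null) cache.set(key, value); // only cache hits, retry misses
  return value;
}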
@@ -178,20 +183,26 @@ async function getNextConcurrentJob(teamId: string): Promise<{

    // If the job is associated with a crawl ID, we need to check if the crawl has a max concurrency limit
    if (res.job.data.crawl_id) {
-      const sc = await getCrawl(res.job.data.crawl_id);
+      const sc = crawlCache.get(res.job.data.crawl_id) ?? await getCrawl(res.job.data.crawl_id);
+      if (sc !== null) {
+        crawlCache.set(res.job.data.crawl_id, sc);
+      }
+
      const maxCrawlConcurrency = sc === null
        ? null
-        : sc.crawlerOptions.delay !== undefined
+        : (typeof sc.crawlerOptions?.delay === "number")
          ? 1
          : sc.maxConcurrency ?? null;

      if (maxCrawlConcurrency !== null) {
        // If the crawl has a max concurrency limit, we need to check if the crawl has reached the limit
        const currentActiveConcurrency = (await getCrawlConcurrencyLimitActiveJobs(res.job.data.crawl_id)).length;
        if (currentActiveConcurrency < maxCrawlConcurrency) {
+          logger.debug("Crawl " + res.job.data.crawl_id.slice(-1) + " concurrency is " + currentActiveConcurrency + " of " + maxCrawlConcurrency + " (team concurrency: " + debugMaxTeamConcurrency + "), picking job");
          // If we're under the max concurrency limit, we can run the job
          finalJob = res;
        } else {
+          logger.debug("Crawl " + res.job.data.crawl_id.slice(-1) + " concurrency is " + currentActiveConcurrency + " of " + maxCrawlConcurrency + " (team concurrency: " + debugMaxTeamConcurrency + "), ignoring job");
          // If we're at the max concurrency limit, we need to ignore the job
          ignoredJobs.push({
            job: res.job,
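The effective per-crawl cap now resolves as restated below. The switch from sc.crawlerOptions.delay !== undefined to a typeof check also means a crawl stored without a crawlerOptions object no longer throws here, and a non-numeric delay no longer forces the cap to 1 (condensed restatement of the ternary above, assuming the same StoredCrawl shape):

// - crawl unknown                -> null (no per-crawl cap)
// - numeric crawlerOptions.delay -> 1 (delayed crawls run one job at a time)
// - maxConcurrency set           -> that value
// - otherwise                    -> null
function effectiveCrawlCap(
  sc: { crawlerOptions?: { delay?: unknown }; maxConcurrency?: number } | null,
): number | null {
  if (sc === null) return null;
  if (typeof sc.crawlerOptions?.delay === "number") return 1;
  return sc.maxConcurrency ?? null;
}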
@@ -199,10 +210,12 @@ async function getNextConcurrentJob(teamId: string): Promise<{
          });
        }
      } else {
+        logger.debug("Crawl " + res.job.data.crawl_id.slice(-1) + " has no max concurrency limit (team concurrency: " + debugMaxTeamConcurrency + "), picking job");
        // If the crawl has no max concurrency limit, we can run the job
        finalJob = res;
      }
    } else {
+      logger.debug("Job is not associated with a crawl ID (team concurrency: " + debugMaxTeamConcurrency + "), picking job");
      // If the job is not associated with a crawl ID, we can run the job
      finalJob = res;
    }
In the scrape queue service, _addScrapeJobToBullMQ previously registered only a crawl-level active job for delayed crawls, and only a team-level one otherwise. It now always registers the team-level entry, and additionally the crawl-level entry whenever the crawl has a delay or a maxConcurrency:

@@ -58,10 +58,13 @@ export async function _addScrapeJobToBullMQ(
    webScraperOptions &&
    webScraperOptions.team_id
  ) {
-    if (webScraperOptions.crawl_id && webScraperOptions.crawlerOptions?.delay) {
-      await pushCrawlConcurrencyLimitActiveJob(webScraperOptions.crawl_id, jobId, 60 * 1000);
-    } else {
-      await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId, 60 * 1000); // 60s default timeout
-    }
+    await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId, 60 * 1000); // 60s default timeout
+
+    if (webScraperOptions.crawl_id) {
+      const sc = await getCrawl(webScraperOptions.crawl_id);
+      if (webScraperOptions.crawlerOptions?.delay || sc?.maxConcurrency) {
+        await pushCrawlConcurrencyLimitActiveJob(webScraperOptions.crawl_id, jobId, 60 * 1000);
+      }
+    }
  }
}
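The observable difference, sketched as a table of which active-job sets a new job lands in (derived from the branches above; "before" is the - side, "after" the + side):

// Which Redis "active job" sets a new scrape job joins:
//
//   case                                   before         after
//   crawl with delay                       crawl only     team + crawl
//   crawl with maxConcurrency (no delay)   team only      team + crawl
//   crawl with neither                     team only      team only
//   no crawl_id                            team only      team only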
addScrapeJobs gets a matching null-safety fix: crawlerOptions is optional-chained, so crawls stored without crawler options no longer throw here:

@@ -243,7 +246,7 @@ export async function addScrapeJobs(
  const crawl = await getCrawl(crawlID);
  const concurrencyLimit = !crawl
    ? null
-    : crawl.crawlerOptions.delay === undefined && crawl.maxConcurrency === undefined
+    : crawl.crawlerOptions?.delay === undefined && crawl.maxConcurrency === undefined
      ? null
      : crawl.maxConcurrency ?? 1;

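A short illustration of what the optional chain guards against (a hypothetical crawl record whose crawlerOptions was never set):

const crawl = { crawlerOptions: undefined, maxConcurrency: undefined } as
  { crawlerOptions?: { delay?: number }; maxConcurrency?: number };

// Before: crawl.crawlerOptions.delay
//   -> TypeError: Cannot read properties of undefined (reading 'delay')
// After:
const delay = crawl.crawlerOptions?.delay; // undefined, no throw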