patch for delay race condition

This commit is contained in:
Gergő Móricz 2025-06-19 16:22:14 +02:00
parent 7296b76e78
commit cb53b2b31e
2 changed files with 6 additions and 3 deletions

View File

@ -22,7 +22,7 @@ import { configDotenv } from "dotenv";
import type { Job, JobState, Queue } from "bullmq"; import type { Job, JobState, Queue } from "bullmq";
import { logger } from "../../lib/logger"; import { logger } from "../../lib/logger";
import { supabase_rr_service, supabase_service } from "../../services/supabase"; import { supabase_rr_service, supabase_service } from "../../services/supabase";
import { getConcurrencyLimitedJobs } from "../../lib/concurrency-limit"; import { getConcurrencyLimitedJobs, getCrawlConcurrencyLimitActiveJobs } from "../../lib/concurrency-limit";
import { getJobFromGCS } from "../../lib/gcs-jobs"; import { getJobFromGCS } from "../../lib/gcs-jobs";
configDotenv(); configDotenv();
@ -163,6 +163,7 @@ export async function crawlStatusController(
); );
const throttledJobsSet = new Set(await getConcurrencyLimitedJobs(req.auth.team_id)); const throttledJobsSet = new Set(await getConcurrencyLimitedJobs(req.auth.team_id));
const activeJobsSet = new Set(await getCrawlConcurrencyLimitActiveJobs(req.params.jobId));
const validJobStatuses: [string, JobState | "unknown"][] = []; const validJobStatuses: [string, JobState | "unknown"][] = [];
const validJobIDs: string[] = []; const validJobIDs: string[] = [];
@ -171,6 +172,9 @@ export async function crawlStatusController(
if (throttledJobsSet.has(id)) { if (throttledJobsSet.has(id)) {
validJobStatuses.push([id, "prioritized"]); validJobStatuses.push([id, "prioritized"]);
validJobIDs.push(id); validJobIDs.push(id);
} else if (status === "unknown" && activeJobsSet.has(id)) {
validJobStatuses.push([id, "active"]);
validJobIDs.push(id);
} else if ( } else if (
status !== "failed" && status !== "failed" &&
status !== "unknown" status !== "unknown"

View File

@ -216,8 +216,7 @@ async function getNextConcurrentJob(teamId: string): Promise<{
} }
for (const ignoredJob of ignoredJobs) { for (const ignoredJob of ignoredJobs) {
const timeout = ignoredJob.timeout - Date.now(); await pushConcurrencyLimitedJob(teamId, ignoredJob.job, ignoredJob.timeout);
await pushConcurrencyLimitedJob(teamId, ignoredJob.job, timeout);
} }
return finalJob; return finalJob;