Fix background job issue (#21436)

This commit is contained in:
Mohit Yadav 2025-05-28 19:51:57 +05:30 committed by GitHub
parent 859e38057e
commit 5c9ba21bbc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -15,6 +15,7 @@ import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.GetGeneratedKeys; import org.jdbi.v3.sqlobject.statement.GetGeneratedKeys;
import org.jdbi.v3.sqlobject.statement.SqlQuery; import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.openmetadata.schema.jobs.BackgroundJob; import org.openmetadata.schema.jobs.BackgroundJob;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlQuery;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlUpdate; import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlUpdate;
import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.JsonUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -65,11 +66,20 @@ public interface JobDAO {
return Optional.ofNullable(fetchPendingJobInternal()); return Optional.ofNullable(fetchPendingJobInternal());
} }
@SqlQuery( @ConnectionAwareSqlQuery(
"SELECT id,jobType,methodName,jobArgs,status,createdAt,updatedAt,createdBy,runAt FROM background_jobs" value =
+ " WHERE status = 'PENDING'" "SELECT id,jobType,methodName,jobArgs,status,createdAt,updatedAt,createdBy,runAt FROM background_jobs"
+ " AND COALESCE(runAt, 0) <= UNIX_TIMESTAMP(NOW(3)) * 1000" + " WHERE status = 'PENDING'"
+ " ORDER BY createdAt LIMIT 1") + " AND COALESCE(runAt, 0) <= UNIX_TIMESTAMP(NOW(3)) * 1000"
+ " ORDER BY createdAt LIMIT 1",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value =
"SELECT id,jobType,methodName,jobArgs,status,createdAt,updatedAt,createdBy,runAt FROM background_jobs"
+ " WHERE status = 'PENDING'"
+ " AND COALESCE(runAt, 0) <= EXTRACT(EPOCH FROM NOW()) * 1000"
+ " ORDER BY createdAt LIMIT 1",
connectionType = POSTGRES)
@RegisterRowMapper(BackgroundJobMapper.class) @RegisterRowMapper(BackgroundJobMapper.class)
BackgroundJob fetchPendingJobInternal() throws StatementException; BackgroundJob fetchPendingJobInternal() throws StatementException;