Add Latch for consumers (#18655)

This commit is contained in:
Mohit Yadav 2024-11-15 14:15:35 +05:30 committed by GitHub
parent 0a050183c1
commit 8c6de372f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -323,12 +323,15 @@ public class SearchIndexApp extends AbstractNativeApplication {
private void processEntityReindex(JobExecutionContext jobExecutionContext) private void processEntityReindex(JobExecutionContext jobExecutionContext)
throws InterruptedException { throws InterruptedException {
int numConsumers = jobData.getConsumerThreads(); int numConsumers = jobData.getConsumerThreads();
CountDownLatch producerLatch = new CountDownLatch(getTotalLatchCount(jobData.getEntities())); int latchCount = getTotalLatchCount(jobData.getEntities());
CountDownLatch producerLatch = new CountDownLatch(latchCount);
CountDownLatch consumerLatch = new CountDownLatch(latchCount + numConsumers);
submitProducerTask(producerLatch); submitProducerTask(producerLatch);
submitConsumerTask(jobExecutionContext); submitConsumerTask(jobExecutionContext, consumerLatch);
producerLatch.await(); producerLatch.await();
sendPoisonPills(numConsumers); sendPoisonPills(numConsumers);
consumerLatch.await();
} }
private void submitProducerTask(CountDownLatch producerLatch) { private void submitProducerTask(CountDownLatch producerLatch) {
@ -369,12 +372,12 @@ public class SearchIndexApp extends AbstractNativeApplication {
} }
} }
private void submitConsumerTask(JobExecutionContext jobExecutionContext) { private void submitConsumerTask(JobExecutionContext jobExecutionContext, CountDownLatch latch) {
for (int i = 0; i < jobData.getConsumerThreads(); i++) { for (int i = 0; i < jobData.getConsumerThreads(); i++) {
consumerExecutor.submit( consumerExecutor.submit(
() -> { () -> {
try { try {
consumeTasks(jobExecutionContext); consumeTasks(jobExecutionContext, latch);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
LOG.warn("Consumer thread interrupted."); LOG.warn("Consumer thread interrupted.");
@ -383,7 +386,8 @@ public class SearchIndexApp extends AbstractNativeApplication {
} }
} }
private void consumeTasks(JobExecutionContext jobExecutionContext) throws InterruptedException { private void consumeTasks(JobExecutionContext jobExecutionContext, CountDownLatch latch)
throws InterruptedException {
while (true) { while (true) {
IndexingTask<?> task = taskQueue.take(); IndexingTask<?> task = taskQueue.take();
LOG.info( LOG.info(
@ -392,9 +396,10 @@ public class SearchIndexApp extends AbstractNativeApplication {
task.currentEntityOffset()); task.currentEntityOffset());
if (task == IndexingTask.POISON_PILL) { if (task == IndexingTask.POISON_PILL) {
LOG.debug("Received POISON_PILL. Consumer thread terminating."); LOG.debug("Received POISON_PILL. Consumer thread terminating.");
latch.countDown();
break; break;
} }
processTask(task, jobExecutionContext); processTask(task, jobExecutionContext, latch);
} }
} }
@ -590,7 +595,8 @@ public class SearchIndexApp extends AbstractNativeApplication {
shutdownExecutor(consumerExecutor, "ConsumerExecutor", 60, TimeUnit.SECONDS); shutdownExecutor(consumerExecutor, "ConsumerExecutor", 60, TimeUnit.SECONDS);
} }
private void processTask(IndexingTask<?> task, JobExecutionContext jobExecutionContext) { private void processTask(
IndexingTask<?> task, JobExecutionContext jobExecutionContext, CountDownLatch latch) {
String entityType = task.entityType(); String entityType = task.entityType();
ResultList<?> entities = task.entities(); ResultList<?> entities = task.entities();
Map<String, Object> contextData = new HashMap<>(); Map<String, Object> contextData = new HashMap<>();
@ -633,7 +639,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
} catch (Exception e) { } catch (Exception e) {
synchronized (jobDataLock) { synchronized (jobDataLock) {
jobData.setStatus(EventPublisherJob.Status.FAILED); jobData.setStatus(EventPublisherJob.Status.ACTIVE_ERROR);
jobData.setFailure( jobData.setFailure(
new IndexingError() new IndexingError()
.withErrorSource(IndexingError.ErrorSource.JOB) .withErrorSource(IndexingError.ErrorSource.JOB)
@ -647,6 +653,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
LOG.error("Unexpected error during processing task for entity {}", entityType, e); LOG.error("Unexpected error during processing task for entity {}", entityType, e);
} finally { } finally {
sendUpdates(jobExecutionContext); sendUpdates(jobExecutionContext);
latch.countDown();
} }
} }