Fix Producer Sync Issue (#18635)

(cherry picked from commit e7e4cbf77a9d57df810d10692464c581e9c285c5)
This commit is contained in:
Mohit Yadav 2024-11-14 02:20:49 +05:30 committed by mohitdeuex
parent 500de4e8d1
commit bd9c53f887

View File

@ -56,6 +56,7 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -161,6 +162,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
private volatile boolean stopped = false;
private ExecutorService consumerExecutor;
private ExecutorService producerExecutor;
private ExecutorService jobExecutor = Executors.newFixedThreadPool(2);
private BlockingQueue<IndexingTask<?>> taskQueue = new LinkedBlockingQueue<>(100);
private final AtomicReference<Stats> searchIndexStats = new AtomicReference<>();
private final AtomicReference<Integer> batchSize = new AtomicReference<>(5);
@ -300,7 +302,8 @@ public class SearchIndexApp extends AbstractNativeApplication {
numProducers,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(jobData.getQueueSize()));
new LinkedBlockingQueue<>(jobData.getQueueSize()),
new ThreadPoolExecutor.CallerRunsPolicy());
try {
processEntityReindex(jobExecutionContext);
@ -308,6 +311,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
LOG.error("Error during reindexing process.", e);
throw e;
} finally {
shutdownExecutor(jobExecutor, "JobExecutor", 20, TimeUnit.SECONDS);
shutdownExecutor(producerExecutor, "ReaderExecutor", 1, TimeUnit.MINUTES);
shutdownExecutor(consumerExecutor, "ConsumerExecutor", 20, TimeUnit.SECONDS);
}
@ -317,6 +321,14 @@ public class SearchIndexApp extends AbstractNativeApplication {
throws InterruptedException {
int numConsumers = jobData.getConsumerThreads();
CountDownLatch producerLatch = new CountDownLatch(getTotalLatchCount(jobData.getEntities()));
jobExecutor.submit(() -> submitProducerTask(producerLatch));
jobExecutor.submit(() -> submitConsumerTask(jobExecutionContext));
producerLatch.await();
sendPoisonPills(numConsumers);
}
private void submitProducerTask(CountDownLatch producerLatch) {
for (String entityType : jobData.getEntities()) {
try {
reCreateIndexes(entityType);
@ -342,8 +354,10 @@ public class SearchIndexApp extends AbstractNativeApplication {
LOG.error("Error processing entity type {}", entityType, e);
}
}
}
for (int i = 0; i < numConsumers; i++) {
private void submitConsumerTask(JobExecutionContext jobExecutionContext) {
for (int i = 0; i < jobData.getConsumerThreads(); i++) {
consumerExecutor.submit(
() -> {
try {
@ -354,9 +368,6 @@ public class SearchIndexApp extends AbstractNativeApplication {
}
});
}
producerLatch.await();
sendPoisonPills(numConsumers);
}
private void consumeTasks(JobExecutionContext jobExecutionContext) throws InterruptedException {