From bd9c53f8876e9a0599c712b69b64dfe9430778e2 Mon Sep 17 00:00:00 2001 From: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com> Date: Thu, 14 Nov 2024 02:20:49 +0530 Subject: [PATCH] Fix Producer Sync Issue (#18635) (cherry picked from commit e7e4cbf77a9d57df810d10692464c581e9c285c5) --- .../bundles/searchIndex/SearchIndexApp.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index 0b0ca52ce3d..f2ab9abd8da 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -56,6 +56,7 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -161,6 +162,7 @@ public class SearchIndexApp extends AbstractNativeApplication { private volatile boolean stopped = false; private ExecutorService consumerExecutor; private ExecutorService producerExecutor; + private ExecutorService jobExecutor = Executors.newFixedThreadPool(2); private BlockingQueue> taskQueue = new LinkedBlockingQueue<>(100); private final AtomicReference searchIndexStats = new AtomicReference<>(); private final AtomicReference batchSize = new AtomicReference<>(5); @@ -300,7 +302,8 @@ public class SearchIndexApp extends AbstractNativeApplication { numProducers, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(jobData.getQueueSize())); + new LinkedBlockingQueue<>(jobData.getQueueSize()), + new ThreadPoolExecutor.CallerRunsPolicy()); try { processEntityReindex(jobExecutionContext); @@ -308,6 +311,7 @@ public class SearchIndexApp extends AbstractNativeApplication { LOG.error("Error during reindexing process.", e); throw e; } finally { + shutdownExecutor(jobExecutor, "JobExecutor", 20, TimeUnit.SECONDS); shutdownExecutor(producerExecutor, "ReaderExecutor", 1, TimeUnit.MINUTES); shutdownExecutor(consumerExecutor, "ConsumerExecutor", 20, TimeUnit.SECONDS); } @@ -317,6 +321,14 @@ public class SearchIndexApp extends AbstractNativeApplication { throws InterruptedException { int numConsumers = jobData.getConsumerThreads(); CountDownLatch producerLatch = new CountDownLatch(getTotalLatchCount(jobData.getEntities())); + jobExecutor.submit(() -> submitProducerTask(producerLatch)); + jobExecutor.submit(() -> submitConsumerTask(jobExecutionContext)); + + producerLatch.await(); + sendPoisonPills(numConsumers); + } + + private void submitProducerTask(CountDownLatch producerLatch) { for (String entityType : jobData.getEntities()) { try { reCreateIndexes(entityType); @@ -342,8 +354,10 @@ public class SearchIndexApp extends AbstractNativeApplication { LOG.error("Error processing entity type {}", entityType, e); } } + } - for (int i = 0; i < numConsumers; i++) { + private void submitConsumerTask(JobExecutionContext jobExecutionContext) { + for (int i = 0; i < jobData.getConsumerThreads(); i++) { consumerExecutor.submit( () -> { try { @@ -354,9 +368,6 @@ public class SearchIndexApp extends AbstractNativeApplication { } }); } - - producerLatch.await(); - sendPoisonPills(numConsumers); } private void consumeTasks(JobExecutionContext jobExecutionContext) throws InterruptedException {