From b45eb5a4166b82d7299806dd739ba09cd8f3d165 Mon Sep 17 00:00:00 2001
From: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com>
Date: Tue, 19 Nov 2024 17:16:43 +0530
Subject: [PATCH] Removed Consumer (#18685)

---
 .../bundles/searchIndex/SearchIndexApp.java   | 95 ++++---------------
 .../openmetadata/service/jdbi3/EntityDAO.java |  2 +-
 2 files changed, 19 insertions(+), 78 deletions(-)

diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java
index 8ea23d9b94c..2a356bf1024 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java
@@ -58,6 +58,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -161,10 +162,8 @@ public class SearchIndexApp extends AbstractNativeApplication {
   @Getter private EventPublisherJob jobData;
   private final Object jobDataLock = new Object();
   private volatile boolean stopped = false;
-  private ExecutorService consumerExecutor;
   private ExecutorService producerExecutor;
   private final ExecutorService jobExecutor = Executors.newCachedThreadPool();
-  private BlockingQueue<IndexingTask<?>> taskQueue = new LinkedBlockingQueue<>(100);
   private BlockingQueue<Runnable> producerQueue = new LinkedBlockingQueue<>(100);
   private final AtomicReference<Stats> searchIndexStats = new AtomicReference<>();
   private final AtomicReference<Integer> batchSize = new AtomicReference<>(5);
@@ -290,15 +289,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
     int numConsumers = jobData.getConsumerThreads();
     LOG.info(
         "Starting reindexing with {} producers and {} consumers.", numProducers, numConsumers);
-    taskQueue = new LinkedBlockingQueue<>(jobData.getQueueSize());
     producerQueue = new LinkedBlockingQueue<>(jobData.getQueueSize());
-    consumerExecutor =
-        new ThreadPoolExecutor(
-            numConsumers,
-            numConsumers,
-            0L,
-            TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<>(jobData.getQueueSize()));
     producerExecutor =
         new ThreadPoolExecutor(
             numProducers,
@@ -316,25 +307,19 @@ public class SearchIndexApp extends AbstractNativeApplication {
     } finally {
       shutdownExecutor(jobExecutor, "JobExecutor", 20, TimeUnit.SECONDS);
       shutdownExecutor(producerExecutor, "ReaderExecutor", 1, TimeUnit.MINUTES);
-      shutdownExecutor(consumerExecutor, "ConsumerExecutor", 20, TimeUnit.SECONDS);
     }
   }

   private void processEntityReindex(JobExecutionContext jobExecutionContext)
       throws InterruptedException {
-    int numConsumers = jobData.getConsumerThreads();
     int latchCount = getTotalLatchCount(jobData.getEntities());
     CountDownLatch producerLatch = new CountDownLatch(latchCount);
-    CountDownLatch consumerLatch = new CountDownLatch(latchCount + numConsumers);
-    submitProducerTask(producerLatch);
-    submitConsumerTask(jobExecutionContext, consumerLatch);
-
+    submitProducerTask(jobExecutionContext, producerLatch);
     producerLatch.await();
-    sendPoisonPills(numConsumers);
-    consumerLatch.await();
   }

-  private void submitProducerTask(CountDownLatch producerLatch) {
+  private void submitProducerTask(
+      JobExecutionContext jobExecutionContext, CountDownLatch producerLatch) {
     for (String entityType : jobData.getEntities()) {
       jobExecutor.submit(
           () -> {
@@ -342,9 +327,11 @@ public class SearchIndexApp extends AbstractNativeApplication {
               reCreateIndexes(entityType);
               int totalEntityRecords = getTotalEntityRecords(entityType);
               Source<?> source = createSource(entityType);
-              int noOfThreads = calculateNumberOfThreads(totalEntityRecords);
+              int loadPerThread = calculateNumberOfThreads(totalEntityRecords);
+              Semaphore semaphore = new Semaphore(jobData.getQueueSize());
               if (totalEntityRecords > 0) {
-                for (int i = 0; i < noOfThreads; i++) {
+                for (int i = 0; i < loadPerThread; i++) {
+                  semaphore.acquire();
                   LOG.debug(
                       "Submitting producer task current queue size: {}", producerQueue.size());
                   int currentOffset = i * batchSize.get();
@@ -355,12 +342,15 @@ public class SearchIndexApp extends AbstractNativeApplication {
                           "Running Task for CurrentOffset: {}, Producer Latch Down, Current : {}",
                           currentOffset,
                           producerLatch.getCount());
-                      processReadTask(entityType, source, currentOffset);
+                      processReadTask(jobExecutionContext, entityType, source, currentOffset);
                     } catch (Exception e) {
                       LOG.error("Error processing entity type {}", entityType, e);
                     } finally {
-                      LOG.debug("Producer Latch Down, Current : {}", producerLatch.getCount());
+                      LOG.debug(
+                          "Producer Latch Down and Semaphore Release, Current : {}",
+                          producerLatch.getCount());
                       producerLatch.countDown();
+                      semaphore.release();
                     }
                   });
             }
@@ -372,50 +362,6 @@ public class SearchIndexApp extends AbstractNativeApplication {
     }
   }

-  private void submitConsumerTask(JobExecutionContext jobExecutionContext, CountDownLatch latch) {
-    for (int i = 0; i < jobData.getConsumerThreads(); i++) {
-      consumerExecutor.submit(
-          () -> {
-            try {
-              consumeTasks(jobExecutionContext, latch);
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-              LOG.warn("Consumer thread interrupted.");
-            }
-          });
-    }
-  }
-
-  private void consumeTasks(JobExecutionContext jobExecutionContext, CountDownLatch latch)
-      throws InterruptedException {
-    while (true) {
-      IndexingTask<?> task = taskQueue.take();
-      LOG.info(
-          "Consuming Indexing Task for entityType: {}, entity offset : {}",
-          task.entityType(),
-          task.currentEntityOffset());
-      if (task == IndexingTask.POISON_PILL) {
-        LOG.debug("Received POISON_PILL. Consumer thread terminating.");
-        latch.countDown();
-        break;
-      }
-      processTask(task, jobExecutionContext, latch);
-    }
-  }
-
-  /**
-   * Sends POISON_PILLs to signal consumer threads to terminate.
-   *
-   * @param numConsumers The number of consumers to signal.
-   * @throws InterruptedException If the thread is interrupted while waiting.
-   */
-  private void sendPoisonPills(int numConsumers) throws InterruptedException {
-    for (int i = 0; i < numConsumers; i++) {
-      taskQueue.put(IndexingTask.POISON_PILL);
-    }
-    LOG.debug("Sent {} POISON_PILLs to consumers.", numConsumers);
-  }
-
   /**
    * Shuts down an executor service gracefully.
    *
@@ -592,11 +538,9 @@ public class SearchIndexApp extends AbstractNativeApplication {
     stopped = true;
     shutdownExecutor(jobExecutor, "JobExecutor", 60, TimeUnit.SECONDS);
     shutdownExecutor(producerExecutor, "ProducerExecutor", 60, TimeUnit.SECONDS);
-    shutdownExecutor(consumerExecutor, "ConsumerExecutor", 60, TimeUnit.SECONDS);
   }

-  private void processTask(
-      IndexingTask<?> task, JobExecutionContext jobExecutionContext, CountDownLatch latch) {
+  private void processTask(IndexingTask<?> task, JobExecutionContext jobExecutionContext) {
     String entityType = task.entityType();
     ResultList<?> entities = task.entities();
     Map<String, Object> contextData = new HashMap<>();
@@ -653,7 +597,6 @@ public class SearchIndexApp extends AbstractNativeApplication {
       LOG.error("Unexpected error during processing task for entity {}", entityType, e);
     } finally {
       sendUpdates(jobExecutionContext);
-      latch.countDown();
     }
   }

@@ -697,20 +640,18 @@ public class SearchIndexApp extends AbstractNativeApplication {
         .getTotalRecords();
   }

-  private void processReadTask(String entityType, Source<?> source, int offset) {
+  private void processReadTask(
+      JobExecutionContext jobExecutionContext, String entityType, Source<?> source, int offset) {
     try {
       Object resultList = source.readWithCursor(RestUtil.encodeCursor(String.valueOf(offset)));
-      LOG.info("Read Entities with CurrentOffset: {}", offset);
+      LOG.debug("Read Entities with entityType: {}, CurrentOffset: {}", entityType, offset);
       if (resultList != null) {
         ResultList<?> entities = extractEntities(entityType, resultList);
         if (!nullOrEmpty(entities.getData())) {
           IndexingTask<?> task = new IndexingTask<>(entityType, entities, offset);
-          taskQueue.put(task);
+          processTask(task, jobExecutionContext);
         }
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.warn("Reader thread interrupted for entityType: {}", entityType);
     } catch (SearchIndexException e) {
       LOG.error("Error while reading source for entityType: {}", entityType, e);
       synchronized (jobDataLock) {
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityDAO.java
index 2d298b1298b..67d5a62078a 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityDAO.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityDAO.java
@@ -333,7 +333,7 @@ public interface EntityDAO<T extends EntityInterface> {
       @Define("nameHashColumn") String nameHashColumnName,
       @Bind("limit") int limit);

-  @SqlQuery("SELECT json FROM <table> ORDER BY name LIMIT :limit OFFSET :offset")
+  @SqlQuery("SELECT json FROM <table> ORDER BY id LIMIT :limit OFFSET :offset")
   List<String> listAfter(
       @Define("table") String table,
       @BindMap Map<String, ?> params,
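
A note on the concurrency pattern this patch adopts, with a standalone sketch
that is illustrative and not part of the patch. The SearchIndexApp diff removes
the consumer thread pool, the shared taskQueue, and the POISON_PILL shutdown
protocol: processReadTask() now calls processTask() inline, so each page is
indexed on the thread that read it, and a Semaphore sized to
jobData.getQueueSize() caps how many read tasks are in flight, supplying the
backpressure the bounded hand-off queue used to provide. A minimal sketch of
that semaphore-bounded fan-out follows; the names BoundedFanOut, process,
maxInFlight, and totalBatches are hypothetical stand-ins, not identifiers from
the patch.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/** Illustrative sketch: semaphore-bounded fan-out with no consumer pool. */
public class BoundedFanOut {
  public static void main(String[] args) throws InterruptedException {
    int maxInFlight = 4;   // stands in for jobData.getQueueSize()
    int totalBatches = 20; // stands in for the computed per-entity batch count
    ExecutorService producers = Executors.newFixedThreadPool(2);
    Semaphore semaphore = new Semaphore(maxInFlight);
    CountDownLatch latch = new CountDownLatch(totalBatches);

    for (int i = 0; i < totalBatches; i++) {
      // Blocks once maxInFlight tasks are outstanding; this is the
      // backpressure the removed bounded taskQueue used to provide.
      semaphore.acquire();
      int offset = i * 5; // stands in for i * batchSize.get()
      producers.submit(
          () -> {
            try {
              process(offset); // read a page and index it inline, no hand-off
            } finally {
              latch.countDown();
              semaphore.release(); // frees a slot for the next submission
            }
          });
    }
    latch.await(); // replaces sendPoisonPills() + consumerLatch.await()
    producers.shutdown();
  }

  private static void process(int offset) {
    System.out.println("indexed batch at offset " + offset);
  }
}

The EntityDAO change (ORDER BY name to ORDER BY id) makes the paginated scan
stable: LIMIT/OFFSET windows only partition a table consistently when the sort
key is unique and immutable, which holds for the id primary key but not, in
general, for name.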