diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index bd431d7f1b6..612d99e3042 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -100,6 +100,7 @@ import org.quartz.JobExecutionContext; public class SearchIndexApp extends AbstractNativeApplication { private static final String ALL = "all"; + public static final Set ALL_ENTITIES = Set.of( TABLE, @@ -162,8 +163,9 @@ public class SearchIndexApp extends AbstractNativeApplication { private volatile boolean stopped = false; private ExecutorService consumerExecutor; private ExecutorService producerExecutor; - private ExecutorService jobExecutor = Executors.newFixedThreadPool(2); + private final ExecutorService jobExecutor = Executors.newCachedThreadPool(); private BlockingQueue> taskQueue = new LinkedBlockingQueue<>(100); + private BlockingQueue producerQueue = new LinkedBlockingQueue<>(100); private final AtomicReference searchIndexStats = new AtomicReference<>(); private final AtomicReference batchSize = new AtomicReference<>(5); @@ -289,6 +291,7 @@ public class SearchIndexApp extends AbstractNativeApplication { LOG.info("Starting reindexing with {} producers and {} consumers.", numProducers, numConsumers); taskQueue = new LinkedBlockingQueue<>(jobData.getQueueSize()); + producerQueue = new LinkedBlockingQueue<>(jobData.getQueueSize()); consumerExecutor = new ThreadPoolExecutor( numConsumers, @@ -302,7 +305,7 @@ public class SearchIndexApp extends AbstractNativeApplication { numProducers, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(jobData.getQueueSize()), + producerQueue, new ThreadPoolExecutor.CallerRunsPolicy()); try { @@ -321,8 +324,8 @@ public class SearchIndexApp extends AbstractNativeApplication { throws InterruptedException { int numConsumers = jobData.getConsumerThreads(); CountDownLatch producerLatch = new CountDownLatch(getTotalLatchCount(jobData.getEntities())); - jobExecutor.submit(() -> submitProducerTask(producerLatch)); - jobExecutor.submit(() -> submitConsumerTask(jobExecutionContext)); + submitProducerTask(producerLatch); + submitConsumerTask(jobExecutionContext); producerLatch.await(); sendPoisonPills(numConsumers); @@ -330,29 +333,39 @@ public class SearchIndexApp extends AbstractNativeApplication { private void submitProducerTask(CountDownLatch producerLatch) { for (String entityType : jobData.getEntities()) { - try { - reCreateIndexes(entityType); - int totalEntityRecords = getTotalEntityRecords(entityType); - Source source = createSource(entityType); - int noOfThreads = calculateNumberOfThreads(totalEntityRecords); - if (totalEntityRecords > 0) { - for (int i = 0; i < noOfThreads; i++) { - int currentOffset = i * batchSize.get(); - producerExecutor.submit( - () -> { - try { - processReadTask(entityType, source, currentOffset); - } catch (Exception e) { - LOG.error("Error processing entity type {}", entityType, e); - } finally { - producerLatch.countDown(); - } - }); - } - } - } catch (Exception e) { - LOG.error("Error processing entity type {}", entityType, e); - } + jobExecutor.submit( + () -> { + try { + reCreateIndexes(entityType); + int totalEntityRecords = getTotalEntityRecords(entityType); + Source source = createSource(entityType); + int noOfThreads = calculateNumberOfThreads(totalEntityRecords); + if (totalEntityRecords > 0) { + for (int i = 0; i < noOfThreads; i++) { + LOG.debug( + "Submitting producer task current queue size: {}", producerQueue.size()); + int currentOffset = i * batchSize.get(); + producerExecutor.submit( + () -> { + try { + LOG.debug( + "Running Task for CurrentOffset: {}, Producer Latch Down, Current : {}", + currentOffset, + producerLatch.getCount()); + processReadTask(entityType, source, currentOffset); + } catch (Exception e) { + LOG.error("Error processing entity type {}", entityType, e); + } finally { + LOG.debug("Producer Latch Down, Current : {}", producerLatch.getCount()); + producerLatch.countDown(); + } + }); + } + } + } catch (Exception e) { + LOG.error("Error processing entity type {}", entityType, e); + } + }); } } @@ -572,6 +585,7 @@ public class SearchIndexApp extends AbstractNativeApplication { public void stopJob() { LOG.info("Stopping reindexing job."); stopped = true; + shutdownExecutor(jobExecutor, "JobExecutor", 60, TimeUnit.SECONDS); shutdownExecutor(producerExecutor, "ProducerExecutor", 60, TimeUnit.SECONDS); shutdownExecutor(consumerExecutor, "ConsumerExecutor", 60, TimeUnit.SECONDS); } @@ -679,12 +693,12 @@ public class SearchIndexApp extends AbstractNativeApplication { private void processReadTask(String entityType, Source source, int offset) { try { Object resultList = source.readWithCursor(RestUtil.encodeCursor(String.valueOf(offset))); + LOG.info("Read Entities with CurrentOffset: {}", offset); if (resultList != null) { ResultList entities = extractEntities(entityType, resultList); if (!nullOrEmpty(entities.getData())) { - LOG.info( - "Creating Indexing Task for entityType: {}, current offset: {}", entityType, offset); - createIndexingTask(entityType, entities, offset); + IndexingTask task = new IndexingTask<>(entityType, entities, offset); + taskQueue.put(task); } } } catch (InterruptedException e) { @@ -708,13 +722,7 @@ public class SearchIndexApp extends AbstractNativeApplication { } } - private void createIndexingTask(String entityType, ResultList entities, int offset) - throws InterruptedException { - IndexingTask task = new IndexingTask<>(entityType, entities, offset); - taskQueue.put(task); - } - - private synchronized int calculateNumberOfThreads(int totalEntityRecords) { + private int calculateNumberOfThreads(int totalEntityRecords) { int mod = totalEntityRecords % batchSize.get(); if (mod == 0) { return totalEntityRecords / batchSize.get(); @@ -732,7 +740,7 @@ public class SearchIndexApp extends AbstractNativeApplication { } } - private synchronized int getRemainingRecordsToProcess(String entityType) { + private int getRemainingRecordsToProcess(String entityType) { StepStats entityStats = ((StepStats) searchIndexStats.get().getEntityStats().getAdditionalProperties().get(entityType));