Search Index Issues (#18648)

* Search Index Issues

* Log Thread

* Make logs debug

* Remove Current Thread Name
This commit is contained in:
Mohit Yadav 2024-11-14 22:19:35 +05:30 committed by GitHub
parent 75d417d267
commit 560e28df76
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -100,6 +100,7 @@ import org.quartz.JobExecutionContext;
public class SearchIndexApp extends AbstractNativeApplication { public class SearchIndexApp extends AbstractNativeApplication {
private static final String ALL = "all"; private static final String ALL = "all";
public static final Set<String> ALL_ENTITIES = public static final Set<String> ALL_ENTITIES =
Set.of( Set.of(
TABLE, TABLE,
@ -162,8 +163,9 @@ public class SearchIndexApp extends AbstractNativeApplication {
private volatile boolean stopped = false; private volatile boolean stopped = false;
private ExecutorService consumerExecutor; private ExecutorService consumerExecutor;
private ExecutorService producerExecutor; private ExecutorService producerExecutor;
private ExecutorService jobExecutor = Executors.newFixedThreadPool(2); private final ExecutorService jobExecutor = Executors.newCachedThreadPool();
private BlockingQueue<IndexingTask<?>> taskQueue = new LinkedBlockingQueue<>(100); private BlockingQueue<IndexingTask<?>> taskQueue = new LinkedBlockingQueue<>(100);
private BlockingQueue<Runnable> producerQueue = new LinkedBlockingQueue<>(100);
private final AtomicReference<Stats> searchIndexStats = new AtomicReference<>(); private final AtomicReference<Stats> searchIndexStats = new AtomicReference<>();
private final AtomicReference<Integer> batchSize = new AtomicReference<>(5); private final AtomicReference<Integer> batchSize = new AtomicReference<>(5);
@ -289,6 +291,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
LOG.info("Starting reindexing with {} producers and {} consumers.", numProducers, numConsumers); LOG.info("Starting reindexing with {} producers and {} consumers.", numProducers, numConsumers);
taskQueue = new LinkedBlockingQueue<>(jobData.getQueueSize()); taskQueue = new LinkedBlockingQueue<>(jobData.getQueueSize());
producerQueue = new LinkedBlockingQueue<>(jobData.getQueueSize());
consumerExecutor = consumerExecutor =
new ThreadPoolExecutor( new ThreadPoolExecutor(
numConsumers, numConsumers,
@ -302,7 +305,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
numProducers, numProducers,
0L, 0L,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(jobData.getQueueSize()), producerQueue,
new ThreadPoolExecutor.CallerRunsPolicy()); new ThreadPoolExecutor.CallerRunsPolicy());
try { try {
@ -321,8 +324,8 @@ public class SearchIndexApp extends AbstractNativeApplication {
throws InterruptedException { throws InterruptedException {
int numConsumers = jobData.getConsumerThreads(); int numConsumers = jobData.getConsumerThreads();
CountDownLatch producerLatch = new CountDownLatch(getTotalLatchCount(jobData.getEntities())); CountDownLatch producerLatch = new CountDownLatch(getTotalLatchCount(jobData.getEntities()));
jobExecutor.submit(() -> submitProducerTask(producerLatch)); submitProducerTask(producerLatch);
jobExecutor.submit(() -> submitConsumerTask(jobExecutionContext)); submitConsumerTask(jobExecutionContext);
producerLatch.await(); producerLatch.await();
sendPoisonPills(numConsumers); sendPoisonPills(numConsumers);
@ -330,29 +333,39 @@ public class SearchIndexApp extends AbstractNativeApplication {
private void submitProducerTask(CountDownLatch producerLatch) { private void submitProducerTask(CountDownLatch producerLatch) {
for (String entityType : jobData.getEntities()) { for (String entityType : jobData.getEntities()) {
try { jobExecutor.submit(
reCreateIndexes(entityType); () -> {
int totalEntityRecords = getTotalEntityRecords(entityType); try {
Source<?> source = createSource(entityType); reCreateIndexes(entityType);
int noOfThreads = calculateNumberOfThreads(totalEntityRecords); int totalEntityRecords = getTotalEntityRecords(entityType);
if (totalEntityRecords > 0) { Source<?> source = createSource(entityType);
for (int i = 0; i < noOfThreads; i++) { int noOfThreads = calculateNumberOfThreads(totalEntityRecords);
int currentOffset = i * batchSize.get(); if (totalEntityRecords > 0) {
producerExecutor.submit( for (int i = 0; i < noOfThreads; i++) {
() -> { LOG.debug(
try { "Submitting producer task current queue size: {}", producerQueue.size());
processReadTask(entityType, source, currentOffset); int currentOffset = i * batchSize.get();
} catch (Exception e) { producerExecutor.submit(
LOG.error("Error processing entity type {}", entityType, e); () -> {
} finally { try {
producerLatch.countDown(); LOG.debug(
} "Running Task for CurrentOffset: {}, Producer Latch Down, Current : {}",
}); currentOffset,
} producerLatch.getCount());
} processReadTask(entityType, source, currentOffset);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error processing entity type {}", entityType, e); LOG.error("Error processing entity type {}", entityType, e);
} } finally {
LOG.debug("Producer Latch Down, Current : {}", producerLatch.getCount());
producerLatch.countDown();
}
});
}
}
} catch (Exception e) {
LOG.error("Error processing entity type {}", entityType, e);
}
});
} }
} }
@ -572,6 +585,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
public void stopJob() { public void stopJob() {
LOG.info("Stopping reindexing job."); LOG.info("Stopping reindexing job.");
stopped = true; stopped = true;
shutdownExecutor(jobExecutor, "JobExecutor", 60, TimeUnit.SECONDS);
shutdownExecutor(producerExecutor, "ProducerExecutor", 60, TimeUnit.SECONDS); shutdownExecutor(producerExecutor, "ProducerExecutor", 60, TimeUnit.SECONDS);
shutdownExecutor(consumerExecutor, "ConsumerExecutor", 60, TimeUnit.SECONDS); shutdownExecutor(consumerExecutor, "ConsumerExecutor", 60, TimeUnit.SECONDS);
} }
@ -679,12 +693,12 @@ public class SearchIndexApp extends AbstractNativeApplication {
private void processReadTask(String entityType, Source<?> source, int offset) { private void processReadTask(String entityType, Source<?> source, int offset) {
try { try {
Object resultList = source.readWithCursor(RestUtil.encodeCursor(String.valueOf(offset))); Object resultList = source.readWithCursor(RestUtil.encodeCursor(String.valueOf(offset)));
LOG.info("Read Entities with CurrentOffset: {}", offset);
if (resultList != null) { if (resultList != null) {
ResultList<?> entities = extractEntities(entityType, resultList); ResultList<?> entities = extractEntities(entityType, resultList);
if (!nullOrEmpty(entities.getData())) { if (!nullOrEmpty(entities.getData())) {
LOG.info( IndexingTask<?> task = new IndexingTask<>(entityType, entities, offset);
"Creating Indexing Task for entityType: {}, current offset: {}", entityType, offset); taskQueue.put(task);
createIndexingTask(entityType, entities, offset);
} }
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -708,13 +722,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
} }
} }
private void createIndexingTask(String entityType, ResultList<?> entities, int offset) private int calculateNumberOfThreads(int totalEntityRecords) {
throws InterruptedException {
IndexingTask<?> task = new IndexingTask<>(entityType, entities, offset);
taskQueue.put(task);
}
private synchronized int calculateNumberOfThreads(int totalEntityRecords) {
int mod = totalEntityRecords % batchSize.get(); int mod = totalEntityRecords % batchSize.get();
if (mod == 0) { if (mod == 0) {
return totalEntityRecords / batchSize.get(); return totalEntityRecords / batchSize.get();
@ -732,7 +740,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
} }
} }
private synchronized int getRemainingRecordsToProcess(String entityType) { private int getRemainingRecordsToProcess(String entityType) {
StepStats entityStats = StepStats entityStats =
((StepStats) ((StepStats)
searchIndexStats.get().getEntityStats().getAdditionalProperties().get(entityType)); searchIndexStats.get().getEntityStats().getAdditionalProperties().get(entityType));