mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-22 16:08:13 +00:00
Add Latch for consumers (#18655)
(cherry picked from commit 8c6de372f1fe68ed3649476750752fc65632ee9f)
This commit is contained in:
parent
64a29d1435
commit
67c3ed75c6
@ -322,12 +322,15 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||||||
private void processEntityReindex(JobExecutionContext jobExecutionContext)
|
private void processEntityReindex(JobExecutionContext jobExecutionContext)
|
||||||
throws InterruptedException {
|
throws InterruptedException {
|
||||||
int numConsumers = jobData.getConsumerThreads();
|
int numConsumers = jobData.getConsumerThreads();
|
||||||
CountDownLatch producerLatch = new CountDownLatch(getTotalLatchCount(jobData.getEntities()));
|
int latchCount = getTotalLatchCount(jobData.getEntities());
|
||||||
|
CountDownLatch producerLatch = new CountDownLatch(latchCount);
|
||||||
|
CountDownLatch consumerLatch = new CountDownLatch(latchCount + numConsumers);
|
||||||
submitProducerTask(producerLatch);
|
submitProducerTask(producerLatch);
|
||||||
submitConsumerTask(jobExecutionContext);
|
submitConsumerTask(jobExecutionContext, consumerLatch);
|
||||||
|
|
||||||
producerLatch.await();
|
producerLatch.await();
|
||||||
sendPoisonPills(numConsumers);
|
sendPoisonPills(numConsumers);
|
||||||
|
consumerLatch.await();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void submitProducerTask(CountDownLatch producerLatch) {
|
private void submitProducerTask(CountDownLatch producerLatch) {
|
||||||
@ -368,12 +371,12 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void submitConsumerTask(JobExecutionContext jobExecutionContext) {
|
private void submitConsumerTask(JobExecutionContext jobExecutionContext, CountDownLatch latch) {
|
||||||
for (int i = 0; i < jobData.getConsumerThreads(); i++) {
|
for (int i = 0; i < jobData.getConsumerThreads(); i++) {
|
||||||
consumerExecutor.submit(
|
consumerExecutor.submit(
|
||||||
() -> {
|
() -> {
|
||||||
try {
|
try {
|
||||||
consumeTasks(jobExecutionContext);
|
consumeTasks(jobExecutionContext, latch);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
LOG.warn("Consumer thread interrupted.");
|
LOG.warn("Consumer thread interrupted.");
|
||||||
@ -382,7 +385,8 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void consumeTasks(JobExecutionContext jobExecutionContext) throws InterruptedException {
|
private void consumeTasks(JobExecutionContext jobExecutionContext, CountDownLatch latch)
|
||||||
|
throws InterruptedException {
|
||||||
while (true) {
|
while (true) {
|
||||||
IndexingTask<?> task = taskQueue.take();
|
IndexingTask<?> task = taskQueue.take();
|
||||||
LOG.info(
|
LOG.info(
|
||||||
@ -391,9 +395,10 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||||||
task.currentEntityOffset());
|
task.currentEntityOffset());
|
||||||
if (task == IndexingTask.POISON_PILL) {
|
if (task == IndexingTask.POISON_PILL) {
|
||||||
LOG.debug("Received POISON_PILL. Consumer thread terminating.");
|
LOG.debug("Received POISON_PILL. Consumer thread terminating.");
|
||||||
|
latch.countDown();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
processTask(task, jobExecutionContext);
|
processTask(task, jobExecutionContext, latch);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -589,7 +594,8 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||||||
shutdownExecutor(consumerExecutor, "ConsumerExecutor", 60, TimeUnit.SECONDS);
|
shutdownExecutor(consumerExecutor, "ConsumerExecutor", 60, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processTask(IndexingTask<?> task, JobExecutionContext jobExecutionContext) {
|
private void processTask(
|
||||||
|
IndexingTask<?> task, JobExecutionContext jobExecutionContext, CountDownLatch latch) {
|
||||||
String entityType = task.entityType();
|
String entityType = task.entityType();
|
||||||
ResultList<?> entities = task.entities();
|
ResultList<?> entities = task.entities();
|
||||||
Map<String, Object> contextData = new HashMap<>();
|
Map<String, Object> contextData = new HashMap<>();
|
||||||
@ -632,7 +638,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
synchronized (jobDataLock) {
|
synchronized (jobDataLock) {
|
||||||
jobData.setStatus(EventPublisherJob.Status.FAILED);
|
jobData.setStatus(EventPublisherJob.Status.ACTIVE_ERROR);
|
||||||
jobData.setFailure(
|
jobData.setFailure(
|
||||||
new IndexingError()
|
new IndexingError()
|
||||||
.withErrorSource(IndexingError.ErrorSource.JOB)
|
.withErrorSource(IndexingError.ErrorSource.JOB)
|
||||||
@ -646,6 +652,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||||||
LOG.error("Unexpected error during processing task for entity {}", entityType, e);
|
LOG.error("Unexpected error during processing task for entity {}", entityType, e);
|
||||||
} finally {
|
} finally {
|
||||||
sendUpdates(jobExecutionContext);
|
sendUpdates(jobExecutionContext);
|
||||||
|
latch.countDown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user