Mirror of https://github.com/open-metadata/OpenMetadata.git, synced 2025-09-26 09:22:14 +00:00
Removed Consumer (#18685)

This commit is contained in:
parent b63ad63898
commit b45eb5a416
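In brief, per the diff below: SearchIndexApp drops its dedicated consumer thread pool, the shared IndexingTask queue, and the poison-pill shutdown protocol; producer tasks now index each batch inline, with a Semaphore (sized to the job's queue size) bounding how many batches are in flight. A companion one-line change in EntityDAO switches listAfter's pagination ordering from name to id.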
SearchIndexApp.java

@@ -58,6 +58,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -161,10 +162,8 @@ public class SearchIndexApp extends AbstractNativeApplication {
   @Getter private EventPublisherJob jobData;
   private final Object jobDataLock = new Object();
   private volatile boolean stopped = false;
-  private ExecutorService consumerExecutor;
   private ExecutorService producerExecutor;
   private final ExecutorService jobExecutor = Executors.newCachedThreadPool();
-  private BlockingQueue<IndexingTask<?>> taskQueue = new LinkedBlockingQueue<>(100);
   private BlockingQueue<Runnable> producerQueue = new LinkedBlockingQueue<>(100);
   private final AtomicReference<Stats> searchIndexStats = new AtomicReference<>();
   private final AtomicReference<Integer> batchSize = new AtomicReference<>(5);
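Both deleted fields were the consumer side's only state: consumerExecutor ran the consumer loops, and taskQueue was the hand-off channel from producers to consumers. Every field that survives (producerQueue, producerExecutor, jobExecutor, searchIndexStats, batchSize) belongs to the producer side.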
@@ -290,15 +289,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
     int numConsumers = jobData.getConsumerThreads();
     LOG.info("Starting reindexing with {} producers and {} consumers.", numProducers, numConsumers);

-    taskQueue = new LinkedBlockingQueue<>(jobData.getQueueSize());
     producerQueue = new LinkedBlockingQueue<>(jobData.getQueueSize());
-    consumerExecutor =
-        new ThreadPoolExecutor(
-            numConsumers,
-            numConsumers,
-            0L,
-            TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<>(jobData.getQueueSize()));
     producerExecutor =
         new ThreadPoolExecutor(
             numProducers,
@@ -316,25 +307,19 @@ public class SearchIndexApp extends AbstractNativeApplication {
     } finally {
       shutdownExecutor(jobExecutor, "JobExecutor", 20, TimeUnit.SECONDS);
       shutdownExecutor(producerExecutor, "ReaderExecutor", 1, TimeUnit.MINUTES);
-      shutdownExecutor(consumerExecutor, "ConsumerExecutor", 20, TimeUnit.SECONDS);
     }
   }

   private void processEntityReindex(JobExecutionContext jobExecutionContext)
       throws InterruptedException {
-    int numConsumers = jobData.getConsumerThreads();
     int latchCount = getTotalLatchCount(jobData.getEntities());
     CountDownLatch producerLatch = new CountDownLatch(latchCount);
-    CountDownLatch consumerLatch = new CountDownLatch(latchCount + numConsumers);
-    submitProducerTask(producerLatch);
-    submitConsumerTask(jobExecutionContext, consumerLatch);
+    submitProducerTask(jobExecutionContext, producerLatch);

     producerLatch.await();
-    sendPoisonPills(numConsumers);
-    consumerLatch.await();
   }

-  private void submitProducerTask(CountDownLatch producerLatch) {
+  private void submitProducerTask(
+      JobExecutionContext jobExecutionContext, CountDownLatch producerLatch) {
     for (String entityType : jobData.getEntities()) {
       jobExecutor.submit(
           () -> {
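After this hunk the only coordination primitive left is producerLatch: each producer sub-task counts it down exactly once in a finally block (see the next hunks), and processEntityReindex simply awaits it. A minimal, self-contained sketch of that pattern, with an illustrative task count standing in for getTotalLatchCount(jobData.getEntities()):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LatchCoordinationSketch {
  public static void main(String[] args) throws InterruptedException {
    int latchCount = 4; // stands in for getTotalLatchCount(jobData.getEntities())
    CountDownLatch producerLatch = new CountDownLatch(latchCount);
    ExecutorService jobExecutor = Executors.newCachedThreadPool();
    for (int i = 0; i < latchCount; i++) {
      int batch = i;
      jobExecutor.submit(
          () -> {
            try {
              System.out.println("read + index batch " + batch); // work happens inline now
            } finally {
              producerLatch.countDown(); // always counted down, even on failure
            }
          });
    }
    producerLatch.await(); // the method returns only once all producers are done
    jobExecutor.shutdown();
  }
}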
@@ -342,9 +327,11 @@ public class SearchIndexApp extends AbstractNativeApplication {
               reCreateIndexes(entityType);
               int totalEntityRecords = getTotalEntityRecords(entityType);
               Source<?> source = createSource(entityType);
-              int noOfThreads = calculateNumberOfThreads(totalEntityRecords);
+              int loadPerThread = calculateNumberOfThreads(totalEntityRecords);
+              Semaphore semaphore = new Semaphore(jobData.getQueueSize());
               if (totalEntityRecords > 0) {
-                for (int i = 0; i < noOfThreads; i++) {
+                for (int i = 0; i < loadPerThread; i++) {
+                  semaphore.acquire();
                   LOG.debug(
                       "Submitting producer task current queue size: {}", producerQueue.size());
                   int currentOffset = i * batchSize.get();
@@ -355,12 +342,15 @@ public class SearchIndexApp extends AbstractNativeApplication {
                             "Running Task for CurrentOffset: {}, Producer Latch Down, Current : {}",
                             currentOffset,
                             producerLatch.getCount());
-                        processReadTask(entityType, source, currentOffset);
+                        processReadTask(jobExecutionContext, entityType, source, currentOffset);
                       } catch (Exception e) {
                         LOG.error("Error processing entity type {}", entityType, e);
                       } finally {
-                        LOG.debug("Producer Latch Down, Current : {}", producerLatch.getCount());
+                        LOG.debug(
+                            "Producer Latch Down and Semaphore Release, Current : {}",
+                            producerLatch.getCount());
                         producerLatch.countDown();
+                        semaphore.release();
                       }
                     });
               }
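The Semaphore added in these two hunks acts as a back-pressure valve: a permit is acquired before each batch task is submitted and released in the task's finally block, so no more than jobData.getQueueSize() batches are ever in flight. Releasing in finally matters, since a failed batch must still return its permit or the submitting loop would eventually deadlock on acquire(). A minimal sketch of the pattern (pool size, queue size, and offsets are illustrative):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreBackpressureSketch {
  public static void main(String[] args) throws InterruptedException {
    int queueSize = 2; // stands in for jobData.getQueueSize()
    Semaphore semaphore = new Semaphore(queueSize);
    ExecutorService producerExecutor = Executors.newFixedThreadPool(2);
    for (int offset = 0; offset < 10; offset++) {
      semaphore.acquire(); // blocks the submitting loop once queueSize tasks are in flight
      int currentOffset = offset;
      producerExecutor.submit(
          () -> {
            try {
              System.out.println("reading batch at offset " + currentOffset);
            } finally {
              semaphore.release(); // mirrors the finally block in the hunk above
            }
          });
    }
    producerExecutor.shutdown();
  }
}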
@@ -372,50 +362,6 @@ public class SearchIndexApp extends AbstractNativeApplication {
     }
   }

-  private void submitConsumerTask(JobExecutionContext jobExecutionContext, CountDownLatch latch) {
-    for (int i = 0; i < jobData.getConsumerThreads(); i++) {
-      consumerExecutor.submit(
-          () -> {
-            try {
-              consumeTasks(jobExecutionContext, latch);
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-              LOG.warn("Consumer thread interrupted.");
-            }
-          });
-    }
-  }
-
-  private void consumeTasks(JobExecutionContext jobExecutionContext, CountDownLatch latch)
-      throws InterruptedException {
-    while (true) {
-      IndexingTask<?> task = taskQueue.take();
-      LOG.info(
-          "Consuming Indexing Task for entityType: {}, entity offset : {}",
-          task.entityType(),
-          task.currentEntityOffset());
-      if (task == IndexingTask.POISON_PILL) {
-        LOG.debug("Received POISON_PILL. Consumer thread terminating.");
-        latch.countDown();
-        break;
-      }
-      processTask(task, jobExecutionContext, latch);
-    }
-  }
-
-  /**
-   * Sends POISON_PILLs to signal consumer threads to terminate.
-   *
-   * @param numConsumers The number of consumers to signal.
-   * @throws InterruptedException If the thread is interrupted while waiting.
-   */
-  private void sendPoisonPills(int numConsumers) throws InterruptedException {
-    for (int i = 0; i < numConsumers; i++) {
-      taskQueue.put(IndexingTask.POISON_PILL);
-    }
-    LOG.debug("Sent {} POISON_PILLs to consumers.", numConsumers);
-  }
-
   /**
    * Shuts down an executor service gracefully.
    *
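With each batch now indexed inline on the producer path (see processReadTask further down), there is no consumer loop left to unblock, so the poison-pill protocol, the consumer-latch accounting, and the taskQueue it drained are deleted wholesale rather than adapted.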
@@ -592,11 +538,9 @@ public class SearchIndexApp extends AbstractNativeApplication {
     stopped = true;
     shutdownExecutor(jobExecutor, "JobExecutor", 60, TimeUnit.SECONDS);
     shutdownExecutor(producerExecutor, "ProducerExecutor", 60, TimeUnit.SECONDS);
-    shutdownExecutor(consumerExecutor, "ConsumerExecutor", 60, TimeUnit.SECONDS);
   }

-  private void processTask(
-      IndexingTask<?> task, JobExecutionContext jobExecutionContext, CountDownLatch latch) {
+  private void processTask(IndexingTask<?> task, JobExecutionContext jobExecutionContext) {
     String entityType = task.entityType();
     ResultList<?> entities = task.entities();
     Map<String, Object> contextData = new HashMap<>();
@@ -653,7 +597,6 @@ public class SearchIndexApp extends AbstractNativeApplication {
       LOG.error("Unexpected error during processing task for entity {}", entityType, e);
     } finally {
       sendUpdates(jobExecutionContext);
-      latch.countDown();
     }
   }
@@ -697,20 +640,18 @@ public class SearchIndexApp extends AbstractNativeApplication {
         .getTotalRecords();
   }

-  private void processReadTask(String entityType, Source<?> source, int offset) {
+  private void processReadTask(
+      JobExecutionContext jobExecutionContext, String entityType, Source<?> source, int offset) {
     try {
       Object resultList = source.readWithCursor(RestUtil.encodeCursor(String.valueOf(offset)));
-      LOG.info("Read Entities with CurrentOffset: {}", offset);
+      LOG.debug("Read Entities with entityType: {}, CurrentOffset: {}", entityType, offset);
       if (resultList != null) {
         ResultList<?> entities = extractEntities(entityType, resultList);
         if (!nullOrEmpty(entities.getData())) {
           IndexingTask<?> task = new IndexingTask<>(entityType, entities, offset);
-          taskQueue.put(task);
+          processTask(task, jobExecutionContext);
         }
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.warn("Reader thread interrupted for entityType: {}", entityType);
     } catch (SearchIndexException e) {
       LOG.error("Error while reading source for entityType: {}", entityType, e);
       synchronized (jobDataLock) {
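The catch (InterruptedException e) block can be dropped here because the only statement in the try that threw it was the blocking taskQueue.put(task); with that replaced by a direct processTask(task, jobExecutionContext) call, javac would reject the catch as unreachable, assuming none of the remaining calls declare the exception.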
EntityDAO.java

@@ -333,7 +333,7 @@ public interface EntityDAO<T extends EntityInterface> {
       @Define("nameHashColumn") String nameHashColumnName,
       @Bind("limit") int limit);

-  @SqlQuery("SELECT json FROM <table> <cond> ORDER BY name LIMIT :limit OFFSET :offset")
+  @SqlQuery("SELECT json FROM <table> <cond> ORDER BY id LIMIT :limit OFFSET :offset")
   List<String> listAfter(
       @Define("table") String table,
       @BindMap Map<String, ?> params,
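A plausible rationale for this one-liner (inferred, not stated in the commit): LIMIT/OFFSET pagination is only stable when the ORDER BY key is unique and immutable. id is both, while name need not be unique and can change while a long reindex is paging, letting consecutive windows skip or repeat rows. A minimal JDBI sketch of the resulting pattern, with a hypothetical table name:

import java.util.List;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlQuery;

// Hypothetical DAO: ordering by the unique, immutable id column keeps
// LIMIT/OFFSET windows disjoint even if rows are renamed between pages.
public interface PaginationSketchDao {
  @SqlQuery("SELECT json FROM table_entity ORDER BY id LIMIT :limit OFFSET :offset")
  List<String> listPage(@Bind("limit") int limit, @Bind("offset") int offset);
}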