mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-14 17:17:15 +00:00
Fix Unlimited Queue Size Issue (#18633)
This commit is contained in:
parent
128e4e8e93
commit
ee327ce0c4
@ -56,8 +56,8 @@ import java.util.Set;
|
|||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
@ -161,7 +161,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||||||
private volatile boolean stopped = false;
|
private volatile boolean stopped = false;
|
||||||
private ExecutorService consumerExecutor;
|
private ExecutorService consumerExecutor;
|
||||||
private ExecutorService producerExecutor;
|
private ExecutorService producerExecutor;
|
||||||
private final BlockingQueue<IndexingTask<?>> taskQueue = new LinkedBlockingQueue<>();
|
private BlockingQueue<IndexingTask<?>> taskQueue = new LinkedBlockingQueue<>(100);
|
||||||
private final AtomicReference<Stats> searchIndexStats = new AtomicReference<>();
|
private final AtomicReference<Stats> searchIndexStats = new AtomicReference<>();
|
||||||
private final AtomicReference<Integer> batchSize = new AtomicReference<>(5);
|
private final AtomicReference<Integer> batchSize = new AtomicReference<>(5);
|
||||||
|
|
||||||
@ -286,8 +286,21 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||||||
int numConsumers = jobData.getConsumerThreads();
|
int numConsumers = jobData.getConsumerThreads();
|
||||||
LOG.info("Starting reindexing with {} producers and {} consumers.", numProducers, numConsumers);
|
LOG.info("Starting reindexing with {} producers and {} consumers.", numProducers, numConsumers);
|
||||||
|
|
||||||
consumerExecutor = Executors.newFixedThreadPool(numConsumers);
|
taskQueue = new LinkedBlockingQueue<>(jobData.getQueueSize());
|
||||||
producerExecutor = Executors.newFixedThreadPool(numProducers);
|
consumerExecutor =
|
||||||
|
new ThreadPoolExecutor(
|
||||||
|
numConsumers,
|
||||||
|
numConsumers,
|
||||||
|
0L,
|
||||||
|
TimeUnit.MILLISECONDS,
|
||||||
|
new LinkedBlockingQueue<>(jobData.getQueueSize()));
|
||||||
|
producerExecutor =
|
||||||
|
new ThreadPoolExecutor(
|
||||||
|
numProducers,
|
||||||
|
numProducers,
|
||||||
|
0L,
|
||||||
|
TimeUnit.MILLISECONDS,
|
||||||
|
new LinkedBlockingQueue<>(jobData.getQueueSize()));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
processEntityReindex(jobExecutionContext);
|
processEntityReindex(jobExecutionContext);
|
||||||
|
|||||||
@ -50,6 +50,7 @@
|
|||||||
"payLoadSize": 104857600,
|
"payLoadSize": 104857600,
|
||||||
"producerThreads": 1,
|
"producerThreads": 1,
|
||||||
"consumerThreads": 1,
|
"consumerThreads": 1,
|
||||||
|
"queueSize": 100,
|
||||||
"maxConcurrentRequests": 100,
|
"maxConcurrentRequests": 100,
|
||||||
"maxRetries": 3,
|
"maxRetries": 3,
|
||||||
"initialBackoff": 1000,
|
"initialBackoff": 1000,
|
||||||
|
|||||||
@ -64,6 +64,7 @@
|
|||||||
"batchSize": "100",
|
"batchSize": "100",
|
||||||
"payLoadSize": 104857600,
|
"payLoadSize": 104857600,
|
||||||
"producerThreads": 1,
|
"producerThreads": 1,
|
||||||
|
"queueSize": 100,
|
||||||
"consumerThreads": 1,
|
"consumerThreads": 1,
|
||||||
"maxConcurrentRequests": 100,
|
"maxConcurrentRequests": 100,
|
||||||
"maxRetries": 3,
|
"maxRetries": 3,
|
||||||
|
|||||||
@ -120,6 +120,12 @@
|
|||||||
"type": "integer",
|
"type": "integer",
|
||||||
"default": 1
|
"default": 1
|
||||||
},
|
},
|
||||||
|
"queueSize": {
|
||||||
|
"title": "Queue Size to use.",
|
||||||
|
"description": "Queue Size to user internally for reindexing.",
|
||||||
|
"type": "integer",
|
||||||
|
"default": 100
|
||||||
|
},
|
||||||
"maxConcurrentRequests": {
|
"maxConcurrentRequests": {
|
||||||
"title": "Max Concurrent Requests",
|
"title": "Max Concurrent Requests",
|
||||||
"description": "Maximum number of concurrent requests to the search index",
|
"description": "Maximum number of concurrent requests to the search index",
|
||||||
|
|||||||
@ -28,6 +28,12 @@
|
|||||||
"type": "integer",
|
"type": "integer",
|
||||||
"default": 1
|
"default": 1
|
||||||
},
|
},
|
||||||
|
"queueSize": {
|
||||||
|
"title": "Queue Size to use.",
|
||||||
|
"description": "Queue Size to user internally for reindexing.",
|
||||||
|
"type": "integer",
|
||||||
|
"default": 100
|
||||||
|
},
|
||||||
"maxConcurrentRequests": {
|
"maxConcurrentRequests": {
|
||||||
"title": "Max Concurrent Requests",
|
"title": "Max Concurrent Requests",
|
||||||
"description": "Maximum number of concurrent requests to the search index",
|
"description": "Maximum number of concurrent requests to the search index",
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user