diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index 8ab4472ac0d..4d476e71f6e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -56,8 +56,8 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import lombok.Getter; @@ -161,7 +161,7 @@ public class SearchIndexApp extends AbstractNativeApplication { private volatile boolean stopped = false; private ExecutorService consumerExecutor; private ExecutorService producerExecutor; - private final BlockingQueue> taskQueue = new LinkedBlockingQueue<>(); + private BlockingQueue> taskQueue = new LinkedBlockingQueue<>(100); private final AtomicReference searchIndexStats = new AtomicReference<>(); private final AtomicReference batchSize = new AtomicReference<>(5); @@ -286,8 +286,21 @@ public class SearchIndexApp extends AbstractNativeApplication { int numConsumers = jobData.getConsumerThreads(); LOG.info("Starting reindexing with {} producers and {} consumers.", numProducers, numConsumers); - consumerExecutor = Executors.newFixedThreadPool(numConsumers); - producerExecutor = Executors.newFixedThreadPool(numProducers); + taskQueue = new LinkedBlockingQueue<>(jobData.getQueueSize()); + consumerExecutor = + new ThreadPoolExecutor( + numConsumers, + numConsumers, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(jobData.getQueueSize())); + producerExecutor = + new ThreadPoolExecutor( + numProducers, + numProducers, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(jobData.getQueueSize())); try { processEntityReindex(jobExecutionContext); diff --git a/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json b/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json index 06d5e7cb68a..015d8f6ede2 100644 --- a/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json +++ b/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json @@ -50,6 +50,7 @@ "payLoadSize": 104857600, "producerThreads": 1, "consumerThreads": 1, + "queueSize": 100, "maxConcurrentRequests": 100, "maxRetries": 3, "initialBackoff": 1000, diff --git a/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/SearchIndexingApplication.json b/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/SearchIndexingApplication.json index b782599401e..7ec55770148 100644 --- a/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/SearchIndexingApplication.json +++ b/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/SearchIndexingApplication.json @@ -64,6 +64,7 @@ "batchSize": "100", "payLoadSize": 104857600, "producerThreads": 1, + "queueSize": 100, "consumerThreads": 1, "maxConcurrentRequests": 100, "maxRetries": 3, diff --git a/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json b/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json index 4f69fd38128..069ddbb6243 100644 --- a/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json +++ b/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json @@ -120,6 +120,12 @@ "type": "integer", "default": 1 }, + "queueSize": { + "title": "Queue Size to use.", + "description": "Queue Size to user internally for reindexing.", + "type": "integer", + "default": 100 + }, "maxConcurrentRequests": { "title": "Max Concurrent Requests", "description": "Maximum number of concurrent requests to the search index", diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/SearchIndexingApplication.json b/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/SearchIndexingApplication.json index d63f57bb188..9c3989ce0fb 100644 --- a/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/SearchIndexingApplication.json +++ b/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/SearchIndexingApplication.json @@ -28,6 +28,12 @@ "type": "integer", "default": 1 }, + "queueSize": { + "title": "Queue Size to use.", + "description": "Queue Size to user internally for reindexing.", + "type": "integer", + "default": 100 + }, "maxConcurrentRequests": { "title": "Max Concurrent Requests", "description": "Maximum number of concurrent requests to the search index",