Fix #21668: Add Auto Tune to configure Search Index Application Parameters (#21669)

* Fix #21668: Add Auto Tune to configure Search Index Application Parameters

* add auto tune param in app json

* add test

* use virtual threads

* Fix Failing Test

---------

Co-authored-by: karanh37 <karanh37@gmail.com>
Co-authored-by: Karan Hotchandani <33024356+karanh37@users.noreply.github.com>
Co-authored-by: mohitdeuex <mohit.y@deuexsolutions.com>
This commit is contained in:
Sriharsha Chintalapani 2025-06-13 09:23:22 -07:00 committed by GitHub
parent 44e09e41a2
commit 7d07222635
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 753 additions and 56 deletions

View File

@ -943,6 +943,11 @@
<artifactId>org.apache.felix.http.servlet-api</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.opensearch.client</groupId>
<artifactId>opensearch-rest-high-level-client</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>

View File

@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.glassfish.jersey.internal.util.ExceptionUtils;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.EntityTimeSeriesInterface;
@ -44,6 +45,8 @@ public class ElasticSearchIndexSink implements BulkSink, Closeable {
private final long initialBackoffMillis;
private final long maxBackoffMillis;
private final Semaphore semaphore;
// Request options used for bulk calls: the defaults plus a "Content-Encoding: gzip" header.
// NOTE(review): this only declares the encoding in the header; nothing visible here gzips the
// request body. Confirm the underlying REST client is built with request compression enabled,
// otherwise the server would receive an uncompressed body labelled as gzip.
private static final RequestOptions COMPRESSED_REQUEST_OPTIONS =
RequestOptions.DEFAULT.toBuilder().addHeader(HttpHeaders.CONTENT_ENCODING, "gzip").build();
public ElasticSearchIndexSink(
SearchClient client,
@ -144,7 +147,7 @@ public class ElasticSearchIndexSink implements BulkSink, Closeable {
((RestHighLevelClient) client.getClient())
.bulkAsync(
bulkRequest,
RequestOptions.DEFAULT,
COMPRESSED_REQUEST_OPTIONS,
new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
@ -223,7 +226,7 @@ public class ElasticSearchIndexSink implements BulkSink, Closeable {
try {
BulkResponse response =
((RestHighLevelClient) client.getClient()).bulk(bulkRequest, RequestOptions.DEFAULT);
((RestHighLevelClient) client.getClient()).bulk(bulkRequest, COMPRESSED_REQUEST_OPTIONS);
// Collect all bulk item failures
for (int i = 0; i < response.getItems().length; i++) {

View File

@ -12,6 +12,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.glassfish.jersey.internal.util.ExceptionUtils;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.EntityTimeSeriesInterface;
@ -45,6 +46,8 @@ public class OpenSearchIndexSink implements BulkSink, Closeable {
private final long initialBackoffMillis;
private final long maxBackoffMillis;
private final Semaphore semaphore;
// Request options used for bulk calls: the defaults plus a "Content-Encoding: gzip" header.
// NOTE(review): this only declares the encoding in the header; nothing visible here gzips the
// request body. Confirm the underlying REST client is built with request compression enabled,
// otherwise the server would receive an uncompressed body labelled as gzip.
private static final RequestOptions COMPRESSED_REQUEST_OPTIONS =
RequestOptions.DEFAULT.toBuilder().addHeader(HttpHeaders.CONTENT_ENCODING, "gzip").build();
public OpenSearchIndexSink(
SearchClient client,
@ -145,7 +148,7 @@ public class OpenSearchIndexSink implements BulkSink, Closeable {
((RestHighLevelClient) client.getClient())
.bulkAsync(
bulkRequest,
RequestOptions.DEFAULT,
COMPRESSED_REQUEST_OPTIONS,
new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {

View File

@ -1,5 +1,6 @@
package org.openmetadata.service.apps.bundles.searchIndex;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.QUERY_COST_RECORD;
import static org.openmetadata.service.Entity.TEST_CASE_RESOLUTION_STATUS;
@ -16,13 +17,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
@ -50,6 +48,7 @@ import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.EntityTimeSeriesRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.search.SearchClusterMetrics;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.search.models.IndexMapping;
import org.openmetadata.service.socket.WebSocketManager;
@ -83,8 +82,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
@Getter private EventPublisherJob jobData;
private final Object jobDataLock = new Object();
private ExecutorService producerExecutor;
private final ExecutorService jobExecutor = Executors.newCachedThreadPool();
private BlockingQueue<Runnable> producerQueue = new LinkedBlockingQueue<>(100);
private final ExecutorService jobExecutor = Executors.newVirtualThreadPerTaskExecutor();
private final AtomicReference<Stats> searchIndexStats = new AtomicReference<>();
private final AtomicReference<Integer> batchSize = new AtomicReference<>(5);
private JobExecutionContext jobExecutionContext;
@ -162,12 +160,20 @@ public class SearchIndexApp extends AbstractNativeApplication {
cleanUpStaleJobsFromRuns();
LOG.info("Executing Reindexing Job with JobData: {}", jobData);
batchSize.set(jobData.getBatchSize());
jobData.setStatus(EventPublisherJob.Status.RUNNING);
LOG.debug("Initializing job statistics.");
searchIndexStats.set(initializeTotalRecords(jobData.getEntities()));
jobData.setStats(searchIndexStats.get());
// Apply auto-tuning if enabled (after stats are initialized)
if (Boolean.TRUE.equals(jobData.getAutoTune())) {
LOG.info("Auto-tune enabled, analyzing cluster and adjusting parameters...");
applyAutoTuning();
}
batchSize.set(jobData.getBatchSize());
sendUpdates(jobExecutionContext, true);
ElasticSearchConfiguration.SearchType searchType = searchRepository.getSearchType();
@ -231,17 +237,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
int numProducers = jobData.getProducerThreads();
int numConsumers = jobData.getConsumerThreads();
LOG.info("Starting reindexing with {} producers and {} consumers.", numProducers, numConsumers);
producerQueue = new LinkedBlockingQueue<>(jobData.getQueueSize());
producerExecutor =
new ThreadPoolExecutor(
numProducers,
numProducers,
0L,
TimeUnit.MILLISECONDS,
producerQueue,
new ThreadPoolExecutor.CallerRunsPolicy());
producerExecutor = Executors.newVirtualThreadPerTaskExecutor();
try {
processEntityReindex(jobExecutionContext);
} catch (Exception e) {
@ -270,27 +266,30 @@ public class SearchIndexApp extends AbstractNativeApplication {
int totalEntityRecords = getTotalEntityRecords(entityType);
Source<?> source = createSource(entityType);
int loadPerThread = calculateNumberOfThreads(totalEntityRecords);
Semaphore semaphore = new Semaphore(jobData.getQueueSize());
Semaphore semaphore = new Semaphore(Math.max(jobData.getQueueSize(), 100));
if (totalEntityRecords > 0) {
for (int i = 0; i < loadPerThread; i++) {
semaphore.acquire();
LOG.debug(
"Submitting producer task current queue size: {}", producerQueue.size());
"Submitting virtual thread producer task for batch {}/{}",
i + 1,
loadPerThread);
int currentOffset = i * batchSize.get();
producerExecutor.submit(
() -> {
try {
LOG.debug(
"Running Task for CurrentOffset: {}, Producer Latch Down, Current : {}",
"Virtual thread processing offset: {}, remaining batches: {}",
currentOffset,
producerLatch.getCount());
processReadTask(jobExecutionContext, entityType, source, currentOffset);
} catch (Exception e) {
LOG.error("Error processing entity type {}", entityType, e);
LOG.error(
"Error processing entity type {} with virtual thread", entityType, e);
} finally {
LOG.debug(
"Producer Latch Down and Semaphore Release, Current : {}",
producerLatch.getCount());
"Virtual thread completed batch, remaining: {}",
producerLatch.getCount() - 1);
producerLatch.countDown();
semaphore.release();
}
@ -536,8 +535,8 @@ public class SearchIndexApp extends AbstractNativeApplication {
// After successful write, create a new StepStats for the current batch
StepStats currentEntityStats = new StepStats();
currentEntityStats.setSuccessRecords(entities.getData().size());
currentEntityStats.setFailedRecords(entities.getErrors().size());
currentEntityStats.setSuccessRecords(listOrEmpty(entities.getData()).size());
currentEntityStats.setFailedRecords(listOrEmpty(entities.getErrors()).size());
// Do NOT set Total Records here
// Update statistics in a thread-safe manner
@ -711,6 +710,42 @@ public class SearchIndexApp extends AbstractNativeApplication {
- entityStats.getSuccessRecords();
}
/**
 * Replaces the job's batch size, producer thread count, concurrent request limit and payload
 * size with the values recommended by {@link SearchClusterMetrics}. Any failure is logged and
 * the parameters already present on {@code jobData} are left in place.
 */
private void applyAutoTuning() {
  final long bytesPerMb = 1024L * 1024L;
  try {
    LOG.info(
        "Auto-tune: Request compression enabled for {} bulk operations",
        searchRepository.getSearchType());
    LOG.info("Auto-tune: JSON payloads will be gzip compressed (~75% size reduction)");

    // Recommendations are sized from the total record count gathered when stats were initialized.
    long recordCount = searchIndexStats.get().getJobStats().getTotalRecords();
    SearchClusterMetrics metrics =
        SearchClusterMetrics.fetchClusterMetrics(searchRepository, recordCount);
    metrics.logRecommendations();

    LOG.info("Applying auto-tuned parameters...");
    LOG.info(
        "Original - Batch Size: {}, Producer Threads: {}, Concurrent Requests: {}, Payload Size: {} MB",
        jobData.getBatchSize(),
        jobData.getProducerThreads(),
        jobData.getMaxConcurrentRequests(),
        jobData.getPayLoadSize() / bytesPerMb);

    jobData.setBatchSize(metrics.getRecommendedBatchSize());
    jobData.setProducerThreads(metrics.getRecommendedProducerThreads());
    jobData.setMaxConcurrentRequests(metrics.getRecommendedConcurrentRequests());
    jobData.setPayLoadSize(metrics.getMaxPayloadSizeBytes());

    LOG.info(
        "Auto-tuned - Batch Size: {}, Producer Threads: {}, Concurrent Requests: {}, Payload Size: {} MB",
        jobData.getBatchSize(),
        jobData.getProducerThreads(),
        jobData.getMaxConcurrentRequests(),
        jobData.getPayLoadSize() / bytesPerMb);
  } catch (Exception tuningFailure) {
    // Best effort: auto-tuning must never abort the reindex job itself.
    LOG.warn(
        "Auto-tuning failed, using original parameters: {}", tuningFailure.getMessage());
    LOG.debug("Auto-tuning error details", tuningFailure);
  }
}
static record IndexingTask<T>(
String entityType, ResultList<T> entities, int currentEntityOffset) {
public static final IndexingTask<?> POISON_PILL =

View File

@ -0,0 +1,294 @@
package org.openmetadata.service.search;
import java.util.Map;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.service.search.elasticsearch.ElasticSearchClient;
import org.openmetadata.service.search.opensearch.OpenSearchClient;
/**
 * Point-in-time view of the search cluster (node and shard counts, CPU and heap usage of one
 * sampled node) together with the reindexing parameters derived from it.
 *
 * <p>Instances are created through {@link #fetchClusterMetrics(SearchRepository, long)}, which
 * never throws: if the cluster cannot be queried or its responses cannot be interpreted, a set of
 * conservative defaults is returned instead.
 */
@Slf4j
@Builder
@Getter
public class SearchClusterMetrics {
  /** Value assumed for {@code http.max_content_length} when the cluster does not report one. */
  private static final long DEFAULT_MAX_CONTENT_LENGTH_BYTES = 100 * 1024 * 1024L;

  private static final String MAX_CONTENT_LENGTH_SETTING = "http.max_content_length";

  private final int availableProcessors;
  private final long heapSizeBytes;
  private final long availableMemoryBytes;
  private final int totalShards;
  private final int totalNodes;
  private final double cpuUsagePercent;
  private final double memoryUsagePercent;
  private final long maxPayloadSizeBytes;
  private final int recommendedConcurrentRequests;
  private final int recommendedBatchSize;
  private final int recommendedProducerThreads;

  /**
   * Queries the configured search cluster and derives recommended indexing parameters.
   *
   * @param searchRepository repository providing the search type and the concrete search client
   * @param totalEntities total number of records the job is going to index
   * @return metrics with recommendations, or conservative defaults if anything fails
   */
  public static SearchClusterMetrics fetchClusterMetrics(
      SearchRepository searchRepository, long totalEntities) {
    ElasticSearchConfiguration.SearchType searchType = searchRepository.getSearchType();
    try {
      if (searchType.equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
        return fetchOpenSearchMetrics(
            (OpenSearchClient) searchRepository.getSearchClient(), totalEntities);
      } else {
        return fetchElasticSearchMetrics(
            (ElasticSearchClient) searchRepository.getSearchClient(), totalEntities);
      }
    } catch (Exception e) {
      LOG.warn("Failed to fetch cluster metrics, using conservative defaults: {}", e.getMessage());
      return getConservativeDefaults(totalEntities);
    }
  }

  /** Fetches the three cluster responses from OpenSearch and turns them into metrics. */
  private static SearchClusterMetrics fetchOpenSearchMetrics(
      OpenSearchClient osClient, long totalEntities) {
    try {
      return buildFromResponses(
          osClient.clusterStats(),
          osClient.nodesStats(),
          osClient.clusterSettings(),
          totalEntities);
    } catch (Exception e) {
      LOG.warn("Failed to fetch OpenSearch cluster metrics: {}", e.getMessage());
      return getConservativeDefaults(totalEntities);
    }
  }

  /** Fetches the three cluster responses from Elasticsearch and turns them into metrics. */
  private static SearchClusterMetrics fetchElasticSearchMetrics(
      ElasticSearchClient client, long totalEntities) {
    try {
      return buildFromResponses(
          client.clusterStats(), client.nodesStats(), client.clusterSettings(), totalEntities);
    } catch (Exception e) {
      LOG.warn("Failed to fetch ElasticSearch cluster metrics: {}", e.getMessage());
      return getConservativeDefaults(totalEntities);
    }
  }

  /**
   * Interprets the {@code _cluster/stats}, {@code _nodes/stats} and {@code _cluster/settings}
   * responses. Both engines are parsed by this one method so the two fetch paths cannot drift
   * apart.
   *
   * <p>CPU and heap figures are taken from the first node in the nodes-stats response only.
   *
   * @throws RuntimeException if an expected section is missing or has an unexpected type; callers
   *     treat that as "metrics unavailable" and fall back to defaults
   */
  @SuppressWarnings("unchecked")
  private static SearchClusterMetrics buildFromResponses(
      Map<String, Object> clusterStats,
      Map<String, Object> nodesStats,
      Map<String, Object> clusterSettings,
      long totalEntities) {
    Map<String, Object> nodes = (Map<String, Object>) clusterStats.get("nodes");
    // "nodes.count" and "indices.shards" are objects carrying a "total" field, not bare integers.
    int totalNodes = Math.max(1, extractCount(nodes.get("count"), "nodes.count"));

    Map<String, Object> indices = (Map<String, Object>) clusterStats.get("indices");
    int totalShards = extractCount(indices.get("shards"), "indices.shards");

    Map<String, Object> nodesMap = (Map<String, Object>) nodesStats.get("nodes");
    Map<String, Object> firstNode = (Map<String, Object>) nodesMap.values().iterator().next();

    Map<String, Object> os = (Map<String, Object>) firstNode.get("os");
    Map<String, Object> cpu = (Map<String, Object>) os.get("cpu");
    double cpuUsagePercent = ((Number) cpu.get("percent")).doubleValue();

    Map<String, Object> jvm = (Map<String, Object>) firstNode.get("jvm");
    Map<String, Object> mem = (Map<String, Object>) jvm.get("mem");
    long heapUsedBytes = ((Number) mem.get("heap_used_in_bytes")).longValue();
    long heapMaxBytes = ((Number) mem.get("heap_max_in_bytes")).longValue();
    if (heapMaxBytes <= 0) {
      // A zero heap would yield NaN usage and a zero payload size; treat it as unusable data.
      throw new IllegalStateException("heap_max_in_bytes is not reported as a positive value");
    }
    double memoryUsagePercent = (double) heapUsedBytes / heapMaxBytes * 100;

    long maxContentLength = extractMaxContentLength(clusterSettings);

    return calculateRecommendations(
        totalNodes,
        totalShards,
        cpuUsagePercent,
        memoryUsagePercent,
        heapMaxBytes,
        maxContentLength,
        totalEntities);
  }

  /**
   * Reads a count that is either a bare number or an object with a numeric {@code total} field.
   *
   * @throws IllegalStateException if neither form is present
   */
  private static int extractCount(Object value, String description) {
    Object candidate = value instanceof Map<?, ?> nested ? nested.get("total") : value;
    if (candidate instanceof Number number) {
      return number.intValue();
    }
    throw new IllegalStateException("Unexpected value for " + description + ": " + value);
  }

  /** Derives thread, concurrency, payload and batch-size recommendations from raw metrics. */
  private static SearchClusterMetrics calculateRecommendations(
      int totalNodes,
      int totalShards,
      double cpuUsagePercent,
      double memoryUsagePercent,
      long heapMaxBytes,
      long maxContentLength,
      long totalEntities) {

    // Producer threads: scale with local processors and cluster nodes, then adjust for CPU load.
    int baseThreadsPerNode = Runtime.getRuntime().availableProcessors() * 4;
    int recommendedProducerThreads = Math.min(100, baseThreadsPerNode * totalNodes);
    if (cpuUsagePercent > 80) {
      recommendedProducerThreads = Math.max(10, recommendedProducerThreads / 2);
    } else if (cpuUsagePercent < 40) {
      recommendedProducerThreads = Math.min(200, recommendedProducerThreads * 3);
    }

    // Concurrent requests: scale with nodes, then adjust for heap pressure.
    int baseConcurrentRequests = totalNodes * 50;
    if (memoryUsagePercent > 80) {
      baseConcurrentRequests = Math.max(10, baseConcurrentRequests / 2);
    } else if (memoryUsagePercent < 50) {
      baseConcurrentRequests = Math.min(500, baseConcurrentRequests * 2);
    }

    long heapBasedPayloadSize =
        Math.min(500 * 1024 * 1024L, heapMaxBytes / 20); // Max 500MB or 5% of heap

    double compressionRatio = 0.25; // Conservative estimate: compressed size is 25% of original
    long effectiveMaxContentLength = (long) (maxContentLength / compressionRatio);

    long maxPayloadSize =
        Math.min(heapBasedPayloadSize, effectiveMaxContentLength * 8 / 10); // Use 80% for safety

    int avgEntitySizeKB = 2; // Assume 2KB average entity size (uncompressed)
    int recommendedBatchSize = (int) Math.min(2000, maxPayloadSize / (avgEntitySizeKB * 1024L));
    recommendedBatchSize = Math.max(100, recommendedBatchSize);

    // Very large jobs: guarantee a reasonable batch size and cap producer threads at 50.
    if (totalEntities > 1000000) {
      recommendedBatchSize = Math.max(500, recommendedBatchSize);
      recommendedProducerThreads = Math.min(50, recommendedProducerThreads);
    }

    return SearchClusterMetrics.builder()
        .availableProcessors(Runtime.getRuntime().availableProcessors())
        .heapSizeBytes(heapMaxBytes)
        .availableMemoryBytes(heapMaxBytes - (long) (heapMaxBytes * memoryUsagePercent / 100))
        .totalShards(totalShards)
        .totalNodes(totalNodes)
        .cpuUsagePercent(cpuUsagePercent)
        .memoryUsagePercent(memoryUsagePercent)
        .maxPayloadSizeBytes(maxPayloadSize)
        .recommendedConcurrentRequests(baseConcurrentRequests)
        .recommendedBatchSize(recommendedBatchSize)
        .recommendedProducerThreads(recommendedProducerThreads)
        .build();
  }

  /**
   * Looks up {@code http.max_content_length} in the cluster settings response.
   *
   * <p>Transient settings are consulted before persistent ones because a transient value
   * overrides a persistent one on the cluster. Both the flat ({@code "http.max_content_length"})
   * and the nested ({@code http -> max_content_length}) layouts are understood.
   *
   * @return the configured limit in bytes, or 100MB when absent or unreadable
   */
  @SuppressWarnings("unchecked")
  private static long extractMaxContentLength(Map<String, Object> clusterSettings) {
    try {
      for (String scope : new String[] {"transient", "persistent"}) {
        Object scoped = clusterSettings.get(scope);
        if (!(scoped instanceof Map)) {
          continue;
        }
        Object value = lookupSetting((Map<String, Object>) scoped, MAX_CONTENT_LENGTH_SETTING);
        if (value != null) {
          return parseByteSize(value.toString());
        }
      }
      return DEFAULT_MAX_CONTENT_LENGTH_BYTES;
    } catch (Exception e) {
      LOG.warn("Failed to extract maxContentLength from cluster settings: {}", e.getMessage());
      return DEFAULT_MAX_CONTENT_LENGTH_BYTES;
    }
  }

  /** Resolves a dotted setting name against a flat or nested settings map; null if absent. */
  private static Object lookupSetting(Map<String, Object> settings, String dottedKey) {
    if (settings.containsKey(dottedKey)) {
      return settings.get(dottedKey);
    }
    Object current = settings;
    for (String part : dottedKey.split("\\.")) {
      if (!(current instanceof Map)) {
        return null;
      }
      current = ((Map<?, ?>) current).get(part);
    }
    // A map at the leaf means the name addressed a settings group, not a value.
    return current instanceof Map ? null : current;
  }

  /**
   * Parses a byte-size string such as {@code "100mb"}, {@code "1.5g"} or {@code "2048"}.
   *
   * <p>Units are case-insensitive and may be separated from the number by whitespace. Recognized
   * units are b, k/kb, m/mb, g/gb, t/tb and p/pb (powers of 1024); no unit means bytes.
   *
   * @return the size in bytes, or 100MB when the input is blank, malformed or uses an unknown unit
   */
  private static long parseByteSize(String sizeStr) {
    if (sizeStr == null || sizeStr.trim().isEmpty()) {
      return DEFAULT_MAX_CONTENT_LENGTH_BYTES;
    }

    String normalized = sizeStr.trim().toLowerCase(java.util.Locale.ROOT);
    String numStr = normalized.replaceAll("[^0-9.]", "");
    String unit = normalized.replaceAll("[0-9.\\s]", "");

    try {
      double num = Double.parseDouble(numStr);
      return switch (unit) {
        case "", "b" -> (long) num;
        case "k", "kb" -> (long) (num * 1024L);
        case "m", "mb" -> (long) (num * (1L << 20));
        case "g", "gb" -> (long) (num * (1L << 30));
        case "t", "tb" -> (long) (num * (1L << 40));
        case "p", "pb" -> (long) (num * (1L << 50));
        default -> {
          // Reading an unknown unit as raw bytes would produce an absurdly small limit.
          LOG.warn("Unrecognized unit in byte size '{}', using default", normalized);
          yield DEFAULT_MAX_CONTENT_LENGTH_BYTES;
        }
      };
    } catch (NumberFormatException e) {
      LOG.warn("Failed to parse byte size: {}", normalized);
      return DEFAULT_MAX_CONTENT_LENGTH_BYTES;
    }
  }

  /** Builds the fallback recommendations used when the cluster cannot be analyzed. */
  private static SearchClusterMetrics getConservativeDefaults(long totalEntities) {
    int conservativeBatchSize = totalEntities > 100000 ? 200 : 100;
    int conservativeThreads = totalEntities > 500000 ? 20 : 10;
    int conservativeConcurrentRequests = totalEntities > 100000 ? 100 : 50;

    return SearchClusterMetrics.builder()
        .availableProcessors(Runtime.getRuntime().availableProcessors())
        .heapSizeBytes(0L)
        .availableMemoryBytes(0L)
        .totalShards(0)
        .totalNodes(1)
        .cpuUsagePercent(50.0)
        .memoryUsagePercent(50.0)
        .maxPayloadSizeBytes(50 * 1024 * 1024L) // Conservative 50MB
        .recommendedConcurrentRequests(conservativeConcurrentRequests)
        .recommendedBatchSize(conservativeBatchSize)
        .recommendedProducerThreads(conservativeThreads)
        .build();
  }

  /** Writes the collected metrics and the derived recommendations to the log at INFO level. */
  public void logRecommendations() {
    LOG.info("=== Auto-Tune Cluster Analysis ===");
    LOG.info("Cluster: {} nodes, {} shards", totalNodes, totalShards);
    LOG.info(
        "Resource Usage: CPU {}%, Memory {}%",
        String.format("%.1f", cpuUsagePercent), String.format("%.1f", memoryUsagePercent));
    LOG.info(
        "Heap: {} MB total, {} MB available",
        heapSizeBytes / (1024 * 1024),
        availableMemoryBytes / (1024 * 1024));
    LOG.info("=== Auto-Tune Recommendations (Virtual Threads Optimized) ===");
    LOG.info("Batch Size: {}", recommendedBatchSize);
    LOG.info(
        "Producer Threads: {} (virtual threads - lightweight & scalable)",
        recommendedProducerThreads);
    LOG.info("Concurrent Requests: {}", recommendedConcurrentRequests);
    LOG.info(
        "Max Payload Size: {} MB (with compression optimization)",
        maxPayloadSizeBytes / (1024 * 1024));
    LOG.info("Note: Virtual threads enable high concurrency for I/O-bound operations");
    LOG.info("Note: Request compression is enabled (~75% size reduction for JSON)");
    LOG.info("================================================================");
  }
}

View File

@ -2607,4 +2607,46 @@ public class ElasticSearchClient implements SearchClient {
"Failed to remove ILM policy from component template: " + e.getMessage());
}
}
/**
 * Issues {@code GET /_cluster/stats} through the low-level client and returns the JSON response
 * body as a map.
 *
 * @return the parsed response body
 * @throws IOException if the request or the parsing fails; the original exception is attached as
 *     the cause
 */
@SuppressWarnings("unchecked")
public Map<String, Object> clusterStats() throws IOException {
  try {
    Request request = new Request("GET", "/_cluster/stats");
    es.org.elasticsearch.client.Response response =
        client.getLowLevelClient().performRequest(request);
    String responseBody = org.apache.http.util.EntityUtils.toString(response.getEntity());
    return JsonUtils.readValue(responseBody, Map.class);
  } catch (Exception e) {
    LOG.error("Failed to fetch cluster stats", e);
    // Keep the cause so callers retain the original exception type and stack trace.
    throw new IOException("Failed to fetch cluster stats: " + e.getMessage(), e);
  }
}
/**
 * Issues {@code GET /_nodes/stats} through the low-level client and returns the JSON response
 * body as a map.
 *
 * @return the parsed response body
 * @throws IOException if the request or the parsing fails; the original exception is attached as
 *     the cause
 */
@SuppressWarnings("unchecked")
public Map<String, Object> nodesStats() throws IOException {
  try {
    Request request = new Request("GET", "/_nodes/stats");
    es.org.elasticsearch.client.Response response =
        client.getLowLevelClient().performRequest(request);
    String responseBody = org.apache.http.util.EntityUtils.toString(response.getEntity());
    return JsonUtils.readValue(responseBody, Map.class);
  } catch (Exception e) {
    LOG.error("Failed to fetch nodes stats", e);
    // Keep the cause so callers retain the original exception type and stack trace.
    throw new IOException("Failed to fetch nodes stats: " + e.getMessage(), e);
  }
}
/**
 * Issues {@code GET /_cluster/settings} through the low-level client and returns the JSON
 * response body as a map.
 *
 * @return the parsed response body
 * @throws IOException if the request or the parsing fails; the original exception is attached as
 *     the cause
 */
@SuppressWarnings("unchecked")
public Map<String, Object> clusterSettings() throws IOException {
  try {
    Request request = new Request("GET", "/_cluster/settings");
    es.org.elasticsearch.client.Response response =
        client.getLowLevelClient().performRequest(request);
    String responseBody = org.apache.http.util.EntityUtils.toString(response.getEntity());
    return JsonUtils.readValue(responseBody, Map.class);
  } catch (Exception e) {
    LOG.error("Failed to fetch cluster settings", e);
    // Keep the cause so callers retain the original exception type and stack trace.
    throw new IOException("Failed to fetch cluster settings: " + e.getMessage(), e);
  }
}
}

View File

@ -2663,4 +2663,46 @@ public class OpenSearchClient implements SearchClient {
throw new IOException("Failed to detach ISM policy from indexes: " + e.getMessage());
}
}
/**
 * Issues {@code GET /_cluster/stats} through the low-level client and returns the JSON response
 * body as a map.
 *
 * @return the parsed response body
 * @throws IOException if the request or the parsing fails; the original exception is attached as
 *     the cause
 */
@SuppressWarnings("unchecked")
public Map<String, Object> clusterStats() throws IOException {
  try {
    Request request = new Request("GET", "/_cluster/stats");
    os.org.opensearch.client.Response response =
        client.getLowLevelClient().performRequest(request);
    String responseBody = org.apache.http.util.EntityUtils.toString(response.getEntity());
    return JsonUtils.readValue(responseBody, Map.class);
  } catch (Exception e) {
    LOG.error("Failed to fetch cluster stats", e);
    // Keep the cause so callers retain the original exception type and stack trace.
    throw new IOException("Failed to fetch cluster stats: " + e.getMessage(), e);
  }
}
/**
 * Issues {@code GET /_nodes/stats} through the low-level client and returns the JSON response
 * body as a map.
 *
 * @return the parsed response body
 * @throws IOException if the request or the parsing fails; the original exception is attached as
 *     the cause
 */
@SuppressWarnings("unchecked")
public Map<String, Object> nodesStats() throws IOException {
  try {
    Request request = new Request("GET", "/_nodes/stats");
    os.org.opensearch.client.Response response =
        client.getLowLevelClient().performRequest(request);
    String responseBody = org.apache.http.util.EntityUtils.toString(response.getEntity());
    return JsonUtils.readValue(responseBody, Map.class);
  } catch (Exception e) {
    LOG.error("Failed to fetch nodes stats", e);
    // Keep the cause so callers retain the original exception type and stack trace.
    throw new IOException("Failed to fetch nodes stats: " + e.getMessage(), e);
  }
}
/**
 * Issues {@code GET /_cluster/settings} through the low-level client and returns the JSON
 * response body as a map.
 *
 * @return the parsed response body
 * @throws IOException if the request or the parsing fails; the original exception is attached as
 *     the cause
 */
@SuppressWarnings("unchecked")
public Map<String, Object> clusterSettings() throws IOException {
  try {
    Request request = new Request("GET", "/_cluster/settings");
    os.org.opensearch.client.Response response =
        client.getLowLevelClient().performRequest(request);
    String responseBody = org.apache.http.util.EntityUtils.toString(response.getEntity());
    return JsonUtils.readValue(responseBody, Map.class);
  } catch (Exception e) {
    LOG.error("Failed to fetch cluster settings", e);
    // Keep the cause so callers retain the original exception type and stack trace.
    throw new IOException("Failed to fetch cluster settings: " + e.getMessage(), e);
  }
}
}

View File

@ -2,28 +2,15 @@ package org.openmetadata.service.util;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class AsyncService {
private static AsyncService instance;
private final ExecutorService executorService;
private AsyncService() {
ThreadFactory threadFactory =
new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix = "AsyncServicePool-Thread-";
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
if (t.isDaemon()) t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY);
return t;
}
};
executorService = Executors.newFixedThreadPool(20, threadFactory);
executorService = Executors.newVirtualThreadPerTaskExecutor();
}
public static synchronized AsyncService getInstance() {

View File

@ -142,6 +142,8 @@ public class OpenMetadataOperations implements Callable<Integer> {
"Subcommand needed: 'info', 'validate', 'repair', 'check-connection', "
+ "'drop-create', 'changelog', 'migrate', 'migrate-secrets', 'reindex', 'deploy-pipelines', "
+ "'dbServiceCleanup', 'relationshipCleanup'");
LOG.info(
"Use 'reindex --auto-tune' for automatic performance optimization based on cluster capabilities");
return 0;
}
@ -804,6 +806,12 @@ public class OpenMetadataOperations implements Callable<Integer> {
defaultValue = "3",
description = "Maximum number of retries for failed search requests.")
int retries,
@Option(
names = {"--auto-tune"},
defaultValue = "false",
description =
"Enable automatic performance tuning based on cluster capabilities and database entity count. When enabled, overrides manual parameter settings.")
boolean autoTune,
@Option(
names = {"--entities"},
defaultValue = "'all'",
@ -812,7 +820,7 @@ public class OpenMetadataOperations implements Callable<Integer> {
String entityStr) {
try {
LOG.info(
"Running Reindexing with Entities:{} , Batch Size: {}, Payload Size: {}, Recreate-Index: {}, Producer threads: {}, Consumer threads: {}, Queue Size: {}, Back-off: {}, Max Back-off: {}, Max Requests: {}, Retries: {}",
"Running Reindexing with Entities:{} , Batch Size: {}, Payload Size: {}, Recreate-Index: {}, Producer threads: {}, Consumer threads: {}, Queue Size: {}, Back-off: {}, Max Back-off: {}, Max Requests: {}, Retries: {}, Auto-tune: {}",
entityStr,
batchSize,
payloadSize,
@ -823,7 +831,8 @@ public class OpenMetadataOperations implements Callable<Integer> {
backOff,
maxBackOff,
maxRequests,
retries);
retries,
autoTune);
parseConfig();
CollectionRegistry.initialize();
ApplicationHandler.initialize(config);
@ -847,7 +856,8 @@ public class OpenMetadataOperations implements Callable<Integer> {
backOff,
maxBackOff,
maxRequests,
retries);
retries,
autoTune);
} catch (Exception e) {
LOG.error("Failed to reindex due to ", e);
return 1;
@ -886,7 +896,8 @@ public class OpenMetadataOperations implements Callable<Integer> {
int backOff,
int maxBackOff,
int maxRequests,
int retries) {
int retries,
boolean autoTune) {
AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
App app = appRepository.getByName(null, appName, appRepository.getFields("id"));
@ -902,7 +913,19 @@ public class OpenMetadataOperations implements Callable<Integer> {
.withInitialBackoff(backOff)
.withMaxBackoff(maxBackOff)
.withMaxConcurrentRequests(maxRequests)
.withMaxRetries(retries);
.withMaxRetries(retries)
.withAutoTune(autoTune);
// Log auto-tune behavior
if (autoTune) {
LOG.info(
"Auto-tune enabled: SearchIndexApp will analyze cluster capabilities and optimize parameters automatically");
LOG.info("Manual parameter settings will be overridden by auto-tuned values based on:");
LOG.info(" - OpenSearch/ElasticSearch cluster stats and settings");
LOG.info(" - Database entity counts");
LOG.info(" - Available cluster resources and capacity");
LOG.info(" - Request compression benefits (JSON payloads will be gzip compressed)");
}
// Trigger Application
long currentTime = System.currentTimeMillis();

View File

@ -15,7 +15,8 @@
"maxRetries": 3,
"initialBackoff": 1000,
"maxBackoff": 10000,
"searchIndexMappingLanguage": "EN"
"searchIndexMappingLanguage": "EN",
"autoTune": false
},
"appSchedule": {
"scheduleTimeline": "Custom",

View File

@ -39,6 +39,7 @@ import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.Stats;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.OpenMetadataApplicationTest;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.search.SearchRepository;
@ -51,7 +52,7 @@ import org.quartz.JobExecutionContext;
@ExtendWith(MockitoExtension.class)
@Slf4j
public class SearchIndexAppTest {
public class SearchIndexAppTest extends OpenMetadataApplicationTest {
@Mock private CollectionDAO collectionDAO;
@Mock private SearchRepository searchRepository;
@ -250,7 +251,7 @@ public class SearchIndexAppTest {
List<EntityInterface> entities = List.of(mockEntity);
ResultList<EntityInterface> resultList =
new ResultList<EntityInterface>(entities, readerErrors, null, null, entities.size());
new ResultList<>(entities, readerErrors, null, null, entities.size());
Map<String, Object> contextData = new HashMap<>();
contextData.put("entityType", "user");
@ -334,8 +335,7 @@ public class SearchIndexAppTest {
searchIndexApp.updateStats("table", batch3);
Stats jobStats = searchIndexApp.getJobData().getStats();
StepStats tableStats =
(StepStats) jobStats.getEntityStats().getAdditionalProperties().get("table");
StepStats tableStats = jobStats.getEntityStats().getAdditionalProperties().get("table");
assertEquals(33, tableStats.getSuccessRecords()); // 10 + 15 + 8
assertEquals(3, tableStats.getFailedRecords()); // 2 + 1 + 0
@ -419,8 +419,7 @@ public class SearchIndexAppTest {
executor.shutdown();
Stats jobStats = searchIndexApp.getJobData().getStats();
StepStats tableStats =
(StepStats) jobStats.getEntityStats().getAdditionalProperties().get("table");
StepStats tableStats = jobStats.getEntityStats().getAdditionalProperties().get("table");
int expectedSuccess = 0;
int expectedFailures = 0;
@ -432,4 +431,42 @@ public class SearchIndexAppTest {
assertEquals(expectedSuccess, tableStats.getSuccessRecords());
assertEquals(expectedFailures, tableStats.getFailedRecords());
}
/**
 * Checks that {@code init} accepts an application configuration with {@code autoTune} enabled
 * and that the resulting job data keeps the flag and positive tuning parameters.
 */
@Test
void testAutoTuneConfiguration() {
  EventPublisherJob tunedConfig =
      new EventPublisherJob()
          .withEntities(Set.of("table"))
          .withBatchSize(100)
          .withPayLoadSize(10 * 1024 * 1024L)
          .withMaxConcurrentRequests(50)
          .withProducerThreads(2)
          .withAutoTune(true) // Enable auto-tuning
          .withRecreateIndex(false)
          .withStats(new Stats());

  Object appConfiguration = JsonUtils.convertValue(tunedConfig, Object.class);
  App app = new App().withName("SearchIndexingApplication").withAppConfiguration(appConfiguration);

  org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration.SearchType
      stubbedSearchType =
          org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration
              .SearchType.ELASTICSEARCH;
  lenient().when(searchRepository.getSearchType()).thenReturn(stubbedSearchType);

  assertDoesNotThrow(
      () -> searchIndexApp.init(app), "SearchIndexApp should handle autoTune configuration");

  EventPublisherJob initializedJob = searchIndexApp.getJobData();
  assertNotNull(initializedJob, "Job data should be available");
  assertTrue(initializedJob.getAutoTune(), "AutoTune flag should be preserved");

  // Only sanity bounds are asserted for the tunable values.
  assertTrue(initializedJob.getBatchSize() > 0, "Batch size should be positive");
  assertTrue(initializedJob.getPayLoadSize() > 0, "Payload size should be positive");
  assertTrue(
      initializedJob.getMaxConcurrentRequests() > 0, "Concurrent requests should be positive");
  assertTrue(initializedJob.getProducerThreads() > 0, "Producer threads should be positive");
}
}

View File

@ -0,0 +1,189 @@
/*
* Copyright 2025 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.service.apps.bundles.searchIndex;
import static org.junit.jupiter.api.Assertions.*;
import es.org.elasticsearch.client.RestClient;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.schema.system.Stats;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationTest;
import org.openmetadata.service.search.SearchClusterMetrics;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.util.JsonUtils;
/**
 * Integration tests that initialize {@link SearchIndexApp} with and without {@code autoTune} and
 * inspect the resulting job data, using the repository and search client made available through
 * {@link OpenMetadataApplicationTest}.
 *
 * <p>NOTE(review): {@code getSearchClient()} is presumably inherited from {@link
 * OpenMetadataApplicationTest}; it is not declared in this class.
 */
@Slf4j
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class SearchIndexCompressionIntegrationTest extends OpenMetadataApplicationTest {
  /** Application name used for every {@link App} built by these tests. */
  private static final String APP_NAME = "SearchIndexingApplication";

  /** Number of bytes in one mebibyte; used to express payload sizes readably. */
  private static final long BYTES_PER_MB = 1024L * 1024L;

  private SearchRepository searchRepository;
  private SearchIndexApp searchIndexApp;

  /** Creates a fresh {@link SearchIndexApp} before each test so job data never leaks across tests. */
  @BeforeEach
  public void setup() {
    searchRepository = Entity.getSearchRepository();
    searchIndexApp = new SearchIndexApp(Entity.getCollectionDAO(), searchRepository);
  }

  /**
   * Checks that the cluster answers a health request, then initializes the app with {@code
   * autoTune} enabled and asserts that batch size and payload size are not reduced relative to the
   * requested configuration.
   */
  @Test
  void testAutoTuneWithRealElasticSearchCluster() {
    LOG.info("=== Testing Auto-Tune with Real ElasticSearch Cluster ===");
    RestClient esClient = getSearchClient();
    assertNotNull(esClient, "ElasticSearch client should be available");
    assertDoesNotThrow(
        () -> {
          es.org.elasticsearch.client.Response response =
              esClient.performRequest(
                  new es.org.elasticsearch.client.Request("GET", "/_cluster/health"));
          assertEquals(200, response.getStatusLine().getStatusCode());
        });

    EventPublisherJob jobData =
        new EventPublisherJob()
            .withEntities(Set.of("table", "user"))
            .withBatchSize(50)
            .withPayLoadSize(5 * BYTES_PER_MB)
            .withMaxConcurrentRequests(25)
            .withProducerThreads(1)
            .withAutoTune(true)
            .withRecreateIndex(false)
            .withStats(new Stats());
    App testApp = appWithConfiguration(jobData);

    assertDoesNotThrow(
        () -> searchIndexApp.init(testApp),
        "SearchIndexApp initialization with autoTune should succeed");

    EventPublisherJob tuned = searchIndexApp.getJobData();
    assertNotNull(tuned, "Job data should be available after initialization");

    LOG.info("Auto-tune results comparison:");
    LOG.info("  Batch Size: {} → {}", jobData.getBatchSize(), tuned.getBatchSize());
    LOG.info(
        "  Payload Size: {} MB → {} MB",
        jobData.getPayLoadSize() / BYTES_PER_MB,
        tuned.getPayLoadSize() / BYTES_PER_MB);
    LOG.info(
        "  Concurrent Requests: {} → {}",
        jobData.getMaxConcurrentRequests(),
        tuned.getMaxConcurrentRequests());
    LOG.info(
        "  Producer Threads: {} → {}", jobData.getProducerThreads(), tuned.getProducerThreads());

    assertTrue(
        tuned.getBatchSize() >= jobData.getBatchSize(),
        "Auto-tune should maintain or increase batch size with compression");
    assertTrue(
        tuned.getPayLoadSize() >= jobData.getPayLoadSize(),
        "Auto-tune should maintain or increase payload size with compression");
  }

  /**
   * Fetches {@link SearchClusterMetrics} and asserts that node count, payload limit and the
   * recommended batch size / concurrency are positive, and that the payload limit is at least 50
   * MB.
   */
  @Test
  void testSearchClusterMetricsWithRealCluster() {
    LOG.info("=== Testing SearchClusterMetrics with Real Cluster ===");
    // NOTE(review): the second argument is presumably the total entity count - confirm.
    SearchClusterMetrics metrics =
        SearchClusterMetrics.fetchClusterMetrics(searchRepository, 1000L);
    assertNotNull(metrics, "Cluster metrics should be fetched successfully");
    metrics.logRecommendations();

    assertTrue(metrics.getTotalNodes() > 0, "Should detect at least 1 node");
    assertTrue(metrics.getMaxPayloadSizeBytes() > 0, "Should have a positive max payload size");
    assertTrue(metrics.getRecommendedBatchSize() > 0, "Should recommend a positive batch size");
    assertTrue(
        metrics.getRecommendedConcurrentRequests() > 0,
        "Should recommend positive concurrent requests");

    long minExpectedPayload = 50 * BYTES_PER_MB;
    assertTrue(
        metrics.getMaxPayloadSizeBytes() >= minExpectedPayload,
        "Payload size should benefit from compression (actual: "
            + metrics.getMaxPayloadSizeBytes() / BYTES_PER_MB
            + " MB)");
  }

  /**
   * Verifies that cluster settings are readable, then initializes the app with a small, auto-tuned
   * configuration and asserts that the resulting payload size is at least the requested one.
   */
  @Test
  void testCompressionBenefitsWithRealPayloads() {
    assertDoesNotThrow(
        () -> {
          Map<String, Object> clusterSettings = getClusterSettings();
          assertNotNull(clusterSettings, "Should be able to fetch cluster settings");
        },
        "Should be able to access cluster settings for maxContentLength analysis");

    EventPublisherJob smallJobData =
        new EventPublisherJob()
            .withEntities(Set.of("table"))
            .withBatchSize(10)
            .withPayLoadSize(BYTES_PER_MB)
            .withAutoTune(true)
            .withStats(new Stats());
    App testApp = appWithConfiguration(smallJobData);

    searchIndexApp.init(testApp);
    EventPublisherJob tunedJobData = searchIndexApp.getJobData();

    // Integer division on purpose: the ratio is reported as a whole multiple.
    long compressionBenefit = tunedJobData.getPayLoadSize() / smallJobData.getPayLoadSize();
    LOG.info("Compression benefit ratio: {}x larger payload enabled", compressionBenefit);
    assertTrue(compressionBenefit >= 1, "Auto-tuning should maintain or improve payload capacity");
  }

  /**
   * Initializes the app with {@code autoTune} disabled and asserts that job data is available.
   *
   * <p>NOTE(review): despite its name, this test does not inspect any request headers; it only
   * checks that initialization succeeds.
   */
  @Test
  void testCompressionHeadersInRealRequests() {
    EventPublisherJob jobData =
        new EventPublisherJob()
            .withEntities(Set.of("table"))
            .withBatchSize(5)
            .withPayLoadSize(BYTES_PER_MB)
            .withMaxConcurrentRequests(10)
            .withProducerThreads(1)
            .withAutoTune(false) // Disable auto-tune for this test
            .withRecreateIndex(false)
            .withStats(new Stats());
    App testApp = appWithConfiguration(jobData);

    assertDoesNotThrow(() -> searchIndexApp.init(testApp), "App initialization should succeed");
    assertNotNull(searchIndexApp.getJobData(), "Job data should be initialized");
  }

  /** Wraps the given job configuration in an {@link App} named {@link #APP_NAME}. */
  private static App appWithConfiguration(EventPublisherJob configuration) {
    return new App()
        .withName(APP_NAME)
        .withAppConfiguration(JsonUtils.convertValue(configuration, Object.class));
  }

  /**
   * Reads {@code /_cluster/settings} from the search cluster and parses the JSON body into a map.
   *
   * @return the parsed response body
   * @throws Exception if the request fails or the body cannot be read or parsed
   */
  // The response body is a JSON object, so String keys are expected; Map.class is a raw type and
  // the conversion to Map<String, Object> cannot be checked by the compiler.
  @SuppressWarnings("unchecked")
  private Map<String, Object> getClusterSettings() throws Exception {
    RestClient client = getSearchClient();
    es.org.elasticsearch.client.Response response =
        client.performRequest(new es.org.elasticsearch.client.Request("GET", "/_cluster/settings"));
    // Decode as UTF-8 when the response declares no charset instead of relying on the
    // library's fallback charset.
    String responseBody =
        org.apache.http.util.EntityUtils.toString(
            response.getEntity(), java.nio.charset.StandardCharsets.UTF_8);
    return JsonUtils.readValue(responseBody, Map.class);
  }
}

View File

@ -89,6 +89,12 @@
"searchIndexMappingLanguage": {
"description": "Recreate Indexes with updated Language",
"$ref": "../../../../configuration/elasticSearchConfiguration.json#/definitions/searchIndexMappingLanguage"
},
"autoTune": {
"title": "Auto Tune",
"description": "Enable automatic performance tuning based on cluster capabilities and database entity count",
"type": "boolean",
"default": false
}
},
"additionalProperties": false

View File

@ -179,6 +179,11 @@
"afterCursor": {
"description": "Provide After in case of failure to start reindexing after the issue is solved",
"type": "string"
},
"autoTune": {
"description": "Enable automatic performance tuning based on cluster capabilities and database entity count",
"type": "boolean",
"default": false
}
},
"additionalProperties": false

View File

@ -133,6 +133,8 @@ test('Search Index Application', async ({ page }) => {
'Search Indexing Application'
);
await expect(page.locator('form')).toContainText('Auto Tune');
await page.fill('#root\\/batchSize', '100');
await page.getByTestId('tree-select-widget').click();

View File

@ -80,4 +80,11 @@ $$section
Recreate Indexes with updated Language
$$
$$section
### Auto Tune $(id="autoTune")
Enable automatic performance tuning based on cluster capabilities and database entity count
$$

View File

@ -14,6 +14,11 @@
* Search Indexing App.
*/
export interface SearchIndexingAppConfig {
/**
* Enable automatic performance tuning based on cluster capabilities and database entity
* count
*/
autoTune?: boolean;
/**
* Maximum number of events sent in a batch (Default 100).
*/

View File

@ -18,6 +18,11 @@ export interface EventPublisherJob {
* Provide After in case of failure to start reindexing after the issue is solved
*/
afterCursor?: string;
/**
* Enable automatic performance tuning based on cluster capabilities and database entity
* count
*/
autoTune?: boolean;
/**
* Maximum number of events sent in a batch (Default 10).
*/

View File

@ -136,6 +136,12 @@
"type": "string",
"enum": ["EN", "JP", "ZH"],
"default": "EN"
},
"autoTune": {
"title": "Auto Tune",
"description": "Enable automatic performance tuning based on cluster capabilities and database entity count",
"type": "boolean",
"default": false
}
},
"additionalProperties": false