Add more details to search reindex into slack, default the max content to 10mb (#22741)

* Add more details to search reindex into slack, default the max content to 10mb

* Fix for --entities param to work with or with-out quotes

(cherry picked from commit ac3cedff08c905ca0d987622719b7dbacd408d34)
This commit is contained in:
Sriharsha Chintalapani 2025-08-05 12:25:27 -07:00 committed by OpenMetadata Release Bot
parent 923bc68f8f
commit 81db639f27
3 changed files with 71 additions and 73 deletions

View File

@ -657,6 +657,37 @@ public class SearchIndexApp extends AbstractNativeApplication {
finalConfigDetails.put(PAYLOAD_SIZE, (jobData.getPayLoadSize() / (1024 * 1024)) + " MB");
finalConfigDetails.put(
CONCURRENT_REQUESTS, String.valueOf(jobData.getMaxConcurrentRequests()));
if (Boolean.TRUE.equals(jobData.getAutoTune())) {
finalConfigDetails.put("Cluster Nodes", String.valueOf(clusterMetrics.getTotalNodes()));
finalConfigDetails.put("Cluster Shards", String.valueOf(clusterMetrics.getTotalShards()));
finalConfigDetails.put(
"CPU Usage", String.format("%.1f%%", clusterMetrics.getCpuUsagePercent()));
finalConfigDetails.put(
"Memory Usage", String.format("%.1f%%", clusterMetrics.getMemoryUsagePercent()));
finalConfigDetails.put(
"Max Payload", (clusterMetrics.getMaxPayloadSizeBytes() / (1024 * 1024)) + " MB");
finalConfigDetails.put(
"Max Content Length", (clusterMetrics.getMaxContentLength() / (1024 * 1024)) + " MB");
boolean isUsingDefaults = clusterMetrics.getTotalShards() == 0;
String configSource = isUsingDefaults ? " (using defaults)" : " (from cluster)";
String clusterStatus =
String.format(
"Cluster: %d nodes, %.0f%% CPU, %.0f%% memory, %dMB limit%s",
clusterMetrics.getTotalNodes(),
clusterMetrics.getCpuUsagePercent(),
clusterMetrics.getMemoryUsagePercent(),
(int) (clusterMetrics.getMaxContentLength() / (1024 * 1024)),
configSource);
finalConfigDetails.put("Cluster Status", clusterStatus);
finalConfigDetails.put(
"Config Source", isUsingDefaults ? "Conservative Defaults" : "Cluster Metrics");
}
} else if (Boolean.TRUE.equals(jobData.getAutoTune())) {
finalConfigDetails.put(
"Cluster Status", "Unable to fetch cluster metrics (using conservative defaults)");
}
return finalConfigDetails;

View File

@ -20,6 +20,7 @@ public class SearchClusterMetrics {
private final double cpuUsagePercent;
private final double memoryUsagePercent;
private final long maxPayloadSizeBytes;
private final long maxContentLength;
private final int recommendedConcurrentRequests;
private final int recommendedBatchSize;
private final int recommendedProducerThreads;
@ -61,7 +62,6 @@ public class SearchClusterMetrics {
Map<String, Object> nodesStats = osClient.nodesStats();
Map<String, Object> clusterSettings = osClient.clusterSettings();
// Debug logging for API responses
LOG.debug("ClusterStats response: {}", clusterStats);
LOG.debug("NodesStats response: {}", nodesStats);
@ -175,10 +175,8 @@ public class SearchClusterMetrics {
long totalEntities,
int maxDbConnections) {
int maxProducerThreads = (maxDbConnections * 3) / 4; // 75% of connection pool
int recommendedConcurrentRequests = maxProducerThreads;
int recommendedProducerThreads =
Math.min(maxProducerThreads, 10 * totalNodes); // Reduced from 30 to 10 per node
int maxProducerThreads = (maxDbConnections * 3) / 4;
int recommendedProducerThreads = Math.min(maxProducerThreads, 10 * totalNodes);
if (memoryUsagePercent > 80) {
recommendedProducerThreads = Math.max(10, recommendedProducerThreads / 4);
@ -192,40 +190,28 @@ public class SearchClusterMetrics {
int recommendedConsumerThreads =
Math.min(30, Math.max(10, availableCores * 2)); // 2x cores, bounded
// Adjust based on cluster size - more nodes can handle more concurrent requests
if (totalNodes > 3) {
recommendedConsumerThreads = Math.min(40, recommendedConsumerThreads + (totalNodes * 2));
}
// Adjust based on memory pressure
if (memoryUsagePercent > 80) {
recommendedConsumerThreads = Math.max(10, recommendedConsumerThreads / 2);
} else if (memoryUsagePercent < 40 && totalEntities > 100000) {
// Only increase for large datasets with available memory
recommendedConsumerThreads = Math.min(50, (int) (recommendedConsumerThreads * 1.5));
}
// Concurrent requests should match what the cluster can handle
// Start conservative and let the bulk processor's backoff handle overload
int requestsPerNode = 50; // Base requests per node
// Adjust based on cluster load
if (cpuUsagePercent > 70 || memoryUsagePercent > 70) {
requestsPerNode = 25; // Reduce if cluster is busy
requestsPerNode = 25;
} else if (cpuUsagePercent < 30 && memoryUsagePercent < 50) {
requestsPerNode = 75; // Increase if cluster is idle
requestsPerNode = 75;
}
int baseConcurrentRequests = Math.min(200, totalNodes * requestsPerNode);
// Further adjustments for extreme memory conditions
if (memoryUsagePercent > 80) {
baseConcurrentRequests = Math.max(10, baseConcurrentRequests / 2);
}
// Ensure we don't exceed practical limits
baseConcurrentRequests = Math.min(baseConcurrentRequests, 300);
long heapBasedPayloadSize =
Math.min(500 * 1024 * 1024L, heapMaxBytes / 20); // Max 500MB or 5% of heap
@ -240,23 +226,17 @@ public class SearchClusterMetrics {
maxPayloadSize / (1024 * 1024),
heapBasedPayloadSize / (1024 * 1024),
maxContentLength / (1024 * 1024));
// Dynamic batch size calculation based on actual entity characteristics
// Start conservative and let adaptive tuning increase if appropriate
int avgEntitySizeKB = 10; // Conservative estimate, will be refined during execution
// Calculate batch size considering compression benefits
// Most clusters have compression enabled by default
int avgEntitySizeKB = maxPayloadSize <= 10 * 1024 * 1024 ? 20 : 10; // More conservative for AWS
int recommendedBatchSize = (int) Math.min(1000, maxPayloadSize / (avgEntitySizeKB * 1024L));
recommendedBatchSize =
Math.max(100, recommendedBatchSize); // Higher minimum since entities are smaller
// Scale batch size based on dataset size and cluster capacity
// Larger datasets need more conservative settings to maintain stability
if (maxPayloadSize <= 10 * 1024 * 1024) {
recommendedBatchSize = Math.min(300, recommendedBatchSize); // Cap at 300 for AWS
}
recommendedBatchSize = Math.max(50, recommendedBatchSize); // Lower minimum for safety
if (totalEntities > 1000000) {
recommendedBatchSize = Math.min(500, recommendedBatchSize);
recommendedProducerThreads = Math.min(20, recommendedProducerThreads);
// Reduce concurrent requests for very large datasets
baseConcurrentRequests = Math.min(150, baseConcurrentRequests);
} else if (totalEntities > 500000) {
recommendedBatchSize = Math.min(600, recommendedBatchSize);
@ -266,21 +246,13 @@ public class SearchClusterMetrics {
recommendedProducerThreads = Math.min(30, recommendedProducerThreads);
}
// For small datasets, we can be more aggressive
if (totalEntities < 50000 && memoryUsagePercent < 60) {
recommendedBatchSize = Math.min(1000, recommendedBatchSize * 2);
}
// Conservative queue sizing to balance memory usage and throughput
// Queue should hold enough batches to keep consumers busy without excessive memory use
int queueBatches =
Math.min(recommendedProducerThreads * 2, 20); // 2 batches per producer, max 20
int recommendedQueueSize =
Math.min(
10000, // Absolute max to prevent memory issues
recommendedBatchSize * queueBatches);
int queueBatches = Math.min(recommendedProducerThreads * 2, 20);
int recommendedQueueSize = Math.min(10000, recommendedBatchSize * queueBatches);
// Ensure minimum queue size for small batches
recommendedQueueSize = Math.max(1000, recommendedQueueSize);
return SearchClusterMetrics.builder()
@ -292,6 +264,7 @@ public class SearchClusterMetrics {
.cpuUsagePercent(cpuUsagePercent)
.memoryUsagePercent(memoryUsagePercent)
.maxPayloadSizeBytes(maxPayloadSize)
.maxContentLength(maxContentLength)
.recommendedConcurrentRequests(baseConcurrentRequests)
.recommendedBatchSize(recommendedBatchSize)
.recommendedProducerThreads(recommendedProducerThreads)
@ -331,7 +304,6 @@ public class SearchClusterMetrics {
compressionEnabled = (Boolean) defaultSettings.get("http.compression");
}
// Default is false in Elasticsearch/OpenSearch
return compressionEnabled != null ? compressionEnabled : false;
} catch (Exception e) {
LOG.debug("Failed to check compression setting, assuming disabled: {}", e.getMessage());
@ -342,7 +314,9 @@ public class SearchClusterMetrics {
@SuppressWarnings("unchecked")
public static long extractMaxContentLength(Map<String, Object> clusterSettings) {
try {
long defaultMaxContentLength = 100 * 1024 * 1024L; // 100MB
// Use a conservative 10MB default for AWS-managed OpenSearch/ElasticSearch
// AWS OpenSearch has a hard limit of 10MB that may not be exposed in cluster settings
long defaultMaxContentLength = 10 * 1024 * 1024L; // Conservative 10MB default
Map<String, Object> persistentSettings =
(Map<String, Object>) clusterSettings.get("persistent");
@ -370,18 +344,18 @@ public class SearchClusterMetrics {
}
LOG.info(
"No max_content_length setting found in cluster, using default: {} bytes",
"No max_content_length setting found in cluster, using conservative default: {} bytes",
defaultMaxContentLength);
return defaultMaxContentLength;
} catch (Exception e) {
LOG.warn("Failed to extract maxContentLength from cluster settings: {}", e.getMessage());
return 100 * 1024 * 1024L; // Default 100MB
return 10 * 1024 * 1024L; // Conservative 10MB default for safety
}
}
private static long parseByteSize(String sizeStr) {
if (sizeStr == null || sizeStr.trim().isEmpty()) {
return 100 * 1024 * 1024L; // Default 100MB
return 10 * 1024 * 1024L; // Conservative 10MB default for safety
}
sizeStr = sizeStr.trim().toLowerCase();
@ -429,19 +403,10 @@ public class SearchClusterMetrics {
return 50.0;
}
private static long extractLongValue(Map<String, Object> map, String key, long defaultValue) {
Object value = map.get(key);
if (value instanceof Number) {
return ((Number) value).longValue();
}
LOG.debug("Unable to extract long value for key '{}', using default: {}", key, defaultValue);
return defaultValue;
}
private static int extractIntValue(Map<String, Object> map, String key, int defaultValue) {
Object value = map.get(key);
if (value instanceof Number) {
return ((Number) value).intValue();
if (value instanceof Number number) {
return number.intValue();
}
LOG.debug("Unable to extract int value for key '{}', using default: {}", key, defaultValue);
return defaultValue;
@ -451,25 +416,22 @@ public class SearchClusterMetrics {
SearchRepository searchRepository, long totalEntities, int maxDbConnections) {
int conservativeBatchSize;
if (totalEntities > 1000000) {
conservativeBatchSize = 500;
} else if (totalEntities > 500000) {
conservativeBatchSize = 400;
} else if (totalEntities > 250000) {
conservativeBatchSize = 300;
} else if (totalEntities > 100000) {
conservativeBatchSize = 200;
} else if (totalEntities > 50000) {
} else if (totalEntities > 500000) {
conservativeBatchSize = 150;
} else {
} else if (totalEntities > 250000) {
conservativeBatchSize = 125;
} else if (totalEntities > 100000) {
conservativeBatchSize = 100;
} else if (totalEntities > 50000) {
conservativeBatchSize = 75;
} else {
conservativeBatchSize = 50;
}
// Conservative DB connection usage - use 75% of configured max size
int conservativeThreads = (maxDbConnections * 3) / 4;
int conservativeConcurrentRequests = totalEntities > 100000 ? 50 : 25;
int conservativeConsumerThreads = 20; // Default 20 consumers with virtual threads
int conservativeConsumerThreads = 20;
int conservativeQueueSize = conservativeBatchSize * conservativeConcurrentRequests * 2;
long maxHeap = Runtime.getRuntime().maxMemory();
@ -478,8 +440,8 @@ public class SearchClusterMetrics {
long usedHeap = totalHeap - freeHeap;
double heapUsagePercent = (maxHeap > 0) ? (double) usedHeap / maxHeap * 100 : 50.0;
// Default to 100MB if we can't fetch from cluster
long maxPayloadSize = 100 * 1024 * 1024L; // Default 100MB
// Default to conservative 10MB for AWS-managed clusters if we can't fetch from cluster
long maxPayloadSize = 10 * 1024 * 1024L; // Conservative 10MB default
try {
if (searchRepository != null) {
SearchClient searchClient = searchRepository.getSearchClient();
@ -517,6 +479,7 @@ public class SearchClusterMetrics {
.cpuUsagePercent(50.0)
.memoryUsagePercent(heapUsagePercent)
.maxPayloadSizeBytes(maxPayloadSize)
.maxContentLength(maxPayloadSize * 10 / 9)
.recommendedConcurrentRequests(conservativeConcurrentRequests)
.recommendedBatchSize(conservativeBatchSize)
.recommendedProducerThreads(conservativeThreads)

View File

@ -890,8 +890,12 @@ public class OpenMetadataOperations implements Callable<Integer> {
TypeRegistry.instance().initialize(typeRepository);
AppScheduler.initialize(config, collectionDAO, searchRepository);
String appName = "SearchIndexingApplication";
Set<String> entities =
new HashSet<>(Arrays.asList(entityStr.substring(1, entityStr.length() - 1).split(",")));
// Handle entityStr with or without quotes
String cleanEntityStr = entityStr;
if (entityStr.startsWith("'") && entityStr.endsWith("'")) {
cleanEntityStr = entityStr.substring(1, entityStr.length() - 1);
}
Set<String> entities = new HashSet<>(Arrays.asList(cleanEntityStr.split(",")));
return executeSearchReindexApp(
appName,
entities,