diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index c3248e71983..58e940f85f3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -657,6 +657,37 @@ public class SearchIndexApp extends AbstractNativeApplication { finalConfigDetails.put(PAYLOAD_SIZE, (jobData.getPayLoadSize() / (1024 * 1024)) + " MB"); finalConfigDetails.put( CONCURRENT_REQUESTS, String.valueOf(jobData.getMaxConcurrentRequests())); + + if (Boolean.TRUE.equals(jobData.getAutoTune())) { + finalConfigDetails.put("Cluster Nodes", String.valueOf(clusterMetrics.getTotalNodes())); + finalConfigDetails.put("Cluster Shards", String.valueOf(clusterMetrics.getTotalShards())); + finalConfigDetails.put( + "CPU Usage", String.format("%.1f%%", clusterMetrics.getCpuUsagePercent())); + finalConfigDetails.put( + "Memory Usage", String.format("%.1f%%", clusterMetrics.getMemoryUsagePercent())); + finalConfigDetails.put( + "Max Payload", (clusterMetrics.getMaxPayloadSizeBytes() / (1024 * 1024)) + " MB"); + finalConfigDetails.put( + "Max Content Length", (clusterMetrics.getMaxContentLength() / (1024 * 1024)) + " MB"); + + boolean isUsingDefaults = clusterMetrics.getTotalShards() == 0; + String configSource = isUsingDefaults ? " (using defaults)" : " (from cluster)"; + + String clusterStatus = + String.format( + "Cluster: %d nodes, %.0f%% CPU, %.0f%% memory, %dMB limit%s", + clusterMetrics.getTotalNodes(), + clusterMetrics.getCpuUsagePercent(), + clusterMetrics.getMemoryUsagePercent(), + (int) (clusterMetrics.getMaxContentLength() / (1024 * 1024)), + configSource); + finalConfigDetails.put("Cluster Status", clusterStatus); + finalConfigDetails.put( + "Config Source", isUsingDefaults ? "Conservative Defaults" : "Cluster Metrics"); + } + } else if (Boolean.TRUE.equals(jobData.getAutoTune())) { + finalConfigDetails.put( + "Cluster Status", "Unable to fetch cluster metrics (using conservative defaults)"); } return finalConfigDetails; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClusterMetrics.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClusterMetrics.java index 1e18ac139e2..999f1c191a6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClusterMetrics.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClusterMetrics.java @@ -20,6 +20,7 @@ public class SearchClusterMetrics { private final double cpuUsagePercent; private final double memoryUsagePercent; private final long maxPayloadSizeBytes; + private final long maxContentLength; private final int recommendedConcurrentRequests; private final int recommendedBatchSize; private final int recommendedProducerThreads; @@ -61,7 +62,6 @@ public class SearchClusterMetrics { Map nodesStats = osClient.nodesStats(); Map clusterSettings = osClient.clusterSettings(); - // Debug logging for API responses LOG.debug("ClusterStats response: {}", clusterStats); LOG.debug("NodesStats response: {}", nodesStats); @@ -175,10 +175,8 @@ public class SearchClusterMetrics { long totalEntities, int maxDbConnections) { - int maxProducerThreads = (maxDbConnections * 3) / 4; // 75% of connection pool - int recommendedConcurrentRequests = maxProducerThreads; - int recommendedProducerThreads = - Math.min(maxProducerThreads, 10 * totalNodes); // Reduced from 30 to 10 per node + int maxProducerThreads = (maxDbConnections * 3) / 4; + int recommendedProducerThreads = Math.min(maxProducerThreads, 10 * totalNodes); if (memoryUsagePercent > 80) { recommendedProducerThreads = Math.max(10, recommendedProducerThreads / 4); @@ -192,40 +190,28 @@ public class SearchClusterMetrics { int recommendedConsumerThreads = Math.min(30, Math.max(10, availableCores * 2)); // 2x cores, bounded - // Adjust based on cluster size - more nodes can handle more concurrent requests if (totalNodes > 3) { recommendedConsumerThreads = Math.min(40, recommendedConsumerThreads + (totalNodes * 2)); } - // Adjust based on memory pressure if (memoryUsagePercent > 80) { recommendedConsumerThreads = Math.max(10, recommendedConsumerThreads / 2); } else if (memoryUsagePercent < 40 && totalEntities > 100000) { - // Only increase for large datasets with available memory recommendedConsumerThreads = Math.min(50, (int) (recommendedConsumerThreads * 1.5)); } - // Concurrent requests should match what the cluster can handle - // Start conservative and let the bulk processor's backoff handle overload int requestsPerNode = 50; // Base requests per node - - // Adjust based on cluster load if (cpuUsagePercent > 70 || memoryUsagePercent > 70) { - requestsPerNode = 25; // Reduce if cluster is busy + requestsPerNode = 25; } else if (cpuUsagePercent < 30 && memoryUsagePercent < 50) { - requestsPerNode = 75; // Increase if cluster is idle + requestsPerNode = 75; } int baseConcurrentRequests = Math.min(200, totalNodes * requestsPerNode); - - // Further adjustments for extreme memory conditions if (memoryUsagePercent > 80) { baseConcurrentRequests = Math.max(10, baseConcurrentRequests / 2); } - // Ensure we don't exceed practical limits - baseConcurrentRequests = Math.min(baseConcurrentRequests, 300); - long heapBasedPayloadSize = Math.min(500 * 1024 * 1024L, heapMaxBytes / 20); // Max 500MB or 5% of heap @@ -240,23 +226,17 @@ public class SearchClusterMetrics { maxPayloadSize / (1024 * 1024), heapBasedPayloadSize / (1024 * 1024), maxContentLength / (1024 * 1024)); - - // Dynamic batch size calculation based on actual entity characteristics - // Start conservative and let adaptive tuning increase if appropriate - int avgEntitySizeKB = 10; // Conservative estimate, will be refined during execution - - // Calculate batch size considering compression benefits - // Most clusters have compression enabled by default + int avgEntitySizeKB = maxPayloadSize <= 10 * 1024 * 1024 ? 20 : 10; // More conservative for AWS int recommendedBatchSize = (int) Math.min(1000, maxPayloadSize / (avgEntitySizeKB * 1024L)); - recommendedBatchSize = - Math.max(100, recommendedBatchSize); // Higher minimum since entities are smaller - // Scale batch size based on dataset size and cluster capacity - // Larger datasets need more conservative settings to maintain stability + if (maxPayloadSize <= 10 * 1024 * 1024) { + recommendedBatchSize = Math.min(300, recommendedBatchSize); // Cap at 300 for AWS + } + recommendedBatchSize = Math.max(50, recommendedBatchSize); // Lower minimum for safety + if (totalEntities > 1000000) { recommendedBatchSize = Math.min(500, recommendedBatchSize); recommendedProducerThreads = Math.min(20, recommendedProducerThreads); - // Reduce concurrent requests for very large datasets baseConcurrentRequests = Math.min(150, baseConcurrentRequests); } else if (totalEntities > 500000) { recommendedBatchSize = Math.min(600, recommendedBatchSize); @@ -266,21 +246,13 @@ public class SearchClusterMetrics { recommendedProducerThreads = Math.min(30, recommendedProducerThreads); } - // For small datasets, we can be more aggressive if (totalEntities < 50000 && memoryUsagePercent < 60) { recommendedBatchSize = Math.min(1000, recommendedBatchSize * 2); } - // Conservative queue sizing to balance memory usage and throughput - // Queue should hold enough batches to keep consumers busy without excessive memory use - int queueBatches = - Math.min(recommendedProducerThreads * 2, 20); // 2 batches per producer, max 20 - int recommendedQueueSize = - Math.min( - 10000, // Absolute max to prevent memory issues - recommendedBatchSize * queueBatches); + int queueBatches = Math.min(recommendedProducerThreads * 2, 20); + int recommendedQueueSize = Math.min(10000, recommendedBatchSize * queueBatches); - // Ensure minimum queue size for small batches recommendedQueueSize = Math.max(1000, recommendedQueueSize); return SearchClusterMetrics.builder() @@ -292,6 +264,7 @@ public class SearchClusterMetrics { .cpuUsagePercent(cpuUsagePercent) .memoryUsagePercent(memoryUsagePercent) .maxPayloadSizeBytes(maxPayloadSize) + .maxContentLength(maxContentLength) .recommendedConcurrentRequests(baseConcurrentRequests) .recommendedBatchSize(recommendedBatchSize) .recommendedProducerThreads(recommendedProducerThreads) @@ -331,7 +304,6 @@ public class SearchClusterMetrics { compressionEnabled = (Boolean) defaultSettings.get("http.compression"); } - // Default is false in Elasticsearch/OpenSearch return compressionEnabled != null ? compressionEnabled : false; } catch (Exception e) { LOG.debug("Failed to check compression setting, assuming disabled: {}", e.getMessage()); @@ -342,7 +314,9 @@ public class SearchClusterMetrics { @SuppressWarnings("unchecked") public static long extractMaxContentLength(Map clusterSettings) { try { - long defaultMaxContentLength = 100 * 1024 * 1024L; // 100MB + // Use a conservative 10MB default for AWS-managed OpenSearch/ElasticSearch + // AWS OpenSearch has a hard limit of 10MB that may not be exposed in cluster settings + long defaultMaxContentLength = 10 * 1024 * 1024L; // Conservative 10MB default Map persistentSettings = (Map) clusterSettings.get("persistent"); @@ -370,18 +344,18 @@ public class SearchClusterMetrics { } LOG.info( - "No max_content_length setting found in cluster, using default: {} bytes", + "No max_content_length setting found in cluster, using conservative default: {} bytes", defaultMaxContentLength); return defaultMaxContentLength; } catch (Exception e) { LOG.warn("Failed to extract maxContentLength from cluster settings: {}", e.getMessage()); - return 100 * 1024 * 1024L; // Default 100MB + return 10 * 1024 * 1024L; // Conservative 10MB default for safety } } private static long parseByteSize(String sizeStr) { if (sizeStr == null || sizeStr.trim().isEmpty()) { - return 100 * 1024 * 1024L; // Default 100MB + return 10 * 1024 * 1024L; // Conservative 10MB default for safety } sizeStr = sizeStr.trim().toLowerCase(); @@ -429,19 +403,10 @@ public class SearchClusterMetrics { return 50.0; } - private static long extractLongValue(Map map, String key, long defaultValue) { - Object value = map.get(key); - if (value instanceof Number) { - return ((Number) value).longValue(); - } - LOG.debug("Unable to extract long value for key '{}', using default: {}", key, defaultValue); - return defaultValue; - } - private static int extractIntValue(Map map, String key, int defaultValue) { Object value = map.get(key); - if (value instanceof Number) { - return ((Number) value).intValue(); + if (value instanceof Number number) { + return number.intValue(); } LOG.debug("Unable to extract int value for key '{}', using default: {}", key, defaultValue); return defaultValue; @@ -451,25 +416,22 @@ public class SearchClusterMetrics { SearchRepository searchRepository, long totalEntities, int maxDbConnections) { int conservativeBatchSize; if (totalEntities > 1000000) { - conservativeBatchSize = 500; - } else if (totalEntities > 500000) { - conservativeBatchSize = 400; - } else if (totalEntities > 250000) { - conservativeBatchSize = 300; - } else if (totalEntities > 100000) { conservativeBatchSize = 200; - } else if (totalEntities > 50000) { + } else if (totalEntities > 500000) { conservativeBatchSize = 150; - } else { + } else if (totalEntities > 250000) { + conservativeBatchSize = 125; + } else if (totalEntities > 100000) { conservativeBatchSize = 100; + } else if (totalEntities > 50000) { + conservativeBatchSize = 75; + } else { + conservativeBatchSize = 50; } - // Conservative DB connection usage - use 75% of configured max size int conservativeThreads = (maxDbConnections * 3) / 4; - int conservativeConcurrentRequests = totalEntities > 100000 ? 50 : 25; - - int conservativeConsumerThreads = 20; // Default 20 consumers with virtual threads + int conservativeConsumerThreads = 20; int conservativeQueueSize = conservativeBatchSize * conservativeConcurrentRequests * 2; long maxHeap = Runtime.getRuntime().maxMemory(); @@ -478,8 +440,8 @@ public class SearchClusterMetrics { long usedHeap = totalHeap - freeHeap; double heapUsagePercent = (maxHeap > 0) ? (double) usedHeap / maxHeap * 100 : 50.0; - // Default to 100MB if we can't fetch from cluster - long maxPayloadSize = 100 * 1024 * 1024L; // Default 100MB + // Default to conservative 10MB for AWS-managed clusters if we can't fetch from cluster + long maxPayloadSize = 10 * 1024 * 1024L; // Conservative 10MB default try { if (searchRepository != null) { SearchClient searchClient = searchRepository.getSearchClient(); @@ -517,6 +479,7 @@ public class SearchClusterMetrics { .cpuUsagePercent(50.0) .memoryUsagePercent(heapUsagePercent) .maxPayloadSizeBytes(maxPayloadSize) + .maxContentLength(maxPayloadSize * 10 / 9) .recommendedConcurrentRequests(conservativeConcurrentRequests) .recommendedBatchSize(conservativeBatchSize) .recommendedProducerThreads(conservativeThreads) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java index 3e77d4e4055..772a7b9ce0d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java @@ -890,8 +890,12 @@ public class OpenMetadataOperations implements Callable { TypeRegistry.instance().initialize(typeRepository); AppScheduler.initialize(config, collectionDAO, searchRepository); String appName = "SearchIndexingApplication"; - Set entities = - new HashSet<>(Arrays.asList(entityStr.substring(1, entityStr.length() - 1).split(","))); + // Handle entityStr with or without quotes + String cleanEntityStr = entityStr; + if (entityStr.startsWith("'") && entityStr.endsWith("'")) { + cleanEntityStr = entityStr.substring(1, entityStr.length() - 1); + } + Set entities = new HashSet<>(Arrays.asList(cleanEntityStr.split(","))); return executeSearchReindexApp( appName, entities,