diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java
index aa3ad92e777..730a8092070 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java
@@ -16,7 +16,6 @@ import es.org.elasticsearch.common.unit.ByteSizeValue;
 import es.org.elasticsearch.core.TimeValue;
 import es.org.elasticsearch.xcontent.XContentType;
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -28,7 +27,6 @@ import org.openmetadata.schema.system.IndexingError;
 import org.openmetadata.schema.system.StepStats;
 import org.openmetadata.service.Entity;
 import org.openmetadata.service.exception.SearchIndexException;
-import org.openmetadata.service.search.SearchClusterMetrics;
 import org.openmetadata.service.search.SearchRepository;
 import org.openmetadata.service.search.elasticsearch.ElasticSearchClient;
 import org.openmetadata.service.search.models.IndexMapping;
@@ -42,9 +40,8 @@ public class ElasticSearchBulkSink implements BulkSink {
 
   private final ElasticSearchClient searchClient;
   private final SearchRepository searchRepository;
-  private volatile BulkProcessor bulkProcessor;
+  private final BulkProcessor bulkProcessor;
   private final StepStats stats = new StepStats();
-  private volatile long currentPayloadSize = 0;
 
   // Track metrics
   private final AtomicLong totalSubmitted = new AtomicLong(0);
@@ -54,10 +51,6 @@ public class ElasticSearchBulkSink implements BulkSink {
   // Configuration
   private volatile int batchSize;
   private volatile int maxConcurrentRequests;
-  private final long maxPayloadSizeBytes;
-  private volatile BulkProcessor currentBulkProcessor;
-  private final long clusterMaxRequestSize;
-  private final boolean compressionEnabled;
 
   public ElasticSearchBulkSink(
       SearchRepository searchRepository,
@@ -69,27 +62,16 @@ public class ElasticSearchBulkSink implements BulkSink {
     this.searchClient = (ElasticSearchClient) searchRepository.getSearchClient();
     this.batchSize = batchSize;
     this.maxConcurrentRequests = maxConcurrentRequests;
-    this.maxPayloadSizeBytes = maxPayloadSizeBytes;
-
-    // Get the actual max request size and compression status from cluster settings
-    Map clusterSettings = getClusterSettings();
-    this.clusterMaxRequestSize = SearchClusterMetrics.extractMaxContentLength(clusterSettings);
-    this.compressionEnabled = SearchClusterMetrics.isCompressionEnabled(clusterSettings);
-
-    LOG.info(
-        "Elasticsearch cluster config - Max content length: {} MB, Compression enabled: {}",
-        clusterMaxRequestSize / (1024 * 1024),
-        compressionEnabled);
 
     // Initialize stats
     stats.withTotalRecords(0).withSuccessRecords(0).withFailedRecords(0);
 
     // Create bulk processor
-    this.bulkProcessor = createBulkProcessor(batchSize, maxConcurrentRequests);
-    this.currentBulkProcessor = this.bulkProcessor;
+    this.bulkProcessor = createBulkProcessor(batchSize, maxConcurrentRequests, maxPayloadSizeBytes);
   }
 
-  private BulkProcessor createBulkProcessor(int bulkActions, int concurrentRequests) {
+  private BulkProcessor createBulkProcessor(
+      int bulkActions, int concurrentRequests, long maxPayloadSizeBytes) {
     RestHighLevelClient client = (RestHighLevelClient) searchClient.getClient();
 
     // Create custom request options with larger buffer for big responses
@@ -98,16 +80,13 @@ public class ElasticSearchBulkSink implements BulkSink {
         new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(
             100 * 1024 * 1024)); // 100MB buffer
 
-    // Note: The Java client doesn't automatically compress requests.
-    // Compression must be enabled at the HTTP client level (Apache HttpClient)
-    // or by using a custom request interceptor
-
     RequestOptions requestOptions = optionsBuilder.build();
 
     LOG.info(
-        "Creating BulkProcessor with batch size {} and {} concurrent requests",
+        "Creating BulkProcessor with batch size {}, {} concurrent requests, max payload {} MB",
         bulkActions,
-        concurrentRequests);
+        concurrentRequests,
+        maxPayloadSizeBytes / (1024 * 1024));
 
     return BulkProcessor.builder(
             (request, bulkListener) -> client.bulkAsync(request, requestOptions, bulkListener),
@@ -116,9 +95,6 @@ public class ElasticSearchBulkSink implements BulkSink {
           public void beforeBulk(long executionId, BulkRequest request) {
            int numberOfActions = request.numberOfActions();
            totalSubmitted.addAndGet(numberOfActions);
-            // Reset payload size counter after flush
-            // Reset payload size when bulk operation starts
-            currentPayloadSize = 0;
            LOG.debug(
                "Executing bulk request {} with {} actions", executionId, numberOfActions);
          }
@@ -149,7 +125,7 @@ public class ElasticSearchBulkSink implements BulkSink {
              String failureMessage = response.buildFailureMessage();
              if (failureMessage.contains("rejected_execution_exception")) {
                LOG.warn(
-                    "Detected backpressure from Elasticsearch cluster (rejected_execution_exception). The BulkProcessor will handle retry with exponential backoff.");
+                    "Detected backpressure from Elasticsearch cluster. The BulkProcessor will handle retry with exponential backoff.");
              }
            } else {
              totalSuccess.addAndGet(numberOfActions);
@@ -176,23 +152,19 @@ public class ElasticSearchBulkSink implements BulkSink {
            if (failure.getMessage() != null
                && failure.getMessage().contains("rejected_execution_exception")) {
              LOG.warn(
-                  "Detected backpressure from Elasticsearch cluster (rejected_execution_exception). The BulkProcessor will handle retry with exponential backoff.");
+                  "Detected backpressure from Elasticsearch cluster. The BulkProcessor will handle retry with exponential backoff.");
            }
            updateStats();
          }
        })
        .setBulkActions(bulkActions)
-        // Ensure we never exceed Elasticsearch's configured limit
-        .setBulkSize(
-            new ByteSizeValue(
-                Math.min(maxPayloadSizeBytes, clusterMaxRequestSize), ByteSizeUnit.BYTES))
-        .setFlushInterval(TimeValue.timeValueSeconds(10)) // Increased from 5s
+        .setBulkSize(new ByteSizeValue(maxPayloadSizeBytes, ByteSizeUnit.BYTES))
+        .setFlushInterval(TimeValue.timeValueSeconds(10))
        .setConcurrentRequests(concurrentRequests)
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(
-                TimeValue.timeValueMillis(1000),
-                3)) // More aggressive backoff: 1s initial, 3 retries
+                TimeValue.timeValueMillis(1000), 3)) // 1s initial delay, 3 retries
        .build();
  }
 
@@ -224,10 +196,6 @@ public class ElasticSearchBulkSink implements BulkSink {
          addEntity(entity, indexName);
        }
      }
-
-      // The bulk processor handles batching and flushing automatically
-      // We don't need to explicitly flush here
-
    } catch (Exception e) {
      LOG.error("Failed to write {} entities of type {}", entities.size(), entityType, e);
 
@@ -254,8 +222,8 @@ public class ElasticSearchBulkSink implements BulkSink {
    updateRequest.doc(json, XContentType.JSON);
    updateRequest.docAsUpsert(true);
 
-    // Add to bulk processor with size checking
-    addRequestWithSizeCheck(updateRequest, json.getBytes().length);
+    // Add to bulk processor - it handles everything including size limits
+    bulkProcessor.add(updateRequest);
  }
 
  private void addTimeSeriesEntity(EntityTimeSeriesInterface entity, String indexName)
@@ -266,83 +234,10 @@ public class ElasticSearchBulkSink implements BulkSink {
    IndexRequest indexRequest =
        new IndexRequest(indexName).id(docId).source(json, XContentType.JSON);
 
-    // Add to bulk processor with size checking
-    addRequestWithSizeCheck(indexRequest, json.getBytes().length);
+    // Add to bulk processor - it handles everything including size limits
+    bulkProcessor.add(indexRequest);
  }
 
-  /**
-   * Adds a request to the bulk processor, checking payload size and flushing if needed.
-   * This centralizes the logic for managing payload size limits.
-   */
-  private void addRequestWithSizeCheck(Object request, long uncompressedSize) {
-    // Check if single document exceeds cluster limit
-    if (uncompressedSize > clusterMaxRequestSize) {
-      LOG.error(
-          "Single document exceeds Elasticsearch cluster limit: {} MB > {} MB. Document will be skipped.",
-          uncompressedSize / (1024.0 * 1024.0),
-          clusterMaxRequestSize / (1024.0 * 1024.0));
-      totalFailed.incrementAndGet();
-      updateStats();
-      return; // Skip this document
-    }
-
-    // Calculate effective size based on whether compression is enabled
-    long effectiveSize;
-    if (compressionEnabled) {
-      // Compression typically achieves 75% reduction for JSON (25% of original size)
-      effectiveSize = uncompressedSize / 4;
-      LOG.debug(
-          "Using compressed size estimate: {} bytes (from {} uncompressed)",
-          effectiveSize,
-          uncompressedSize);
-    } else {
-      // Use uncompressed size when compression is disabled
-      effectiveSize = uncompressedSize;
-    }
-
-    // Check if adding this request would exceed the payload size limit
-    // Using volatile read/write for currentPayloadSize - no locking needed
-    long currentSize = currentPayloadSize;
-    long effectiveLimit = Math.min(maxPayloadSizeBytes, clusterMaxRequestSize);
-
-    if (currentSize > 0 && currentSize + effectiveSize > effectiveLimit) {
-      LOG.info(
-          "Payload size limit reached. Current: {} MB, Request: {} KB, Max: {} MB (cluster limit: {} MB)",
-          currentSize / (1024.0 * 1024.0),
-          uncompressedSize / 1024.0,
-          effectiveLimit / (1024.0 * 1024.0),
-          clusterMaxRequestSize / (1024.0 * 1024.0));
-
-      // Flush and reset
-      bulkProcessor.flush();
-      currentPayloadSize = 0;
-    }
-
-    // Add the request (BulkProcessor is thread-safe)
-    if (request instanceof UpdateRequest) {
-      bulkProcessor.add((UpdateRequest) request);
-    } else if (request instanceof IndexRequest) {
-      bulkProcessor.add((IndexRequest) request);
-    }
-
-    // Update current payload size
-    currentPayloadSize = currentSize + effectiveSize;
-
-    // Warn about large documents
-    if (uncompressedSize > 1024 * 1024) {
-      LOG.warn(
-          "Large document detected: {} MB uncompressed{}",
-          uncompressedSize / (1024.0 * 1024.0),
-          compressionEnabled
-              ? String.format(", ~%.2f MB compressed", effectiveSize / (1024.0 * 1024.0))
-              : "");
-    }
-  }
-
-  // Removed handleBackpressure() and recreateBulkProcessor() methods
-  // The BulkProcessor already handles backpressure internally with exponential backoff.
-  // Dynamic recreation of BulkProcessor was causing deadlocks in production.
-
  private void updateStats() {
    stats.setTotalRecords((int) totalSubmitted.get());
    stats.setSuccessRecords((int) totalSuccess.get());
@@ -367,14 +262,12 @@ public class ElasticSearchBulkSink implements BulkSink {
  public void close() throws IOException {
    try {
      // Flush any pending requests
-      if (currentBulkProcessor != null) {
-        currentBulkProcessor.flush();
+      bulkProcessor.flush();
 
-        // Wait for completion
-        boolean terminated = currentBulkProcessor.awaitClose(60, TimeUnit.SECONDS);
-        if (!terminated) {
-          LOG.warn("Bulk processor did not terminate within timeout");
-        }
+      // Wait for completion
+      boolean terminated = bulkProcessor.awaitClose(60, TimeUnit.SECONDS);
+      if (!terminated) {
+        LOG.warn("Bulk processor did not terminate within timeout");
      }
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while closing bulk processor", e);
@@ -382,18 +275,6 @@ public class ElasticSearchBulkSink implements BulkSink {
    }
  }
 
-  /**
-   * Get the cluster settings
-   */
-  private Map getClusterSettings() {
-    try {
-      return searchClient.clusterSettings();
-    } catch (Exception e) {
-      LOG.warn("Failed to fetch cluster settings: {}", e.getMessage());
-      return new HashMap<>();
-    }
-  }
-
  /**
   * Get current batch size
   */
@@ -410,23 +291,17 @@ public class ElasticSearchBulkSink implements BulkSink {
 
  /**
   * Update batch size - Note: This only updates the value for future processor creation
-   * The current BulkProcessor continues with its original settings to avoid deadlocks
   */
  public void updateBatchSize(int newBatchSize) {
    this.batchSize = newBatchSize;
-    LOG.info(
-        "Batch size updated to: {}. This will take effect when a new processor is created.",
-        newBatchSize);
+    LOG.info("Batch size updated to: {}", newBatchSize);
  }
 
  /**
   * Update concurrent requests - Note: This only updates the value for future processor creation
-   * The current BulkProcessor continues with its original settings to avoid deadlocks
   */
  public void updateConcurrentRequests(int concurrentRequests) {
    this.maxConcurrentRequests = concurrentRequests;
-    LOG.info(
-        "Concurrent requests updated to: {}. This will take effect when a new processor is created.",
-        concurrentRequests);
+    LOG.info("Concurrent requests updated to: {}", concurrentRequests);
  }
 }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java
index a311bd3fb4f..fcb5c60b227 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java
@@ -14,7 +14,6 @@ import org.openmetadata.schema.system.IndexingError;
 import org.openmetadata.schema.system.StepStats;
 import org.openmetadata.service.Entity;
 import org.openmetadata.service.exception.SearchIndexException;
-import org.openmetadata.service.search.SearchClusterMetrics;
 import org.openmetadata.service.search.SearchRepository;
 import org.openmetadata.service.search.models.IndexMapping;
 import org.openmetadata.service.search.opensearch.OpenSearchClient;
@@ -41,9 +40,8 @@ public class OpenSearchBulkSink implements BulkSink {
 
   private final OpenSearchClient searchClient;
   private final SearchRepository searchRepository;
-  private volatile BulkProcessor bulkProcessor;
+  private final BulkProcessor bulkProcessor;
   private final StepStats stats = new StepStats();
-  private volatile long currentPayloadSize = 0;
 
   // Track metrics
   private final AtomicLong totalSubmitted = new AtomicLong(0);
@@ -53,10 +51,6 @@ public class OpenSearchBulkSink implements BulkSink {
   // Configuration
   private volatile int batchSize;
   private volatile int maxConcurrentRequests;
-  private final long maxPayloadSizeBytes;
-  private volatile BulkProcessor currentBulkProcessor;
-  private final long clusterMaxRequestSize;
-  private final boolean compressionEnabled;
 
   public OpenSearchBulkSink(
       SearchRepository searchRepository,
@@ -68,27 +62,16 @@ public class OpenSearchBulkSink implements BulkSink {
     this.searchClient = (OpenSearchClient) searchRepository.getSearchClient();
     this.batchSize = batchSize;
     this.maxConcurrentRequests = maxConcurrentRequests;
-    this.maxPayloadSizeBytes = maxPayloadSizeBytes;
-
-    // Get the actual max request size and compression status from cluster settings
-    Map clusterSettings = getClusterSettings();
-    this.clusterMaxRequestSize = SearchClusterMetrics.extractMaxContentLength(clusterSettings);
-    this.compressionEnabled = SearchClusterMetrics.isCompressionEnabled(clusterSettings);
-
-    LOG.info(
-        "OpenSearch cluster config - Max content length: {} MB, Compression enabled: {}",
-        clusterMaxRequestSize / (1024 * 1024),
-        compressionEnabled);
 
     // Initialize stats
     stats.withTotalRecords(0).withSuccessRecords(0).withFailedRecords(0);
 
     // Create bulk processor
-    this.bulkProcessor = createBulkProcessor(batchSize, maxConcurrentRequests);
-    this.currentBulkProcessor = this.bulkProcessor;
+    this.bulkProcessor = createBulkProcessor(batchSize, maxConcurrentRequests, maxPayloadSizeBytes);
   }
 
-  private BulkProcessor createBulkProcessor(int bulkActions, int concurrentRequests) {
+  private BulkProcessor createBulkProcessor(
+      int bulkActions, int concurrentRequests, long maxPayloadSizeBytes) {
     RestHighLevelClient client = searchClient.getClient();
 
     // Create custom request options with larger buffer for big responses
@@ -97,16 +80,13 @@ public class OpenSearchBulkSink implements BulkSink {
         new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(
             100 * 1024 * 1024)); // 100MB buffer
 
-    // Note: The Java client doesn't automatically compress requests.
-    // Compression must be enabled at the HTTP client level (Apache HttpClient)
-    // or by using a custom request interceptor
-
     RequestOptions requestOptions = optionsBuilder.build();
 
     LOG.info(
-        "Creating BulkProcessor with batch size {} and {} concurrent requests",
+        "Creating BulkProcessor with batch size {}, {} concurrent requests, max payload {} MB",
        bulkActions,
-        concurrentRequests);
+        concurrentRequests,
+        maxPayloadSizeBytes / (1024 * 1024));
 
     return BulkProcessor.builder(
            (request, bulkListener) -> client.bulkAsync(request, requestOptions, bulkListener),
@@ -115,9 +95,6 @@ public class OpenSearchBulkSink implements BulkSink {
          public void beforeBulk(long executionId, BulkRequest request) {
            int numberOfActions = request.numberOfActions();
            totalSubmitted.addAndGet(numberOfActions);
-            // Reset payload size counter after flush
-            // Reset payload size when bulk operation starts
-            currentPayloadSize = 0;
            LOG.debug(
                "Executing bulk request {} with {} actions", executionId, numberOfActions);
          }
@@ -148,7 +125,7 @@ public class OpenSearchBulkSink implements BulkSink {
              String failureMessage = response.buildFailureMessage();
              if (failureMessage.contains("rejected_execution_exception")) {
                LOG.warn(
-                    "Detected backpressure from OpenSearch cluster (rejected_execution_exception). The BulkProcessor will handle retry with exponential backoff.");
+                    "Detected backpressure from OpenSearch cluster. The BulkProcessor will handle retry with exponential backoff.");
              }
            } else {
              totalSuccess.addAndGet(numberOfActions);
@@ -175,23 +152,19 @@ public class OpenSearchBulkSink implements BulkSink {
            if (failure.getMessage() != null
                && failure.getMessage().contains("rejected_execution_exception")) {
              LOG.warn(
-                  "Detected backpressure from OpenSearch cluster (rejected_execution_exception). The BulkProcessor will handle retry with exponential backoff.");
+                  "Detected backpressure from OpenSearch cluster. The BulkProcessor will handle retry with exponential backoff.");
            }
            updateStats();
          }
        })
        .setBulkActions(bulkActions)
-        // Ensure we never exceed OpenSearch's configured limit
-        .setBulkSize(
-            new ByteSizeValue(
-                Math.min(maxPayloadSizeBytes, clusterMaxRequestSize), ByteSizeUnit.BYTES))
-        .setFlushInterval(TimeValue.timeValueSeconds(10)) // Increased from 5s
+        .setBulkSize(new ByteSizeValue(maxPayloadSizeBytes, ByteSizeUnit.BYTES))
+        .setFlushInterval(TimeValue.timeValueSeconds(10))
        .setConcurrentRequests(concurrentRequests)
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(
-                TimeValue.timeValueMillis(1000),
-                3)) // More aggressive backoff: 1s initial, 3 retries
+                TimeValue.timeValueMillis(1000), 3)) // 1s initial delay, 3 retries
        .build();
  }
 
@@ -223,10 +196,6 @@ public class OpenSearchBulkSink implements BulkSink {
          addEntity(entity, indexName);
        }
      }
-
-      // The bulk processor handles batching and flushing automatically
-      // We don't need to explicitly flush here
-
    } catch (Exception e) {
      LOG.error("Failed to write {} entities of type {}", entities.size(), entityType, e);
 
@@ -253,8 +222,8 @@ public class OpenSearchBulkSink implements BulkSink {
    updateRequest.doc(json, XContentType.JSON);
    updateRequest.docAsUpsert(true);
 
-    // Add to bulk processor with size checking
-    addRequestWithSizeCheck(updateRequest, json.getBytes().length);
+    // Add to bulk processor - it handles everything including size limits
+    bulkProcessor.add(updateRequest);
  }
 
  private void addTimeSeriesEntity(EntityTimeSeriesInterface entity, String indexName)
@@ -265,77 +234,8 @@ public class OpenSearchBulkSink implements BulkSink {
    IndexRequest indexRequest =
        new IndexRequest(indexName).id(docId).source(json, XContentType.JSON);
 
-    // Add to bulk processor with size checking
-    addRequestWithSizeCheck(indexRequest, json.getBytes().length);
-  }
-
-  /**
-   * Adds a request to the bulk processor, checking payload size and flushing if needed.
-   * This centralizes the logic for managing payload size limits.
-   */
-  private void addRequestWithSizeCheck(Object request, long uncompressedSize) {
-    // Check if single document exceeds cluster limit
-    if (uncompressedSize > clusterMaxRequestSize) {
-      LOG.error(
-          "Single document exceeds OpenSearch cluster limit: {} MB > {} MB. Document will be skipped.",
-          uncompressedSize / (1024.0 * 1024.0),
-          clusterMaxRequestSize / (1024.0 * 1024.0));
-      totalFailed.incrementAndGet();
-      updateStats();
-      return; // Skip this document
-    }
-
-    // Calculate effective size based on whether compression is enabled
-    long effectiveSize;
-    if (compressionEnabled) {
-      // Compression typically achieves 75% reduction for JSON (25% of original size)
-      effectiveSize = uncompressedSize / 4;
-      LOG.debug(
-          "Using compressed size estimate: {} bytes (from {} uncompressed)",
-          effectiveSize,
-          uncompressedSize);
-    } else {
-      // Use uncompressed size when compression is disabled
-      effectiveSize = uncompressedSize;
-    }
-
-    // Check if adding this request would exceed the payload size limit
-    // Using volatile - approximate size is sufficient for flushing decisions
-    long currentSize = currentPayloadSize;
-    long effectiveLimit = Math.min(maxPayloadSizeBytes, clusterMaxRequestSize);
-
-    if (currentSize > 0 && currentSize + effectiveSize > effectiveLimit) {
-      LOG.info(
-          "Payload size limit reached. Current: {} MB, Request: {} KB, Max: {} MB (cluster limit: {} MB)",
-          currentSize / (1024.0 * 1024.0),
-          uncompressedSize / 1024.0,
-          effectiveLimit / (1024.0 * 1024.0),
-          clusterMaxRequestSize / (1024.0 * 1024.0));
-
-      // Flush and reset
-      bulkProcessor.flush();
-      currentPayloadSize = 0;
-    }
-
-    // Add the request (BulkProcessor is thread-safe)
-    if (request instanceof UpdateRequest) {
-      bulkProcessor.add((UpdateRequest) request);
-    } else if (request instanceof IndexRequest) {
-      bulkProcessor.add((IndexRequest) request);
-    }
-
-    // Update current payload size atomically
-    currentPayloadSize = currentSize + effectiveSize;
-
-    // Warn about large documents
-    if (uncompressedSize > 1024 * 1024) {
-      LOG.warn(
-          "Large document detected: {} MB uncompressed{}",
-          uncompressedSize / (1024.0 * 1024.0),
-          compressionEnabled
-              ? String.format(", ~%.2f MB compressed", effectiveSize / (1024.0 * 1024.0))
-              : "");
-    }
+    // Add to bulk processor - it handles everything including size limits
+    bulkProcessor.add(indexRequest);
  }
 
  private void updateStats() {
@@ -362,14 +262,12 @@ public class OpenSearchBulkSink implements BulkSink {
  public void close() throws IOException {
    try {
      // Flush any pending requests
-      if (currentBulkProcessor != null) {
-        currentBulkProcessor.flush();
+      bulkProcessor.flush();
 
-        // Wait for completion
-        boolean terminated = currentBulkProcessor.awaitClose(60, TimeUnit.SECONDS);
-        if (!terminated) {
-          LOG.warn("Bulk processor did not terminate within timeout");
-        }
+      // Wait for completion
+      boolean terminated = bulkProcessor.awaitClose(60, TimeUnit.SECONDS);
+      if (!terminated) {
+        LOG.warn("Bulk processor did not terminate within timeout");
      }
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while closing bulk processor", e);
@@ -377,18 +275,6 @@ public class OpenSearchBulkSink implements BulkSink {
    }
  }
 
-  /**
-   * Get the cluster settings
-   */
-  private Map getClusterSettings() {
-    try {
-      return searchClient.clusterSettings();
-    } catch (Exception e) {
-      LOG.warn("Failed to fetch cluster settings: {}", e.getMessage());
-      return new HashMap<>();
-    }
-  }
-
  /**
   * Get current batch size
   */
@@ -405,23 +291,17 @@ public class OpenSearchBulkSink implements BulkSink {
 
  /**
   * Update batch size - Note: This only updates the value for future processor creation
-   * The current BulkProcessor continues with its original settings to avoid deadlocks
   */
  public void updateBatchSize(int newBatchSize) {
    this.batchSize = newBatchSize;
-    LOG.info(
-        "Batch size updated to: {}. This will take effect when a new processor is created.",
-        newBatchSize);
+    LOG.info("Batch size updated to: {}", newBatchSize);
  }
 
  /**
   * Update concurrent requests - Note: This only updates the value for future processor creation
-   * The current BulkProcessor continues with its original settings to avoid deadlocks
   */
  public void updateConcurrentRequests(int concurrentRequests) {
    this.maxConcurrentRequests = concurrentRequests;
-    LOG.info(
-        "Concurrent requests updated to: {}. This will take effect when a new processor is created.",
-        concurrentRequests);
+    LOG.info("Concurrent requests updated to: {}", concurrentRequests);
  }
 }
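
Reviewer note (not part of the patch): after this change, both sinks rely entirely on BulkProcessor's built-in count-based, size-based, and time-based flushing plus its exponential-backoff retry, instead of hand-rolled payload tracking. Below is a minimal, self-contained sketch of that configuration pattern, assuming the same shaded `es.org.elasticsearch` client used in the Elasticsearch sink; the `BulkProcessorSketch` class, the `create(...)` helper, and the no-op listener are illustrative placeholders, not code from this PR.

```java
import es.org.elasticsearch.action.bulk.BackoffPolicy;
import es.org.elasticsearch.action.bulk.BulkProcessor;
import es.org.elasticsearch.action.bulk.BulkRequest;
import es.org.elasticsearch.action.bulk.BulkResponse;
import es.org.elasticsearch.client.RequestOptions;
import es.org.elasticsearch.client.RestHighLevelClient;
import es.org.elasticsearch.common.unit.ByteSizeUnit;
import es.org.elasticsearch.common.unit.ByteSizeValue;
import es.org.elasticsearch.core.TimeValue;

public final class BulkProcessorSketch {

  // Illustrative only: mirrors the settings the sinks in this patch now delegate to.
  static BulkProcessor create(RestHighLevelClient client, int batchSize, long maxPayloadBytes) {
    return BulkProcessor.builder(
            // BulkProcessor hands assembled BulkRequests to this consumer asynchronously
            (request, listener) -> client.bulkAsync(request, RequestOptions.DEFAULT, listener),
            new BulkProcessor.Listener() {
              @Override
              public void beforeBulk(long executionId, BulkRequest request) {}

              @Override
              public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {}

              @Override
              public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
            })
        // Flush when either the action count or the accumulated byte size is reached,
        // whichever comes first - no manual payload-size bookkeeping needed.
        .setBulkActions(batchSize)
        .setBulkSize(new ByteSizeValue(maxPayloadBytes, ByteSizeUnit.BYTES))
        // ...or after 10s of inactivity, so a small trailing batch still gets indexed.
        .setFlushInterval(TimeValue.timeValueSeconds(10))
        .setConcurrentRequests(2)
        // Retries rejected bulks with exponential backoff: 1s, 2s, 4s (3 attempts).
        .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3))
        .build();
  }
}
```

Because `BulkProcessor.add(...)` blocks or queues internally and the processor owns flushing, the final fields in this patch (replacing the `volatile` processor swap) avoid the recreate-under-load deadlocks the removed code worked around.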