remove unnecessary code to check compression, batch sizes, etc.

Sriharsha Chintalapani 2025-06-26 18:29:39 -07:00
parent 7a2f749f17
commit 9f17d1309b
2 changed files with 46 additions and 291 deletions
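
In outline, both files make the same change: the sink stops tracking payload size, compression, and cluster limits by hand and hands every request to a single long-lived BulkProcessor, which already enforces an action-count limit, a byte-size limit, a flush interval, and retry with backoff. A condensed sketch of the shared shape, with plain org.elasticsearch package names (the real file imports them under OpenMetadata's shaded es.org.elasticsearch prefix) and RequestOptions.DEFAULT standing in for the custom options built below:

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;

final class BulkSinkSketch {
  static BulkProcessor newBulkProcessor(
      RestHighLevelClient client,
      BulkProcessor.Listener listener,
      int batchSize,
      int maxConcurrentRequests,
      long maxPayloadSizeBytes) {
    return BulkProcessor.builder(
            (request, bulkListener) ->
                client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
            listener)
        .setBulkActions(batchSize) // flush after this many actions...
        .setBulkSize(new ByteSizeValue(maxPayloadSizeBytes, ByteSizeUnit.BYTES)) // ...or bytes
        .setFlushInterval(TimeValue.timeValueSeconds(10)) // ...or 10s of inactivity
        .setConcurrentRequests(maxConcurrentRequests)
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3))
        .build();
  }
}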

ElasticSearchBulkSink.java

@@ -16,7 +16,6 @@ import es.org.elasticsearch.common.unit.ByteSizeValue;
import es.org.elasticsearch.core.TimeValue;
import es.org.elasticsearch.xcontent.XContentType;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -28,7 +27,6 @@ import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.search.SearchClusterMetrics;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.search.elasticsearch.ElasticSearchClient;
import org.openmetadata.service.search.models.IndexMapping;
@@ -42,9 +40,8 @@ public class ElasticSearchBulkSink implements BulkSink {
private final ElasticSearchClient searchClient;
private final SearchRepository searchRepository;
private volatile BulkProcessor bulkProcessor;
private final BulkProcessor bulkProcessor;
private final StepStats stats = new StepStats();
private volatile long currentPayloadSize = 0;
// Track metrics
private final AtomicLong totalSubmitted = new AtomicLong(0);
@@ -54,10 +51,6 @@ public class ElasticSearchBulkSink implements BulkSink {
// Configuration
private volatile int batchSize;
private volatile int maxConcurrentRequests;
private final long maxPayloadSizeBytes;
private volatile BulkProcessor currentBulkProcessor;
private final long clusterMaxRequestSize;
private final boolean compressionEnabled;
public ElasticSearchBulkSink(
SearchRepository searchRepository,
@@ -69,27 +62,16 @@ public class ElasticSearchBulkSink implements BulkSink {
this.searchClient = (ElasticSearchClient) searchRepository.getSearchClient();
this.batchSize = batchSize;
this.maxConcurrentRequests = maxConcurrentRequests;
this.maxPayloadSizeBytes = maxPayloadSizeBytes;
// Get the actual max request size and compression status from cluster settings
Map<String, Object> clusterSettings = getClusterSettings();
this.clusterMaxRequestSize = SearchClusterMetrics.extractMaxContentLength(clusterSettings);
this.compressionEnabled = SearchClusterMetrics.isCompressionEnabled(clusterSettings);
LOG.info(
"Elasticsearch cluster config - Max content length: {} MB, Compression enabled: {}",
clusterMaxRequestSize / (1024 * 1024),
compressionEnabled);
// Initialize stats
stats.withTotalRecords(0).withSuccessRecords(0).withFailedRecords(0);
// Create bulk processor
this.bulkProcessor = createBulkProcessor(batchSize, maxConcurrentRequests);
this.currentBulkProcessor = this.bulkProcessor;
this.bulkProcessor = createBulkProcessor(batchSize, maxConcurrentRequests, maxPayloadSizeBytes);
}
private BulkProcessor createBulkProcessor(int bulkActions, int concurrentRequests) {
private BulkProcessor createBulkProcessor(
int bulkActions, int concurrentRequests, long maxPayloadSizeBytes) {
RestHighLevelClient client = (RestHighLevelClient) searchClient.getClient();
// Create custom request options with larger buffer for big responses
@@ -98,16 +80,13 @@ public class ElasticSearchBulkSink implements BulkSink {
new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(
100 * 1024 * 1024)); // 100MB buffer
// Note: The Java client doesn't automatically compress requests.
// Compression must be enabled at the HTTP client level (Apache HttpClient)
// or by using a custom request interceptor
RequestOptions requestOptions = optionsBuilder.build();
LOG.info(
"Creating BulkProcessor with batch size {} and {} concurrent requests",
"Creating BulkProcessor with batch size {}, {} concurrent requests, max payload {} MB",
bulkActions,
concurrentRequests);
concurrentRequests,
maxPayloadSizeBytes / (1024 * 1024));
return BulkProcessor.builder(
(request, bulkListener) -> client.bulkAsync(request, requestOptions, bulkListener),
@@ -116,9 +95,6 @@ public class ElasticSearchBulkSink implements BulkSink {
public void beforeBulk(long executionId, BulkRequest request) {
int numberOfActions = request.numberOfActions();
totalSubmitted.addAndGet(numberOfActions);
// Reset payload size counter after flush
// Reset payload size when bulk operation starts
currentPayloadSize = 0;
LOG.debug(
"Executing bulk request {} with {} actions", executionId, numberOfActions);
}
@@ -149,7 +125,7 @@ public class ElasticSearchBulkSink implements BulkSink {
String failureMessage = response.buildFailureMessage();
if (failureMessage.contains("rejected_execution_exception")) {
LOG.warn(
"Detected backpressure from Elasticsearch cluster (rejected_execution_exception). The BulkProcessor will handle retry with exponential backoff.");
"Detected backpressure from Elasticsearch cluster. The BulkProcessor will handle retry with exponential backoff.");
}
} else {
totalSuccess.addAndGet(numberOfActions);
@@ -176,23 +152,19 @@ public class ElasticSearchBulkSink implements BulkSink {
if (failure.getMessage() != null
&& failure.getMessage().contains("rejected_execution_exception")) {
LOG.warn(
"Detected backpressure from Elasticsearch cluster (rejected_execution_exception). The BulkProcessor will handle retry with exponential backoff.");
"Detected backpressure from Elasticsearch cluster. The BulkProcessor will handle retry with exponential backoff.");
}
updateStats();
}
})
.setBulkActions(bulkActions)
// Ensure we never exceed Elasticsearch's configured limit
.setBulkSize(
new ByteSizeValue(
Math.min(maxPayloadSizeBytes, clusterMaxRequestSize), ByteSizeUnit.BYTES))
.setFlushInterval(TimeValue.timeValueSeconds(10)) // Increased from 5s
.setBulkSize(new ByteSizeValue(maxPayloadSizeBytes, ByteSizeUnit.BYTES))
.setFlushInterval(TimeValue.timeValueSeconds(10))
.setConcurrentRequests(concurrentRequests)
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(
TimeValue.timeValueMillis(1000),
3)) // More aggressive backoff: 1s initial, 3 retries
TimeValue.timeValueMillis(1000), 3)) // 1s initial delay, 3 retries
.build();
}
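
One note on the backoff line above: BackoffPolicy implements Iterable<TimeValue>, so the retry schedule the processor will follow can be inspected directly. The exact growth curve is a client implementation detail, so this sketch only assumes the policy yields three increasing delays starting from the 1s initial value:

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.core.TimeValue;

public class BackoffInspect {
  public static void main(String[] args) {
    // The same policy the sink configures: 1s initial delay, at most 3 retries.
    BackoffPolicy policy =
        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3);
    for (TimeValue delay : policy) {
      System.out.println("retry after " + delay); // three increasing delays
    }
  }
}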
@@ -224,10 +196,6 @@ public class ElasticSearchBulkSink implements BulkSink {
addEntity(entity, indexName);
}
}
// The bulk processor handles batching and flushing automatically
// We don't need to explicitly flush here
} catch (Exception e) {
LOG.error("Failed to write {} entities of type {}", entities.size(), entityType, e);
@@ -254,8 +222,8 @@ public class ElasticSearchBulkSink implements BulkSink {
updateRequest.doc(json, XContentType.JSON);
updateRequest.docAsUpsert(true);
// Add to bulk processor with size checking
addRequestWithSizeCheck(updateRequest, json.getBytes().length);
// Add to bulk processor - it handles everything including size limits
bulkProcessor.add(updateRequest);
}
private void addTimeSeriesEntity(EntityTimeSeriesInterface entity, String indexName)
@@ -266,83 +234,10 @@ public class ElasticSearchBulkSink implements BulkSink {
IndexRequest indexRequest =
new IndexRequest(indexName).id(docId).source(json, XContentType.JSON);
// Add to bulk processor with size checking
addRequestWithSizeCheck(indexRequest, json.getBytes().length);
// Add to bulk processor - it handles everything including size limits
bulkProcessor.add(indexRequest);
}
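
Both entry points now reduce to the same move: build the request, hand it to the shared processor. docAsUpsert(true) makes the update create the document when it does not exist yet, which is what lets a reindex run against an empty index. A minimal sketch of the two request shapes, with plain package names and the docId/json derivation elided:

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.xcontent.XContentType;

final class RequestSketch {
  static void addEntity(BulkProcessor processor, String index, String docId, String json) {
    UpdateRequest update = new UpdateRequest(index, docId);
    update.doc(json, XContentType.JSON);
    update.docAsUpsert(true); // insert when absent, update when present
    processor.add(update);    // BulkProcessor is thread-safe and flushes itself
  }

  static void addTimeSeries(BulkProcessor processor, String index, String docId, String json) {
    // Time-series documents are written once, so a plain index request is enough.
    processor.add(new IndexRequest(index).id(docId).source(json, XContentType.JSON));
  }
}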
/**
* Adds a request to the bulk processor, checking payload size and flushing if needed.
* This centralizes the logic for managing payload size limits.
*/
private void addRequestWithSizeCheck(Object request, long uncompressedSize) {
// Check if single document exceeds cluster limit
if (uncompressedSize > clusterMaxRequestSize) {
LOG.error(
"Single document exceeds Elasticsearch cluster limit: {} MB > {} MB. Document will be skipped.",
uncompressedSize / (1024.0 * 1024.0),
clusterMaxRequestSize / (1024.0 * 1024.0));
totalFailed.incrementAndGet();
updateStats();
return; // Skip this document
}
// Calculate effective size based on whether compression is enabled
long effectiveSize;
if (compressionEnabled) {
// Compression typically achieves 75% reduction for JSON (25% of original size)
effectiveSize = uncompressedSize / 4;
LOG.debug(
"Using compressed size estimate: {} bytes (from {} uncompressed)",
effectiveSize,
uncompressedSize);
} else {
// Use uncompressed size when compression is disabled
effectiveSize = uncompressedSize;
}
// Check if adding this request would exceed the payload size limit
// Using volatile read/write for currentPayloadSize - no locking needed
long currentSize = currentPayloadSize;
long effectiveLimit = Math.min(maxPayloadSizeBytes, clusterMaxRequestSize);
if (currentSize > 0 && currentSize + effectiveSize > effectiveLimit) {
LOG.info(
"Payload size limit reached. Current: {} MB, Request: {} KB, Max: {} MB (cluster limit: {} MB)",
currentSize / (1024.0 * 1024.0),
uncompressedSize / 1024.0,
effectiveLimit / (1024.0 * 1024.0),
clusterMaxRequestSize / (1024.0 * 1024.0));
// Flush and reset
bulkProcessor.flush();
currentPayloadSize = 0;
}
// Add the request (BulkProcessor is thread-safe)
if (request instanceof UpdateRequest) {
bulkProcessor.add((UpdateRequest) request);
} else if (request instanceof IndexRequest) {
bulkProcessor.add((IndexRequest) request);
}
// Update current payload size
currentPayloadSize = currentSize + effectiveSize;
// Warn about large documents
if (uncompressedSize > 1024 * 1024) {
LOG.warn(
"Large document detected: {} MB uncompressed{}",
uncompressedSize / (1024.0 * 1024.0),
compressionEnabled
? String.format(", ~%.2f MB compressed", effectiveSize / (1024.0 * 1024.0))
: "");
}
}
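
For the record, the heuristic this deletion retires fits in a few lines: with HTTP compression on, a document counted as roughly a quarter of its raw size against a budget of min(maxPayloadSizeBytes, clusterMaxRequestSize). The 4:1 ratio was the old code's assumption about JSON, not a measured constant:

final class PayloadEstimate {
  /** Effective wire size the removed check assumed for one document. */
  static long effectiveSize(long uncompressedBytes, boolean compressionEnabled) {
    // Old heuristic: JSON compresses by ~75%, so count 25% of the raw size.
    return compressionEnabled ? uncompressedBytes / 4 : uncompressedBytes;
  }

  public static void main(String[] args) {
    long eightMb = 8L * 1024 * 1024;
    System.out.println(effectiveSize(eightMb, true));  // 2097152 (~2 MB counted)
    System.out.println(effectiveSize(eightMb, false)); // 8388608 (full 8 MB counted)
  }
}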
// Removed handleBackpressure() and recreateBulkProcessor() methods
// The BulkProcessor already handles backpressure internally with exponential backoff.
// Dynamic recreation of BulkProcessor was causing deadlocks in production.
private void updateStats() {
stats.setTotalRecords((int) totalSubmitted.get());
stats.setSuccessRecords((int) totalSuccess.get());
@@ -367,14 +262,12 @@ public class ElasticSearchBulkSink implements BulkSink {
public void close() throws IOException {
try {
// Flush any pending requests
if (currentBulkProcessor != null) {
currentBulkProcessor.flush();
bulkProcessor.flush();
// Wait for completion
boolean terminated = currentBulkProcessor.awaitClose(60, TimeUnit.SECONDS);
if (!terminated) {
LOG.warn("Bulk processor did not terminate within timeout");
}
// Wait for completion
boolean terminated = bulkProcessor.awaitClose(60, TimeUnit.SECONDS);
if (!terminated) {
LOG.warn("Bulk processor did not terminate within timeout");
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while closing bulk processor", e);
@@ -382,18 +275,6 @@ public class ElasticSearchBulkSink implements BulkSink {
}
}
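
The shutdown sequence above is the standard one: flush whatever is still buffered, then block on awaitClose. One conventional extra step, which this diff does not add, is restoring the interrupt flag when awaitClose is interrupted; the sketch below includes it as an assumption about good practice rather than a description of the committed code:

import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.bulk.BulkProcessor;

final class ShutdownSketch {
  static void shutdown(BulkProcessor processor) {
    processor.flush(); // push out anything still buffered
    try {
      // Wait for in-flight bulk requests to finish, up to 60 seconds.
      if (!processor.awaitClose(60, TimeUnit.SECONDS)) {
        System.err.println("Bulk processor did not terminate within timeout");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the flag (not in the diff)
    }
  }
}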
/**
* Get the cluster settings
*/
private Map<String, Object> getClusterSettings() {
try {
return searchClient.clusterSettings();
} catch (Exception e) {
LOG.warn("Failed to fetch cluster settings: {}", e.getMessage());
return new HashMap<>();
}
}
/**
* Get current batch size
*/
@@ -410,23 +291,17 @@ public class ElasticSearchBulkSink implements BulkSink {
/**
* Update batch size - Note: This only updates the value for future processor creation
* The current BulkProcessor continues with its original settings to avoid deadlocks
*/
public void updateBatchSize(int newBatchSize) {
this.batchSize = newBatchSize;
LOG.info(
"Batch size updated to: {}. This will take effect when a new processor is created.",
newBatchSize);
LOG.info("Batch size updated to: {}", newBatchSize);
}
/**
* Update concurrent requests - Note: This only updates the value for future processor creation
* The current BulkProcessor continues with its original settings to avoid deadlocks
*/
public void updateConcurrentRequests(int concurrentRequests) {
this.maxConcurrentRequests = concurrentRequests;
LOG.info(
"Concurrent requests updated to: {}. This will take effect when a new processor is created.",
concurrentRequests);
LOG.info("Concurrent requests updated to: {}", concurrentRequests);
}
}

OpenSearchBulkSink.java

@@ -14,7 +14,6 @@ import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.search.SearchClusterMetrics;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.search.models.IndexMapping;
import org.openmetadata.service.search.opensearch.OpenSearchClient;
@@ -41,9 +40,8 @@ public class OpenSearchBulkSink implements BulkSink {
private final OpenSearchClient searchClient;
private final SearchRepository searchRepository;
private volatile BulkProcessor bulkProcessor;
private final BulkProcessor bulkProcessor;
private final StepStats stats = new StepStats();
private volatile long currentPayloadSize = 0;
// Track metrics
private final AtomicLong totalSubmitted = new AtomicLong(0);
@@ -53,10 +51,6 @@ public class OpenSearchBulkSink implements BulkSink {
// Configuration
private volatile int batchSize;
private volatile int maxConcurrentRequests;
private final long maxPayloadSizeBytes;
private volatile BulkProcessor currentBulkProcessor;
private final long clusterMaxRequestSize;
private final boolean compressionEnabled;
public OpenSearchBulkSink(
SearchRepository searchRepository,
@@ -68,27 +62,16 @@ public class OpenSearchBulkSink implements BulkSink {
this.searchClient = (OpenSearchClient) searchRepository.getSearchClient();
this.batchSize = batchSize;
this.maxConcurrentRequests = maxConcurrentRequests;
this.maxPayloadSizeBytes = maxPayloadSizeBytes;
// Get the actual max request size and compression status from cluster settings
Map<String, Object> clusterSettings = getClusterSettings();
this.clusterMaxRequestSize = SearchClusterMetrics.extractMaxContentLength(clusterSettings);
this.compressionEnabled = SearchClusterMetrics.isCompressionEnabled(clusterSettings);
LOG.info(
"OpenSearch cluster config - Max content length: {} MB, Compression enabled: {}",
clusterMaxRequestSize / (1024 * 1024),
compressionEnabled);
// Initialize stats
stats.withTotalRecords(0).withSuccessRecords(0).withFailedRecords(0);
// Create bulk processor
this.bulkProcessor = createBulkProcessor(batchSize, maxConcurrentRequests);
this.currentBulkProcessor = this.bulkProcessor;
this.bulkProcessor = createBulkProcessor(batchSize, maxConcurrentRequests, maxPayloadSizeBytes);
}
private BulkProcessor createBulkProcessor(int bulkActions, int concurrentRequests) {
private BulkProcessor createBulkProcessor(
int bulkActions, int concurrentRequests, long maxPayloadSizeBytes) {
RestHighLevelClient client = searchClient.getClient();
// Create custom request options with larger buffer for big responses
@@ -97,16 +80,13 @@ public class OpenSearchBulkSink implements BulkSink {
new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(
100 * 1024 * 1024)); // 100MB buffer
// Note: The Java client doesn't automatically compress requests.
// Compression must be enabled at the HTTP client level (Apache HttpClient)
// or by using a custom request interceptor
RequestOptions requestOptions = optionsBuilder.build();
LOG.info(
"Creating BulkProcessor with batch size {} and {} concurrent requests",
"Creating BulkProcessor with batch size {}, {} concurrent requests, max payload {} MB",
bulkActions,
concurrentRequests);
concurrentRequests,
maxPayloadSizeBytes / (1024 * 1024));
return BulkProcessor.builder(
(request, bulkListener) -> client.bulkAsync(request, requestOptions, bulkListener),
@@ -115,9 +95,6 @@ public class OpenSearchBulkSink implements BulkSink {
public void beforeBulk(long executionId, BulkRequest request) {
int numberOfActions = request.numberOfActions();
totalSubmitted.addAndGet(numberOfActions);
// Reset payload size counter after flush
// Reset payload size when bulk operation starts
currentPayloadSize = 0;
LOG.debug(
"Executing bulk request {} with {} actions", executionId, numberOfActions);
}
@@ -148,7 +125,7 @@ public class OpenSearchBulkSink implements BulkSink {
String failureMessage = response.buildFailureMessage();
if (failureMessage.contains("rejected_execution_exception")) {
LOG.warn(
"Detected backpressure from OpenSearch cluster (rejected_execution_exception). The BulkProcessor will handle retry with exponential backoff.");
"Detected backpressure from OpenSearch cluster. The BulkProcessor will handle retry with exponential backoff.");
}
} else {
totalSuccess.addAndGet(numberOfActions);
@@ -175,23 +152,19 @@ public class OpenSearchBulkSink implements BulkSink {
if (failure.getMessage() != null
&& failure.getMessage().contains("rejected_execution_exception")) {
LOG.warn(
"Detected backpressure from OpenSearch cluster (rejected_execution_exception). The BulkProcessor will handle retry with exponential backoff.");
"Detected backpressure from OpenSearch cluster. The BulkProcessor will handle retry with exponential backoff.");
}
updateStats();
}
})
.setBulkActions(bulkActions)
// Ensure we never exceed OpenSearch's configured limit
.setBulkSize(
new ByteSizeValue(
Math.min(maxPayloadSizeBytes, clusterMaxRequestSize), ByteSizeUnit.BYTES))
.setFlushInterval(TimeValue.timeValueSeconds(10)) // Increased from 5s
.setBulkSize(new ByteSizeValue(maxPayloadSizeBytes, ByteSizeUnit.BYTES))
.setFlushInterval(TimeValue.timeValueSeconds(10))
.setConcurrentRequests(concurrentRequests)
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(
TimeValue.timeValueMillis(1000),
3)) // More aggressive backoff: 1s initial, 3 retries
TimeValue.timeValueMillis(1000), 3)) // 1s initial delay, 3 retries
.build();
}
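
The OpenSearch file is the same change line for line: the client it builds against is the org.opensearch fork of the same BulkProcessor API, so only the import prefixes differ. One piece both files abbreviate is the construction of the request options carrying the 100 MB response buffer; here is a sketch, assuming the OpenSearch low-level client exposes the same RequestOptions API as the Elasticsearch one (import paths can vary across client versions):

import org.opensearch.client.HttpAsyncResponseConsumerFactory;
import org.opensearch.client.RequestOptions;

final class OptionsSketch {
  static RequestOptions largeResponseOptions() {
    RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
    // Allow buffering bulk responses of up to 100 MB on the heap.
    builder.setHttpAsyncResponseConsumerFactory(
        new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(
            100 * 1024 * 1024));
    return builder.build();
  }
}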
@@ -223,10 +196,6 @@ public class OpenSearchBulkSink implements BulkSink {
addEntity(entity, indexName);
}
}
// The bulk processor handles batching and flushing automatically
// We don't need to explicitly flush here
} catch (Exception e) {
LOG.error("Failed to write {} entities of type {}", entities.size(), entityType, e);
@@ -253,8 +222,8 @@ public class OpenSearchBulkSink implements BulkSink {
updateRequest.doc(json, XContentType.JSON);
updateRequest.docAsUpsert(true);
// Add to bulk processor with size checking
addRequestWithSizeCheck(updateRequest, json.getBytes().length);
// Add to bulk processor - it handles everything including size limits
bulkProcessor.add(updateRequest);
}
private void addTimeSeriesEntity(EntityTimeSeriesInterface entity, String indexName)
@@ -265,77 +234,8 @@ public class OpenSearchBulkSink implements BulkSink {
IndexRequest indexRequest =
new IndexRequest(indexName).id(docId).source(json, XContentType.JSON);
// Add to bulk processor with size checking
addRequestWithSizeCheck(indexRequest, json.getBytes().length);
}
/**
* Adds a request to the bulk processor, checking payload size and flushing if needed.
* This centralizes the logic for managing payload size limits.
*/
private void addRequestWithSizeCheck(Object request, long uncompressedSize) {
// Check if single document exceeds cluster limit
if (uncompressedSize > clusterMaxRequestSize) {
LOG.error(
"Single document exceeds OpenSearch cluster limit: {} MB > {} MB. Document will be skipped.",
uncompressedSize / (1024.0 * 1024.0),
clusterMaxRequestSize / (1024.0 * 1024.0));
totalFailed.incrementAndGet();
updateStats();
return; // Skip this document
}
// Calculate effective size based on whether compression is enabled
long effectiveSize;
if (compressionEnabled) {
// Compression typically achieves 75% reduction for JSON (25% of original size)
effectiveSize = uncompressedSize / 4;
LOG.debug(
"Using compressed size estimate: {} bytes (from {} uncompressed)",
effectiveSize,
uncompressedSize);
} else {
// Use uncompressed size when compression is disabled
effectiveSize = uncompressedSize;
}
// Check if adding this request would exceed the payload size limit
// Using volatile - approximate size is sufficient for flushing decisions
long currentSize = currentPayloadSize;
long effectiveLimit = Math.min(maxPayloadSizeBytes, clusterMaxRequestSize);
if (currentSize > 0 && currentSize + effectiveSize > effectiveLimit) {
LOG.info(
"Payload size limit reached. Current: {} MB, Request: {} KB, Max: {} MB (cluster limit: {} MB)",
currentSize / (1024.0 * 1024.0),
uncompressedSize / 1024.0,
effectiveLimit / (1024.0 * 1024.0),
clusterMaxRequestSize / (1024.0 * 1024.0));
// Flush and reset
bulkProcessor.flush();
currentPayloadSize = 0;
}
// Add the request (BulkProcessor is thread-safe)
if (request instanceof UpdateRequest) {
bulkProcessor.add((UpdateRequest) request);
} else if (request instanceof IndexRequest) {
bulkProcessor.add((IndexRequest) request);
}
// Update current payload size atomically
currentPayloadSize = currentSize + effectiveSize;
// Warn about large documents
if (uncompressedSize > 1024 * 1024) {
LOG.warn(
"Large document detected: {} MB uncompressed{}",
uncompressedSize / (1024.0 * 1024.0),
compressionEnabled
? String.format(", ~%.2f MB compressed", effectiveSize / (1024.0 * 1024.0))
: "");
}
// Add to bulk processor - it handles everything including size limits
bulkProcessor.add(indexRequest);
}
private void updateStats() {
@@ -362,14 +262,12 @@ public class OpenSearchBulkSink implements BulkSink {
public void close() throws IOException {
try {
// Flush any pending requests
if (currentBulkProcessor != null) {
currentBulkProcessor.flush();
bulkProcessor.flush();
// Wait for completion
boolean terminated = currentBulkProcessor.awaitClose(60, TimeUnit.SECONDS);
if (!terminated) {
LOG.warn("Bulk processor did not terminate within timeout");
}
// Wait for completion
boolean terminated = bulkProcessor.awaitClose(60, TimeUnit.SECONDS);
if (!terminated) {
LOG.warn("Bulk processor did not terminate within timeout");
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while closing bulk processor", e);
@@ -377,18 +275,6 @@ public class OpenSearchBulkSink implements BulkSink {
}
}
/**
* Get the cluster settings
*/
private Map<String, Object> getClusterSettings() {
try {
return searchClient.clusterSettings();
} catch (Exception e) {
LOG.warn("Failed to fetch cluster settings: {}", e.getMessage());
return new HashMap<>();
}
}
/**
* Get current batch size
*/
@@ -405,23 +291,17 @@ public class OpenSearchBulkSink implements BulkSink {
/**
* Update batch size - Note: This only updates the value for future processor creation
* The current BulkProcessor continues with its original settings to avoid deadlocks
*/
public void updateBatchSize(int newBatchSize) {
this.batchSize = newBatchSize;
LOG.info(
"Batch size updated to: {}. This will take effect when a new processor is created.",
newBatchSize);
LOG.info("Batch size updated to: {}", newBatchSize);
}
/**
* Update concurrent requests - Note: This only updates the value for future processor creation
* The current BulkProcessor continues with its original settings to avoid deadlocks
*/
public void updateConcurrentRequests(int concurrentRequests) {
this.maxConcurrentRequests = concurrentRequests;
LOG.info(
"Concurrent requests updated to: {}. This will take effect when a new processor is created.",
concurrentRequests);
LOG.info("Concurrent requests updated to: {}", concurrentRequests);
}
}