Increase Payload Size to 100MIB for ELastic Search (#17366)

* Increase Payload Size to 100MIB

* Increase batchSize Size to 500

* Add Payload Size as option in openmetadataoperations , and searchIndexJob

* //Comment

* Add Logs
This commit is contained in:
Mohit Yadav 2024-08-11 21:58:55 +05:30 committed by GitHub
parent 83e2b68a25
commit aa5b9b2985
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 51 additions and 31 deletions

View File

@ -284,8 +284,8 @@ elasticsearch:
connectionTimeoutSecs: ${ELASTICSEARCH_CONNECTION_TIMEOUT_SECS:-5}
socketTimeoutSecs: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
keepAliveTimeoutSecs: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
batchSize: ${ELASTICSEARCH_BATCH_SIZE:-10}
payLoadSize: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
batchSize: ${ELASTICSEARCH_BATCH_SIZE:-500}
payLoadSize: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-104857600} #max payLoadSize in Bytes
searchIndexMappingLanguage : ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
searchIndexFactoryClassName : org.openmetadata.service.search.SearchIndexFactory

View File

@ -193,8 +193,8 @@ services:
ELASTICSEARCH_CONNECTION_TIMEOUT_SECS: ${ELASTICSEARCH_CONNECTION_TIMEOUT_SECS:-5}
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-500}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-104857600} #max payLoadSize in Bytes
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
#eventMonitoringConfiguration
@ -403,8 +403,8 @@ services:
ELASTICSEARCH_CONNECTION_TIMEOUT_SECS: ${ELASTICSEARCH_CONNECTION_TIMEOUT_SECS:-5}
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-500}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-104857600} #max payLoadSize in Bytes
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
#eventMonitoringConfiguration

View File

@ -192,8 +192,8 @@ services:
ELASTICSEARCH_CONNECTION_TIMEOUT_SECS: ${ELASTICSEARCH_CONNECTION_TIMEOUT_SECS:-5}
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-500}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-104857600} #max payLoadSize in Bytes
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
#eventMonitoringConfiguration
@ -407,8 +407,8 @@ services:
ELASTICSEARCH_CONNECTION_TIMEOUT_SECS: ${ELASTICSEARCH_CONNECTION_TIMEOUT_SECS:-5}
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-500}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-104857600} #max payLoadSize in Bytes
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
#eventMonitoringConfiguration

View File

@ -191,8 +191,8 @@ services:
ELASTICSEARCH_CONNECTION_TIMEOUT_SECS: ${ELASTICSEARCH_CONNECTION_TIMEOUT_SECS:-5}
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-500}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-104857600} #max payLoadSize in Bytes
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
#eventMonitoringConfiguration
@ -403,8 +403,8 @@ services:
ELASTICSEARCH_CONNECTION_TIMEOUT_SECS: ${ELASTICSEARCH_CONNECTION_TIMEOUT_SECS:-5}
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-500}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-104857600} #max payLoadSize in Bytes
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
#eventMonitoringConfiguration

View File

@ -134,8 +134,8 @@ services:
ELASTICSEARCH_CONNECTION_TIMEOUT_SECS: ${ELASTICSEARCH_CONNECTION_TIMEOUT_SECS:-5}
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-500}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-104857600} #max payLoadSize in Bytes
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
#eventMonitoringConfiguration
@ -344,8 +344,8 @@ services:
ELASTICSEARCH_CONNECTION_TIMEOUT_SECS: ${ELASTICSEARCH_CONNECTION_TIMEOUT_SECS:-5}
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-500}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-104857600} #max payLoadSize in Bytes
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
#eventMonitoringConfiguration

View File

@ -181,8 +181,8 @@ services:
ELASTICSEARCH_CONNECTION_TIMEOUT_SECS: ${ELASTICSEARCH_CONNECTION_TIMEOUT_SECS:-5}
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-500}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-104857600} #max payLoadSize in Bytes
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
#eventMonitoringConfiguration
@ -391,8 +391,8 @@ services:
ELASTICSEARCH_CONNECTION_TIMEOUT_SECS: ${ELASTICSEARCH_CONNECTION_TIMEOUT_SECS:-5}
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-500}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-104857600} #max payLoadSize in Bytes
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
#eventMonitoringConfiguration

View File

@ -179,8 +179,8 @@ services:
ELASTICSEARCH_CONNECTION_TIMEOUT_SECS: ${ELASTICSEARCH_CONNECTION_TIMEOUT_SECS:-5}
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-500}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-104857600} #max payLoadSize in Bytes
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
#eventMonitoringConfiguration
@ -389,8 +389,8 @@ services:
ELASTICSEARCH_CONNECTION_TIMEOUT_SECS: ${ELASTICSEARCH_CONNECTION_TIMEOUT_SECS:-5}
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-500}
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-104857600} #max payLoadSize in Bytes
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
#eventMonitoringConfiguration

View File

@ -199,7 +199,8 @@ public class SearchIndexApp extends AbstractNativeApplication {
} else {
this.entityProcessor = new ElasticSearchEntitiesProcessor(totalRecords);
this.entityTimeSeriesProcessor = new ElasticSearchEntityTimeSeriesProcessor(totalRecords);
this.searchIndexSink = new ElasticSearchIndexSink(searchRepository, totalRecords);
this.searchIndexSink =
new ElasticSearchIndexSink(searchRepository, totalRecords, jobData.getPayLoadSize());
}
}

View File

@ -36,6 +36,13 @@ public class ElasticSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0);
}
public ElasticSearchIndexSink(
SearchRepository searchRepository, int total, int maxPayLoadSizeInBytes) {
this.searchRepository = searchRepository;
this.maxPayLoadSizeInBytes = maxPayLoadSizeInBytes;
this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0);
}
@Override
public BulkResponse write(BulkRequest data, Map<String, Object> contextData)
throws SearchIndexException {

View File

@ -268,6 +268,10 @@ public class OpenMetadataOperations implements Callable<Integer> {
names = {"-b", "--batch-size"},
defaultValue = "100")
int batchSize,
@Option(
names = {"-p", "--payload-size"},
defaultValue = "100")
int payloadSize,
@Option(
names = {"--recreate-indexes"},
defaultValue = "true")
@ -283,14 +287,15 @@ public class OpenMetadataOperations implements Callable<Integer> {
AppScheduler.initialize(config, collectionDAO, searchRepository);
String appName = "SearchIndexingApplication";
return executeSearchReindexApp(appName, batchSize, recreateIndexes);
return executeSearchReindexApp(appName, batchSize, payloadSize, recreateIndexes);
} catch (Exception e) {
LOG.error("Failed to reindex due to ", e);
return 1;
}
}
private int executeSearchReindexApp(String appName, int batchSize, boolean recreateIndexes) {
private int executeSearchReindexApp(
String appName, int batchSize, int payloadSize, boolean recreateIndexes) {
AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
App originalSearchIndexApp =
appRepository.getByName(null, appName, appRepository.getFields("id"));
@ -302,10 +307,11 @@ public class OpenMetadataOperations implements Callable<Integer> {
EventPublisherJob updatedJob = JsonUtils.deepCopy(storedJob, EventPublisherJob.class);
updatedJob
.withBatchSize(batchSize)
.withPayLoadSize(payloadSize)
.withRecreateIndex(recreateIndexes)
.withEntities(Set.of("all"));
// Update the search index app with the new batch size and recreate index flag
// Update the search index app with the new batch size, payload size and recreate index flag
App updatedSearchIndexApp = JsonUtils.deepCopy(originalSearchIndexApp, App.class);
updatedSearchIndexApp.withAppConfiguration(updatedJob);
JsonPatch patch = JsonUtils.getJsonPatch(originalSearchIndexApp, updatedSearchIndexApp);
@ -371,7 +377,8 @@ public class OpenMetadataOperations implements Callable<Integer> {
} catch (Exception ignored) {
}
LOG.info(
"Reindexing Status not available yet, waiting for 10 seconds to fetch the status again.");
"[Reindexing] Current Available Status : {}. Reindexing is still, waiting for 10 seconds to fetch the latest status again.",
JsonUtils.pojoToJson(appRunRecord));
Thread.sleep(10000);
} while (!isRunCompleted(appRunRecord));

View File

@ -101,6 +101,11 @@
"description": "Maximum number of events sent in a batch (Default 10).",
"type": "integer"
},
"payLoadSize": {
"description": "Payload size in bytes depending on config",
"type": "integer",
"default": 104857600
},
"searchIndexMappingLanguage": {
"description": "Recreate Indexes with updated Language",
"$ref": "../configuration/elasticSearchConfiguration.json#/definitions/searchIndexMappingLanguage"