Add Opensearch max payload Size (#17392)

This commit is contained in:
Mohit Yadav 2024-08-12 11:34:52 +05:30 committed by GitHub
parent 05baba9dc1
commit df9753c7bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 14 additions and 11 deletions

View File

@ -122,10 +122,18 @@ public class DataAssetsWorkflow {
this.entityEnricher = new DataInsightsEntityEnricherProcessor(totalRecords);
if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
this.entityProcessor = new DataInsightsOpenSearchProcessor(totalRecords);
this.searchIndexSink = new OpenSearchIndexSink(searchRepository, totalRecords);
this.searchIndexSink =
new OpenSearchIndexSink(
searchRepository,
totalRecords,
searchRepository.getElasticSearchConfiguration().getPayLoadSize());
} else {
this.entityProcessor = new DataInsightsElasticSearchProcessor(totalRecords);
this.searchIndexSink = new ElasticSearchIndexSink(searchRepository, totalRecords);
this.searchIndexSink =
new ElasticSearchIndexSink(
searchRepository,
totalRecords,
searchRepository.getElasticSearchConfiguration().getPayLoadSize());
}
}

View File

@ -195,7 +195,8 @@ public class SearchIndexApp extends AbstractNativeApplication {
if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
this.entityProcessor = new OpenSearchEntitiesProcessor(totalRecords);
this.entityTimeSeriesProcessor = new OpenSearchEntityTimeSeriesProcessor(totalRecords);
this.searchIndexSink = new OpenSearchIndexSink(searchRepository, totalRecords);
this.searchIndexSink =
new OpenSearchIndexSink(searchRepository, totalRecords, jobData.getPayLoadSize());
} else {
this.entityProcessor = new ElasticSearchEntitiesProcessor(totalRecords);
this.entityTimeSeriesProcessor = new ElasticSearchEntityTimeSeriesProcessor(totalRecords);

View File

@ -30,12 +30,6 @@ public class ElasticSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
private final SearchRepository searchRepository;
private final int maxPayLoadSizeInBytes;
public ElasticSearchIndexSink(SearchRepository searchRepository, int total) {
this.searchRepository = searchRepository;
this.maxPayLoadSizeInBytes = searchRepository.getElasticSearchConfiguration().getPayLoadSize();
this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0);
}
public ElasticSearchIndexSink(
SearchRepository searchRepository, int total, int maxPayLoadSizeInBytes) {
this.searchRepository = searchRepository;

View File

@ -31,9 +31,9 @@ public class OpenSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
private final int maxPayLoadSizeInBytes;
public OpenSearchIndexSink(SearchRepository repository, int total) {
public OpenSearchIndexSink(SearchRepository repository, int total, int maxPayLoadSizeInBytes) {
this.searchRepository = repository;
this.maxPayLoadSizeInBytes = searchRepository.getElasticSearchConfiguration().getPayLoadSize();
this.maxPayLoadSizeInBytes = maxPayLoadSizeInBytes;
this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0);
}