mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-12 15:57:44 +00:00
Added payload size check logic for Elasticsearch (#15540)
* Added payload size check logic for Elasticsearch * Added payload size check logic for Elasticsearch - made review changes * Added payload size check logic for Elasticsearch - added env in docker files * Optimize payload logic, fix search application error stats issue
This commit is contained in:
parent
269ed6d5bb
commit
c010939072
@ -273,6 +273,7 @@ elasticsearch:
|
||||
socketTimeoutSecs: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
|
||||
keepAliveTimeoutSecs: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
|
||||
batchSize: ${ELASTICSEARCH_BATCH_SIZE:-10}
|
||||
payLoadSize: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
|
||||
searchIndexMappingLanguage : ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
|
||||
searchIndexFactoryClassName : org.openmetadata.service.search.SearchIndexFactory
|
||||
|
||||
|
||||
@ -191,6 +191,7 @@ services:
|
||||
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
|
||||
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
|
||||
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
|
||||
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
|
||||
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
|
||||
|
||||
#eventMonitoringConfiguration
|
||||
@ -400,6 +401,7 @@ services:
|
||||
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
|
||||
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
|
||||
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
|
||||
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
|
||||
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
|
||||
|
||||
#eventMonitoringConfiguration
|
||||
|
||||
@ -190,6 +190,7 @@ services:
|
||||
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
|
||||
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
|
||||
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
|
||||
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
|
||||
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
|
||||
|
||||
#eventMonitoringConfiguration
|
||||
@ -396,6 +397,7 @@ services:
|
||||
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
|
||||
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
|
||||
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
|
||||
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
|
||||
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
|
||||
|
||||
#eventMonitoringConfiguration
|
||||
|
||||
@ -134,6 +134,7 @@ services:
|
||||
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
|
||||
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
|
||||
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
|
||||
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
|
||||
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
|
||||
|
||||
#eventMonitoringConfiguration
|
||||
@ -339,6 +340,7 @@ services:
|
||||
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
|
||||
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
|
||||
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
|
||||
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
|
||||
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
|
||||
|
||||
#eventMonitoringConfiguration
|
||||
|
||||
@ -97,6 +97,7 @@ ELASTICSEARCH_CONNECTION_TIMEOUT_SECS="5"
|
||||
ELASTICSEARCH_SOCKET_TIMEOUT_SECS="60"
|
||||
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS="600"
|
||||
ELASTICSEARCH_BATCH_SIZE="10"
|
||||
ELASTICSEARCH_PAYLOAD_BYTES_SIZE="10485760"
|
||||
ELASTICSEARCH_INDEX_MAPPING_LANG="EN"
|
||||
# Event Monitoring configurations
|
||||
EVENT_MONITOR="prometheus"
|
||||
|
||||
@ -97,6 +97,7 @@ ELASTICSEARCH_CONNECTION_TIMEOUT_SECS="5"
|
||||
ELASTICSEARCH_SOCKET_TIMEOUT_SECS="60"
|
||||
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS="600"
|
||||
ELASTICSEARCH_BATCH_SIZE="10"
|
||||
ELASTICSEARCH_PAYLOAD_BYTES_SIZE="10485760"
|
||||
ELASTICSEARCH_INDEX_MAPPING_LANG="EN"
|
||||
# Event Monitoring configurations
|
||||
EVENT_MONITOR="prometheus"
|
||||
|
||||
@ -181,6 +181,7 @@ services:
|
||||
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
|
||||
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
|
||||
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
|
||||
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
|
||||
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
|
||||
|
||||
#eventMonitoringConfiguration
|
||||
@ -386,6 +387,7 @@ services:
|
||||
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
|
||||
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
|
||||
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
|
||||
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
|
||||
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
|
||||
|
||||
#eventMonitoringConfiguration
|
||||
|
||||
@ -179,6 +179,7 @@ services:
|
||||
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
|
||||
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
|
||||
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
|
||||
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
|
||||
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
|
||||
|
||||
#eventMonitoringConfiguration
|
||||
@ -384,6 +385,7 @@ services:
|
||||
ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
|
||||
ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600}
|
||||
ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10}
|
||||
ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes
|
||||
ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN}
|
||||
|
||||
#eventMonitoringConfiguration
|
||||
|
||||
@ -3,6 +3,7 @@ package org.openmetadata.service.apps.bundles.searchIndex;
|
||||
import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER;
|
||||
import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.APP_RUN_STATS;
|
||||
import static org.openmetadata.service.apps.scheduler.AppScheduler.ON_DEMAND_JOB;
|
||||
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_NAME_LIST_KEY;
|
||||
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
|
||||
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess;
|
||||
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex;
|
||||
@ -12,6 +13,7 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang.exception.ExceptionUtils;
|
||||
@ -220,6 +222,17 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
||||
while (!stopped && !paginatedEntitiesSource.isDone()) {
|
||||
try {
|
||||
resultList = paginatedEntitiesSource.readNext(null);
|
||||
List<String> entityName =
|
||||
resultList.getData().stream()
|
||||
.map(
|
||||
entity ->
|
||||
String.format(
|
||||
"%s %s",
|
||||
paginatedEntitiesSource.getEntityType(),
|
||||
entity.getFullyQualifiedName()))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
contextData.put(ENTITY_NAME_LIST_KEY, entityName);
|
||||
if (!resultList.getData().isEmpty()) {
|
||||
searchIndexSink.write(entityProcessor.process(resultList, contextData), contextData);
|
||||
if (!resultList.getErrors().isEmpty()) {
|
||||
@ -234,10 +247,13 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
||||
"Issues in Reading A Batch For Entities. Check Errors Corresponding to Entities.")
|
||||
.withFailedEntities(resultList.getErrors()));
|
||||
}
|
||||
paginatedEntitiesSource.updateStats(resultList.getData().size(), 0);
|
||||
}
|
||||
} catch (SearchIndexException rx) {
|
||||
jobData.setStatus(EventPublisherJob.Status.FAILED);
|
||||
jobData.setFailure(rx.getIndexingError());
|
||||
paginatedEntitiesSource.updateStats(
|
||||
rx.getIndexingError().getSuccessCount(), rx.getIndexingError().getFailedCount());
|
||||
} finally {
|
||||
updateStats(paginatedEntitiesSource.getEntityType(), paginatedEntitiesSource.getStats());
|
||||
sendUpdates(jobExecutionContext);
|
||||
@ -255,13 +271,25 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
||||
while (!stopped && !paginatedDataInsightSource.isDone()) {
|
||||
try {
|
||||
resultList = paginatedDataInsightSource.readNext(null);
|
||||
List<String> entityName =
|
||||
resultList.getData().stream()
|
||||
.map(
|
||||
entity ->
|
||||
String.format(
|
||||
"%s %s", paginatedDataInsightSource.getEntityType(), entity.getId()))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
contextData.put(ENTITY_NAME_LIST_KEY, entityName);
|
||||
if (!resultList.getData().isEmpty()) {
|
||||
searchIndexSink.write(
|
||||
dataInsightProcessor.process(resultList, contextData), contextData);
|
||||
}
|
||||
paginatedDataInsightSource.updateStats(resultList.getData().size(), 0);
|
||||
} catch (SearchIndexException ex) {
|
||||
jobData.setStatus(EventPublisherJob.Status.FAILED);
|
||||
jobData.setFailure(ex.getIndexingError());
|
||||
paginatedDataInsightSource.updateStats(
|
||||
ex.getIndexingError().getSuccessCount(), ex.getIndexingError().getFailedCount());
|
||||
} finally {
|
||||
updateStats(
|
||||
paginatedDataInsightSource.getEntityType(), paginatedDataInsightSource.getStats());
|
||||
|
||||
@ -1,16 +1,22 @@
|
||||
package org.openmetadata.service.search.elasticsearch;
|
||||
|
||||
import static org.openmetadata.schema.system.IndexingError.ErrorSource.SINK;
|
||||
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_NAME_LIST_KEY;
|
||||
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getErrorsFromBulkResponse;
|
||||
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getSuccessFromBulkResponseEs;
|
||||
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
|
||||
|
||||
import es.org.elasticsearch.action.DocWriteRequest;
|
||||
import es.org.elasticsearch.action.bulk.BulkRequest;
|
||||
import es.org.elasticsearch.action.bulk.BulkResponse;
|
||||
import es.org.elasticsearch.client.RequestOptions;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.glassfish.jersey.internal.util.ExceptionUtils;
|
||||
import org.openmetadata.schema.system.EntityError;
|
||||
import org.openmetadata.schema.system.IndexingError;
|
||||
import org.openmetadata.schema.system.StepStats;
|
||||
import org.openmetadata.service.exception.SearchIndexException;
|
||||
@ -22,9 +28,11 @@ import org.openmetadata.service.workflows.interfaces.Sink;
|
||||
public class ElasticSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
|
||||
private final StepStats stats = new StepStats();
|
||||
private final SearchRepository searchRepository;
|
||||
private final int maxPayLoadSizeInBytes;
|
||||
|
||||
public ElasticSearchIndexSink(SearchRepository searchRepository, int total) {
|
||||
this.searchRepository = searchRepository;
|
||||
this.maxPayLoadSizeInBytes = searchRepository.getElasticSearchConfiguration().getPayLoadSize();
|
||||
this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0);
|
||||
}
|
||||
|
||||
@ -33,30 +41,64 @@ public class ElasticSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
|
||||
throws SearchIndexException {
|
||||
LOG.debug("[EsSearchIndexSink] Processing a Batch of Size: {}", data.numberOfActions());
|
||||
try {
|
||||
BulkResponse response = searchRepository.getSearchClient().bulk(data, RequestOptions.DEFAULT);
|
||||
int currentSuccess = getSuccessFromBulkResponseEs(response);
|
||||
int currentFailed = response.getItems().length - currentSuccess;
|
||||
List<?> entityNames =
|
||||
(List<?>)
|
||||
Optional.ofNullable(contextData.get(ENTITY_NAME_LIST_KEY))
|
||||
.orElse(Collections.emptyList());
|
||||
List<EntityError> entityErrorList = new ArrayList<>();
|
||||
BulkResponse response = null;
|
||||
|
||||
if (currentFailed != 0) {
|
||||
BulkRequest bufferData = new BulkRequest();
|
||||
long requestIndex = 0; // Index to track the corresponding entity name
|
||||
|
||||
for (DocWriteRequest<?> request : data.requests()) {
|
||||
long requestSize = new BulkRequest().add(request).estimatedSizeInBytes();
|
||||
|
||||
if (requestSize > maxPayLoadSizeInBytes) {
|
||||
entityErrorList.add(
|
||||
new EntityError()
|
||||
.withMessage("Entity size exceeds elastic search maximum payload size")
|
||||
.withEntity(entityNames.get(Math.toIntExact(requestIndex))));
|
||||
requestIndex++;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (bufferData.estimatedSizeInBytes() + requestSize > maxPayLoadSizeInBytes) {
|
||||
response = searchRepository.getSearchClient().bulk(bufferData, RequestOptions.DEFAULT);
|
||||
entityErrorList.addAll(getErrorsFromBulkResponse(response));
|
||||
bufferData = new BulkRequest();
|
||||
}
|
||||
|
||||
bufferData.add(request);
|
||||
requestIndex++;
|
||||
}
|
||||
|
||||
// Send the last buffer if it has any requests
|
||||
if (!bufferData.requests().isEmpty()) {
|
||||
response = searchRepository.getSearchClient().bulk(bufferData, RequestOptions.DEFAULT);
|
||||
entityErrorList.addAll(getErrorsFromBulkResponse(response));
|
||||
}
|
||||
|
||||
LOG.debug(
|
||||
"[EsSearchIndexSink] Batch Stats :- Submitted : {} Success: {} Failed: {}",
|
||||
data.numberOfActions(),
|
||||
data.numberOfActions() - entityErrorList.size(),
|
||||
entityErrorList.size());
|
||||
updateStats(data.numberOfActions() - entityErrorList.size(), entityErrorList.size());
|
||||
|
||||
// Handle errors
|
||||
if (!entityErrorList.isEmpty()) {
|
||||
throw new SearchIndexException(
|
||||
new IndexingError()
|
||||
.withErrorSource(SINK)
|
||||
.withSubmittedCount(data.numberOfActions())
|
||||
.withSuccessCount(currentSuccess)
|
||||
.withFailedCount(currentFailed)
|
||||
.withSuccessCount(data.numberOfActions() - entityErrorList.size())
|
||||
.withFailedCount(entityErrorList.size())
|
||||
.withMessage("Issues in Sink To Elastic Search.")
|
||||
.withFailedEntities(getErrorsFromBulkResponse(response)));
|
||||
.withFailedEntities(entityErrorList));
|
||||
}
|
||||
|
||||
// Update Stats
|
||||
LOG.debug(
|
||||
"[EsSearchIndexSink] Batch Stats :- Submitted : {} Success: {} Failed: {}",
|
||||
data.numberOfActions(),
|
||||
currentSuccess,
|
||||
currentFailed);
|
||||
updateStats(currentSuccess, currentFailed);
|
||||
|
||||
return response;
|
||||
return response; // Return the last response
|
||||
} catch (SearchIndexException ex) {
|
||||
updateStats(ex.getIndexingError().getSuccessCount(), ex.getIndexingError().getFailedCount());
|
||||
throw ex;
|
||||
|
||||
@ -1,19 +1,25 @@
|
||||
package org.openmetadata.service.search.opensearch;
|
||||
|
||||
import static org.openmetadata.schema.system.IndexingError.ErrorSource.SINK;
|
||||
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_NAME_LIST_KEY;
|
||||
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getErrorsFromBulkResponse;
|
||||
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getSuccessFromBulkResponse;
|
||||
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.glassfish.jersey.internal.util.ExceptionUtils;
|
||||
import org.openmetadata.schema.system.EntityError;
|
||||
import org.openmetadata.schema.system.IndexingError;
|
||||
import org.openmetadata.schema.system.StepStats;
|
||||
import org.openmetadata.service.exception.SearchIndexException;
|
||||
import org.openmetadata.service.search.SearchRepository;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
import org.openmetadata.service.workflows.interfaces.Sink;
|
||||
import os.org.opensearch.action.DocWriteRequest;
|
||||
import os.org.opensearch.action.bulk.BulkRequest;
|
||||
import os.org.opensearch.action.bulk.BulkResponse;
|
||||
import os.org.opensearch.client.RequestOptions;
|
||||
@ -23,39 +29,77 @@ public class OpenSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
|
||||
private final StepStats stats = new StepStats();
|
||||
private final SearchRepository searchRepository;
|
||||
|
||||
private final int maxPayLoadSizeInBytes;
|
||||
|
||||
public OpenSearchIndexSink(SearchRepository repository, int total) {
|
||||
this.searchRepository = repository;
|
||||
this.maxPayLoadSizeInBytes = searchRepository.getElasticSearchConfiguration().getPayLoadSize();
|
||||
this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkResponse write(BulkRequest data, Map<String, Object> contextData)
|
||||
throws SearchIndexException {
|
||||
LOG.debug("[EsSearchIndexSink] Processing a Batch of Size: {}", data.numberOfActions());
|
||||
LOG.debug("[OsSearchIndexSink] Processing a Batch of Size: {}", data.numberOfActions());
|
||||
try {
|
||||
BulkResponse response = searchRepository.getSearchClient().bulk(data, RequestOptions.DEFAULT);
|
||||
int currentSuccess = getSuccessFromBulkResponse(response);
|
||||
int currentFailed = response.getItems().length - currentSuccess;
|
||||
List<?> entityNames =
|
||||
(List<?>)
|
||||
Optional.ofNullable(contextData.get(ENTITY_NAME_LIST_KEY))
|
||||
.orElse(Collections.emptyList());
|
||||
List<EntityError> entityErrorList = new ArrayList<>();
|
||||
BulkResponse response = null;
|
||||
|
||||
if (currentFailed != 0) {
|
||||
BulkRequest bufferData = new BulkRequest();
|
||||
long requestIndex = 0; // Index to track the corresponding entity name
|
||||
|
||||
for (DocWriteRequest<?> request : data.requests()) {
|
||||
long requestSize = new BulkRequest().add(request).estimatedSizeInBytes();
|
||||
|
||||
if (requestSize > maxPayLoadSizeInBytes) {
|
||||
entityErrorList.add(
|
||||
new EntityError()
|
||||
.withMessage("Entity size exceeds elastic search maximum payload size")
|
||||
.withEntity(entityNames.get(Math.toIntExact(requestIndex))));
|
||||
requestIndex++;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (bufferData.estimatedSizeInBytes() + requestSize > maxPayLoadSizeInBytes) {
|
||||
response = searchRepository.getSearchClient().bulk(bufferData, RequestOptions.DEFAULT);
|
||||
entityErrorList.addAll(getErrorsFromBulkResponse(response));
|
||||
bufferData = new BulkRequest();
|
||||
}
|
||||
|
||||
bufferData.add(request);
|
||||
requestIndex++;
|
||||
}
|
||||
|
||||
// Send the last buffer if it has any requests
|
||||
if (!bufferData.requests().isEmpty()) {
|
||||
response = searchRepository.getSearchClient().bulk(bufferData, RequestOptions.DEFAULT);
|
||||
entityErrorList.addAll(getErrorsFromBulkResponse(response));
|
||||
}
|
||||
|
||||
LOG.debug(
|
||||
"[OSSearchIndexSink] Batch Stats :- Submitted : {} Success: {} Failed: {}",
|
||||
data.numberOfActions(),
|
||||
data.numberOfActions() - entityErrorList.size(),
|
||||
entityErrorList.size());
|
||||
updateStats(data.numberOfActions() - entityErrorList.size(), entityErrorList.size());
|
||||
|
||||
// Handle errors
|
||||
if (!entityErrorList.isEmpty()) {
|
||||
throw new SearchIndexException(
|
||||
new IndexingError()
|
||||
.withErrorSource(SINK)
|
||||
.withSubmittedCount(data.numberOfActions())
|
||||
.withSuccessCount(currentSuccess)
|
||||
.withFailedCount(currentFailed)
|
||||
.withSuccessCount(data.numberOfActions() - entityErrorList.size())
|
||||
.withFailedCount(entityErrorList.size())
|
||||
.withMessage("Issues in Sink To Elastic Search.")
|
||||
.withFailedEntities(getErrorsFromBulkResponse(response)));
|
||||
.withFailedEntities(entityErrorList));
|
||||
}
|
||||
// Update Stats
|
||||
LOG.debug(
|
||||
"[EsSearchIndexSink] Batch Stats :- Submitted : {} Success: {} Failed: {}",
|
||||
data.numberOfActions(),
|
||||
currentSuccess,
|
||||
currentFailed);
|
||||
updateStats(currentSuccess, currentFailed);
|
||||
|
||||
return response;
|
||||
return response; // Return the last response
|
||||
} catch (SearchIndexException ex) {
|
||||
updateStats(ex.getIndexingError().getSuccessCount(), ex.getIndexingError().getFailedCount());
|
||||
throw ex;
|
||||
@ -68,7 +112,7 @@ public class OpenSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
|
||||
.withFailedCount(data.numberOfActions())
|
||||
.withMessage("Issue in Sink to Elastic Search.")
|
||||
.withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e));
|
||||
LOG.debug("[OsSearchIndexSink] Failed, Details : {}", JsonUtils.pojoToJson(indexingError));
|
||||
LOG.debug("[OSSearchIndexSink] Failed, Details : {}", JsonUtils.pojoToJson(indexingError));
|
||||
updateStats(0, data.numberOfActions());
|
||||
throw new SearchIndexException(indexingError);
|
||||
}
|
||||
|
||||
@ -84,7 +84,7 @@ public class PaginatedDataInsightSource implements Source<ResultList<ReportData>
|
||||
batchSize,
|
||||
result.getData().size(),
|
||||
0);
|
||||
updateStats(result.getData().size(), result.getErrors().size());
|
||||
// updateStats(result.getData().size(), result.getErrors().size());
|
||||
} catch (Exception ex) {
|
||||
IndexingError indexingError =
|
||||
new IndexingError()
|
||||
|
||||
@ -88,14 +88,14 @@ public class PaginatedEntitiesSource implements Source<ResultList<? extends Enti
|
||||
} else {
|
||||
this.cursor = result.getPaging().getAfter();
|
||||
}
|
||||
updateStats(result.getData().size(), result.getErrors().size());
|
||||
// updateStats(result.getData().size(), result.getErrors().size());
|
||||
return result;
|
||||
}
|
||||
|
||||
LOG.debug(
|
||||
"[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}",
|
||||
batchSize, result.getData().size(), result.getErrors().size());
|
||||
updateStats(result.getData().size(), result.getErrors().size());
|
||||
// updateStats(result.getData().size(), result.getErrors().size());
|
||||
} catch (Exception e) {
|
||||
lastFailedCursor = this.cursor;
|
||||
int remainingRecords =
|
||||
|
||||
@ -31,6 +31,7 @@ public class ReindexingUtil {
|
||||
}
|
||||
|
||||
public static final String ENTITY_TYPE_KEY = "entityType";
|
||||
public static final String ENTITY_NAME_LIST_KEY = "entityNameList";
|
||||
|
||||
public static void getUpdatedStats(StepStats stats, int currentSuccess, int currentFailed) {
|
||||
stats.setSuccessRecords(stats.getSuccessRecords() + currentSuccess);
|
||||
|
||||
@ -66,6 +66,11 @@
|
||||
"type": "integer",
|
||||
"default": 10
|
||||
},
|
||||
"payLoadSize": {
|
||||
"description": "Payload size in bytes depending on elasticsearch config",
|
||||
"type": "integer",
|
||||
"default": 10485760
|
||||
},
|
||||
"clusterAlias": {
|
||||
"description": "Alias for search indexes to provide segregation of indexes.",
|
||||
"type": "string",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user