From c010939072958700ec98e3fdef587013bf5dfca0 Mon Sep 17 00:00:00 2001 From: sonika-shah <58761340+sonika-shah@users.noreply.github.com> Date: Mon, 15 Apr 2024 16:25:06 +0530 Subject: [PATCH] Added payload size check logic for Elasticsearch (#15540) * Added payload size check logic for Elasticsearch * Added payload size check logic for Elasticsearch - made review changes * Added payload size check logic for Elasticsearch - added env in docker files * Optimize payload logic, fix search application error stats issue --- conf/openmetadata.yaml | 1 + .../development/docker-compose-postgres.yml | 2 + docker/development/docker-compose.yml | 2 + .../docker-compose-openmetadata.yml | 2 + docker/docker-compose-openmetadata/env-mysql | 1 + .../docker-compose-openmetadata/env-postgres | 1 + .../docker-compose-postgres.yml | 2 + .../docker-compose.yml | 2 + .../bundles/searchIndex/SearchIndexApp.java | 28 +++++++ .../elasticsearch/ElasticSearchIndexSink.java | 76 ++++++++++++++---- .../opensearch/OpenSearchIndexSink.java | 80 ++++++++++++++----- .../PaginatedDataInsightSource.java | 2 +- .../searchIndex/PaginatedEntitiesSource.java | 4 +- .../workflows/searchIndex/ReindexingUtil.java | 1 + .../elasticSearchConfiguration.json | 5 ++ 15 files changed, 171 insertions(+), 38 deletions(-) diff --git a/conf/openmetadata.yaml b/conf/openmetadata.yaml index 0916de60fb6..084acb8163b 100644 --- a/conf/openmetadata.yaml +++ b/conf/openmetadata.yaml @@ -273,6 +273,7 @@ elasticsearch: socketTimeoutSecs: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60} keepAliveTimeoutSecs: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600} batchSize: ${ELASTICSEARCH_BATCH_SIZE:-10} + payLoadSize: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes searchIndexMappingLanguage : ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN} searchIndexFactoryClassName : org.openmetadata.service.search.SearchIndexFactory diff --git a/docker/development/docker-compose-postgres.yml b/docker/development/docker-compose-postgres.yml index a4d7b514841..b7dbea5d56e 100644 --- a/docker/development/docker-compose-postgres.yml +++ b/docker/development/docker-compose-postgres.yml @@ -191,6 +191,7 @@ services: ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60} ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600} ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10} + ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN} #eventMonitoringConfiguration @@ -400,6 +401,7 @@ services: ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60} ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600} ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10} + ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN} #eventMonitoringConfiguration diff --git a/docker/development/docker-compose.yml b/docker/development/docker-compose.yml index 3c653a92666..d6628e11083 100644 --- a/docker/development/docker-compose.yml +++ b/docker/development/docker-compose.yml @@ -190,6 +190,7 @@ services: ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60} ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600} ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10} + ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN} #eventMonitoringConfiguration @@ -396,6 +397,7 @@ services: ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60} ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600} ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10} + ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN} #eventMonitoringConfiguration diff --git a/docker/docker-compose-openmetadata/docker-compose-openmetadata.yml b/docker/docker-compose-openmetadata/docker-compose-openmetadata.yml index da5b6e30886..7083ed1017e 100644 --- a/docker/docker-compose-openmetadata/docker-compose-openmetadata.yml +++ b/docker/docker-compose-openmetadata/docker-compose-openmetadata.yml @@ -134,6 +134,7 @@ services: ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60} ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600} ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10} + ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN} #eventMonitoringConfiguration @@ -339,6 +340,7 @@ services: ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60} ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600} ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10} + ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN} #eventMonitoringConfiguration diff --git a/docker/docker-compose-openmetadata/env-mysql b/docker/docker-compose-openmetadata/env-mysql index c0f77dd54b4..b20bd87cf3a 100644 --- a/docker/docker-compose-openmetadata/env-mysql +++ b/docker/docker-compose-openmetadata/env-mysql @@ -97,6 +97,7 @@ ELASTICSEARCH_CONNECTION_TIMEOUT_SECS="5" ELASTICSEARCH_SOCKET_TIMEOUT_SECS="60" ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS="600" ELASTICSEARCH_BATCH_SIZE="10" +ELASTICSEARCH_PAYLOAD_BYTES_SIZE="10485760" ELASTICSEARCH_INDEX_MAPPING_LANG="EN" # Event Monitoring configurations EVENT_MONITOR="prometheus" diff --git a/docker/docker-compose-openmetadata/env-postgres b/docker/docker-compose-openmetadata/env-postgres index 5afb6469743..7e53c3a09a0 100644 --- a/docker/docker-compose-openmetadata/env-postgres +++ b/docker/docker-compose-openmetadata/env-postgres @@ -97,6 +97,7 @@ ELASTICSEARCH_CONNECTION_TIMEOUT_SECS="5" ELASTICSEARCH_SOCKET_TIMEOUT_SECS="60" ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS="600" ELASTICSEARCH_BATCH_SIZE="10" +ELASTICSEARCH_PAYLOAD_BYTES_SIZE="10485760" ELASTICSEARCH_INDEX_MAPPING_LANG="EN" # Event Monitoring configurations EVENT_MONITOR="prometheus" diff --git a/docker/docker-compose-quickstart/docker-compose-postgres.yml b/docker/docker-compose-quickstart/docker-compose-postgres.yml index 6365ad292a8..99510459064 100644 --- a/docker/docker-compose-quickstart/docker-compose-postgres.yml +++ b/docker/docker-compose-quickstart/docker-compose-postgres.yml @@ -181,6 +181,7 @@ services: ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60} ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600} ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10} + ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN} #eventMonitoringConfiguration @@ -386,6 +387,7 @@ services: ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60} ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600} ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10} + ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN} #eventMonitoringConfiguration diff --git a/docker/docker-compose-quickstart/docker-compose.yml b/docker/docker-compose-quickstart/docker-compose.yml index 12e3ce1683f..38aa0c9431b 100644 --- a/docker/docker-compose-quickstart/docker-compose.yml +++ b/docker/docker-compose-quickstart/docker-compose.yml @@ -179,6 +179,7 @@ services: ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60} ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600} ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10} + ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN} #eventMonitoringConfiguration @@ -384,6 +385,7 @@ services: ELASTICSEARCH_SOCKET_TIMEOUT_SECS: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60} ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS: ${ELASTICSEARCH_KEEP_ALIVE_TIMEOUT_SECS:-600} ELASTICSEARCH_BATCH_SIZE: ${ELASTICSEARCH_BATCH_SIZE:-10} + ELASTICSEARCH_PAYLOAD_BYTES_SIZE: ${ELASTICSEARCH_PAYLOAD_BYTES_SIZE:-10485760} #max payLoadSize in Bytes ELASTICSEARCH_INDEX_MAPPING_LANG: ${ELASTICSEARCH_INDEX_MAPPING_LANG:-EN} #eventMonitoringConfiguration diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index 1bbe997598f..20f1ec24336 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -3,6 +3,7 @@ package org.openmetadata.service.apps.bundles.searchIndex; import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER; import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.APP_RUN_STATS; import static org.openmetadata.service.apps.scheduler.AppScheduler.ON_DEMAND_JOB; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_NAME_LIST_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex; @@ -12,6 +13,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.exception.ExceptionUtils; @@ -220,6 +222,17 @@ public class SearchIndexApp extends AbstractNativeApplication { while (!stopped && !paginatedEntitiesSource.isDone()) { try { resultList = paginatedEntitiesSource.readNext(null); + List entityName = + resultList.getData().stream() + .map( + entity -> + String.format( + "%s %s", + paginatedEntitiesSource.getEntityType(), + entity.getFullyQualifiedName())) + .collect(Collectors.toList()); + + contextData.put(ENTITY_NAME_LIST_KEY, entityName); if (!resultList.getData().isEmpty()) { searchIndexSink.write(entityProcessor.process(resultList, contextData), contextData); if (!resultList.getErrors().isEmpty()) { @@ -234,10 +247,13 @@ public class SearchIndexApp extends AbstractNativeApplication { "Issues in Reading A Batch For Entities. Check Errors Corresponding to Entities.") .withFailedEntities(resultList.getErrors())); } + paginatedEntitiesSource.updateStats(resultList.getData().size(), 0); } } catch (SearchIndexException rx) { jobData.setStatus(EventPublisherJob.Status.FAILED); jobData.setFailure(rx.getIndexingError()); + paginatedEntitiesSource.updateStats( + rx.getIndexingError().getSuccessCount(), rx.getIndexingError().getFailedCount()); } finally { updateStats(paginatedEntitiesSource.getEntityType(), paginatedEntitiesSource.getStats()); sendUpdates(jobExecutionContext); @@ -255,13 +271,25 @@ public class SearchIndexApp extends AbstractNativeApplication { while (!stopped && !paginatedDataInsightSource.isDone()) { try { resultList = paginatedDataInsightSource.readNext(null); + List entityName = + resultList.getData().stream() + .map( + entity -> + String.format( + "%s %s", paginatedDataInsightSource.getEntityType(), entity.getId())) + .collect(Collectors.toList()); + + contextData.put(ENTITY_NAME_LIST_KEY, entityName); if (!resultList.getData().isEmpty()) { searchIndexSink.write( dataInsightProcessor.process(resultList, contextData), contextData); } + paginatedDataInsightSource.updateStats(resultList.getData().size(), 0); } catch (SearchIndexException ex) { jobData.setStatus(EventPublisherJob.Status.FAILED); jobData.setFailure(ex.getIndexingError()); + paginatedDataInsightSource.updateStats( + ex.getIndexingError().getSuccessCount(), ex.getIndexingError().getFailedCount()); } finally { updateStats( paginatedDataInsightSource.getEntityType(), paginatedDataInsightSource.getStats()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexSink.java index 3a4232f9c29..be4ff7c19ea 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexSink.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexSink.java @@ -1,16 +1,22 @@ package org.openmetadata.service.search.elasticsearch; import static org.openmetadata.schema.system.IndexingError.ErrorSource.SINK; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_NAME_LIST_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getErrorsFromBulkResponse; -import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getSuccessFromBulkResponseEs; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats; +import es.org.elasticsearch.action.DocWriteRequest; import es.org.elasticsearch.action.bulk.BulkRequest; import es.org.elasticsearch.action.bulk.BulkResponse; import es.org.elasticsearch.client.RequestOptions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.glassfish.jersey.internal.util.ExceptionUtils; +import org.openmetadata.schema.system.EntityError; import org.openmetadata.schema.system.IndexingError; import org.openmetadata.schema.system.StepStats; import org.openmetadata.service.exception.SearchIndexException; @@ -22,9 +28,11 @@ import org.openmetadata.service.workflows.interfaces.Sink; public class ElasticSearchIndexSink implements Sink { private final StepStats stats = new StepStats(); private final SearchRepository searchRepository; + private final int maxPayLoadSizeInBytes; public ElasticSearchIndexSink(SearchRepository searchRepository, int total) { this.searchRepository = searchRepository; + this.maxPayLoadSizeInBytes = searchRepository.getElasticSearchConfiguration().getPayLoadSize(); this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0); } @@ -33,30 +41,64 @@ public class ElasticSearchIndexSink implements Sink { throws SearchIndexException { LOG.debug("[EsSearchIndexSink] Processing a Batch of Size: {}", data.numberOfActions()); try { - BulkResponse response = searchRepository.getSearchClient().bulk(data, RequestOptions.DEFAULT); - int currentSuccess = getSuccessFromBulkResponseEs(response); - int currentFailed = response.getItems().length - currentSuccess; + List entityNames = + (List) + Optional.ofNullable(contextData.get(ENTITY_NAME_LIST_KEY)) + .orElse(Collections.emptyList()); + List entityErrorList = new ArrayList<>(); + BulkResponse response = null; - if (currentFailed != 0) { + BulkRequest bufferData = new BulkRequest(); + long requestIndex = 0; // Index to track the corresponding entity name + + for (DocWriteRequest request : data.requests()) { + long requestSize = new BulkRequest().add(request).estimatedSizeInBytes(); + + if (requestSize > maxPayLoadSizeInBytes) { + entityErrorList.add( + new EntityError() + .withMessage("Entity size exceeds elastic search maximum payload size") + .withEntity(entityNames.get(Math.toIntExact(requestIndex)))); + requestIndex++; + continue; + } + + if (bufferData.estimatedSizeInBytes() + requestSize > maxPayLoadSizeInBytes) { + response = searchRepository.getSearchClient().bulk(bufferData, RequestOptions.DEFAULT); + entityErrorList.addAll(getErrorsFromBulkResponse(response)); + bufferData = new BulkRequest(); + } + + bufferData.add(request); + requestIndex++; + } + + // Send the last buffer if it has any requests + if (!bufferData.requests().isEmpty()) { + response = searchRepository.getSearchClient().bulk(bufferData, RequestOptions.DEFAULT); + entityErrorList.addAll(getErrorsFromBulkResponse(response)); + } + + LOG.debug( + "[EsSearchIndexSink] Batch Stats :- Submitted : {} Success: {} Failed: {}", + data.numberOfActions(), + data.numberOfActions() - entityErrorList.size(), + entityErrorList.size()); + updateStats(data.numberOfActions() - entityErrorList.size(), entityErrorList.size()); + + // Handle errors + if (!entityErrorList.isEmpty()) { throw new SearchIndexException( new IndexingError() .withErrorSource(SINK) .withSubmittedCount(data.numberOfActions()) - .withSuccessCount(currentSuccess) - .withFailedCount(currentFailed) + .withSuccessCount(data.numberOfActions() - entityErrorList.size()) + .withFailedCount(entityErrorList.size()) .withMessage("Issues in Sink To Elastic Search.") - .withFailedEntities(getErrorsFromBulkResponse(response))); + .withFailedEntities(entityErrorList)); } - // Update Stats - LOG.debug( - "[EsSearchIndexSink] Batch Stats :- Submitted : {} Success: {} Failed: {}", - data.numberOfActions(), - currentSuccess, - currentFailed); - updateStats(currentSuccess, currentFailed); - - return response; + return response; // Return the last response } catch (SearchIndexException ex) { updateStats(ex.getIndexingError().getSuccessCount(), ex.getIndexingError().getFailedCount()); throw ex; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexSink.java index 9a4ed71f8cc..09718b422a6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexSink.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexSink.java @@ -1,19 +1,25 @@ package org.openmetadata.service.search.opensearch; import static org.openmetadata.schema.system.IndexingError.ErrorSource.SINK; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_NAME_LIST_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getErrorsFromBulkResponse; -import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getSuccessFromBulkResponse; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.glassfish.jersey.internal.util.ExceptionUtils; +import org.openmetadata.schema.system.EntityError; import org.openmetadata.schema.system.IndexingError; import org.openmetadata.schema.system.StepStats; import org.openmetadata.service.exception.SearchIndexException; import org.openmetadata.service.search.SearchRepository; import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.workflows.interfaces.Sink; +import os.org.opensearch.action.DocWriteRequest; import os.org.opensearch.action.bulk.BulkRequest; import os.org.opensearch.action.bulk.BulkResponse; import os.org.opensearch.client.RequestOptions; @@ -23,39 +29,77 @@ public class OpenSearchIndexSink implements Sink { private final StepStats stats = new StepStats(); private final SearchRepository searchRepository; + private final int maxPayLoadSizeInBytes; + public OpenSearchIndexSink(SearchRepository repository, int total) { this.searchRepository = repository; + this.maxPayLoadSizeInBytes = searchRepository.getElasticSearchConfiguration().getPayLoadSize(); this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0); } @Override public BulkResponse write(BulkRequest data, Map contextData) throws SearchIndexException { - LOG.debug("[EsSearchIndexSink] Processing a Batch of Size: {}", data.numberOfActions()); + LOG.debug("[OsSearchIndexSink] Processing a Batch of Size: {}", data.numberOfActions()); try { - BulkResponse response = searchRepository.getSearchClient().bulk(data, RequestOptions.DEFAULT); - int currentSuccess = getSuccessFromBulkResponse(response); - int currentFailed = response.getItems().length - currentSuccess; + List entityNames = + (List) + Optional.ofNullable(contextData.get(ENTITY_NAME_LIST_KEY)) + .orElse(Collections.emptyList()); + List entityErrorList = new ArrayList<>(); + BulkResponse response = null; - if (currentFailed != 0) { + BulkRequest bufferData = new BulkRequest(); + long requestIndex = 0; // Index to track the corresponding entity name + + for (DocWriteRequest request : data.requests()) { + long requestSize = new BulkRequest().add(request).estimatedSizeInBytes(); + + if (requestSize > maxPayLoadSizeInBytes) { + entityErrorList.add( + new EntityError() + .withMessage("Entity size exceeds elastic search maximum payload size") + .withEntity(entityNames.get(Math.toIntExact(requestIndex)))); + requestIndex++; + continue; + } + + if (bufferData.estimatedSizeInBytes() + requestSize > maxPayLoadSizeInBytes) { + response = searchRepository.getSearchClient().bulk(bufferData, RequestOptions.DEFAULT); + entityErrorList.addAll(getErrorsFromBulkResponse(response)); + bufferData = new BulkRequest(); + } + + bufferData.add(request); + requestIndex++; + } + + // Send the last buffer if it has any requests + if (!bufferData.requests().isEmpty()) { + response = searchRepository.getSearchClient().bulk(bufferData, RequestOptions.DEFAULT); + entityErrorList.addAll(getErrorsFromBulkResponse(response)); + } + + LOG.debug( + "[OSSearchIndexSink] Batch Stats :- Submitted : {} Success: {} Failed: {}", + data.numberOfActions(), + data.numberOfActions() - entityErrorList.size(), + entityErrorList.size()); + updateStats(data.numberOfActions() - entityErrorList.size(), entityErrorList.size()); + + // Handle errors + if (!entityErrorList.isEmpty()) { throw new SearchIndexException( new IndexingError() .withErrorSource(SINK) .withSubmittedCount(data.numberOfActions()) - .withSuccessCount(currentSuccess) - .withFailedCount(currentFailed) + .withSuccessCount(data.numberOfActions() - entityErrorList.size()) + .withFailedCount(entityErrorList.size()) .withMessage("Issues in Sink To Elastic Search.") - .withFailedEntities(getErrorsFromBulkResponse(response))); + .withFailedEntities(entityErrorList)); } - // Update Stats - LOG.debug( - "[EsSearchIndexSink] Batch Stats :- Submitted : {} Success: {} Failed: {}", - data.numberOfActions(), - currentSuccess, - currentFailed); - updateStats(currentSuccess, currentFailed); - return response; + return response; // Return the last response } catch (SearchIndexException ex) { updateStats(ex.getIndexingError().getSuccessCount(), ex.getIndexingError().getFailedCount()); throw ex; @@ -68,7 +112,7 @@ public class OpenSearchIndexSink implements Sink { .withFailedCount(data.numberOfActions()) .withMessage("Issue in Sink to Elastic Search.") .withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e)); - LOG.debug("[OsSearchIndexSink] Failed, Details : {}", JsonUtils.pojoToJson(indexingError)); + LOG.debug("[OSSearchIndexSink] Failed, Details : {}", JsonUtils.pojoToJson(indexingError)); updateStats(0, data.numberOfActions()); throw new SearchIndexException(indexingError); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedDataInsightSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedDataInsightSource.java index 1a7d91b1bdf..b4e7c9f46c4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedDataInsightSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedDataInsightSource.java @@ -84,7 +84,7 @@ public class PaginatedDataInsightSource implements Source batchSize, result.getData().size(), 0); - updateStats(result.getData().size(), result.getErrors().size()); + // updateStats(result.getData().size(), result.getErrors().size()); } catch (Exception ex) { IndexingError indexingError = new IndexingError() diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java index 60e5ce027b1..c3c17a153d4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java @@ -88,14 +88,14 @@ public class PaginatedEntitiesSource implements Source