From df9753c7bb8c773ee6fb8032e7c31bee42aa9198 Mon Sep 17 00:00:00 2001 From: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com> Date: Mon, 12 Aug 2024 11:34:52 +0530 Subject: [PATCH] Add Opensearch max payload Size (#17392) --- .../workflows/dataAssets/DataAssetsWorkflow.java | 12 ++++++++++-- .../apps/bundles/searchIndex/SearchIndexApp.java | 3 ++- .../search/elasticsearch/ElasticSearchIndexSink.java | 6 ------ .../search/opensearch/OpenSearchIndexSink.java | 4 ++-- 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java index 7c510e954e3..ab2ea713bf3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java @@ -122,10 +122,18 @@ public class DataAssetsWorkflow { this.entityEnricher = new DataInsightsEntityEnricherProcessor(totalRecords); if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) { this.entityProcessor = new DataInsightsOpenSearchProcessor(totalRecords); - this.searchIndexSink = new OpenSearchIndexSink(searchRepository, totalRecords); + this.searchIndexSink = + new OpenSearchIndexSink( + searchRepository, + totalRecords, + searchRepository.getElasticSearchConfiguration().getPayLoadSize()); } else { this.entityProcessor = new DataInsightsElasticSearchProcessor(totalRecords); - this.searchIndexSink = new ElasticSearchIndexSink(searchRepository, totalRecords); + this.searchIndexSink = + new ElasticSearchIndexSink( + searchRepository, + totalRecords, + searchRepository.getElasticSearchConfiguration().getPayLoadSize()); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index 7cf35bb4a15..ed2c4f5ad0e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -195,7 +195,8 @@ public class SearchIndexApp extends AbstractNativeApplication { if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) { this.entityProcessor = new OpenSearchEntitiesProcessor(totalRecords); this.entityTimeSeriesProcessor = new OpenSearchEntityTimeSeriesProcessor(totalRecords); - this.searchIndexSink = new OpenSearchIndexSink(searchRepository, totalRecords); + this.searchIndexSink = + new OpenSearchIndexSink(searchRepository, totalRecords, jobData.getPayLoadSize()); } else { this.entityProcessor = new ElasticSearchEntitiesProcessor(totalRecords); this.entityTimeSeriesProcessor = new ElasticSearchEntityTimeSeriesProcessor(totalRecords); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexSink.java index 115b4d6d9cc..553788aa158 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexSink.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexSink.java @@ -30,12 +30,6 @@ public class ElasticSearchIndexSink implements Sink { private final SearchRepository searchRepository; private final int maxPayLoadSizeInBytes; - public ElasticSearchIndexSink(SearchRepository searchRepository, int total) { - this.searchRepository = searchRepository; - this.maxPayLoadSizeInBytes = searchRepository.getElasticSearchConfiguration().getPayLoadSize(); - this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0); - } - public ElasticSearchIndexSink( SearchRepository searchRepository, int total, int maxPayLoadSizeInBytes) { this.searchRepository = searchRepository; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexSink.java index 70c2ffaf867..7dc0766b598 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexSink.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexSink.java @@ -31,9 +31,9 @@ public class OpenSearchIndexSink implements Sink { private final int maxPayLoadSizeInBytes; - public OpenSearchIndexSink(SearchRepository repository, int total) { + public OpenSearchIndexSink(SearchRepository repository, int total, int maxPayLoadSizeInBytes) { this.searchRepository = repository; - this.maxPayLoadSizeInBytes = searchRepository.getElasticSearchConfiguration().getPayLoadSize(); + this.maxPayLoadSizeInBytes = maxPayLoadSizeInBytes; this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0); }