diff --git a/bootstrap/sql/migrations/native/1.5.11/mysql/postDataMigrationSQLScript.sql b/bootstrap/sql/migrations/native/1.5.11/mysql/postDataMigrationSQLScript.sql
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/bootstrap/sql/migrations/native/1.5.11/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.5.11/mysql/schemaChanges.sql
new file mode 100644
index 00000000000..cccd63eed9d
--- /dev/null
+++ b/bootstrap/sql/migrations/native/1.5.11/mysql/schemaChanges.sql
@@ -0,0 +1,5 @@
+-- Remove SearchIndexing for api Service, collection and endpoint
+DELETE er FROM entity_relationship er JOIN installed_apps ia ON er.fromId = ia.id OR er.toId = ia.id WHERE ia.name = 'SearchIndexingApplication';
+DELETE er FROM entity_relationship er JOIN apps_marketplace ia ON er.fromId = ia.id OR er.toId = ia.id WHERE ia.name = 'SearchIndexingApplication';
+DELETE from installed_apps where name = 'SearchIndexingApplication';
+DELETE from apps_marketplace where name = 'SearchIndexingApplication';
\ No newline at end of file
diff --git a/bootstrap/sql/migrations/native/1.5.11/postgres/postDataMigrationSQLScript.sql b/bootstrap/sql/migrations/native/1.5.11/postgres/postDataMigrationSQLScript.sql
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/bootstrap/sql/migrations/native/1.5.11/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.5.11/postgres/schemaChanges.sql
new file mode 100644
index 00000000000..f99fd9814be
--- /dev/null
+++ b/bootstrap/sql/migrations/native/1.5.11/postgres/schemaChanges.sql
@@ -0,0 +1,5 @@
+-- Remove SearchIndexing for api Service, collection and endpoint
+DELETE FROM entity_relationship er USING installed_apps ia WHERE (er.fromId = ia.id OR er.toId = ia.id) AND ia.name = 'SearchIndexingApplication';
+DELETE FROM entity_relationship er USING apps_marketplace ia WHERE (er.fromId = ia.id OR er.toId = ia.id) AND ia.name = 'SearchIndexingApplication';
+DELETE from installed_apps where name = 'SearchIndexingApplication';
+DELETE from apps_marketplace where name = 'SearchIndexingApplication';
\ No newline at end of file
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java
index 8af69e78fee..2b2b98d0995 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java
@@ -1,5 +1,6 @@
 package org.openmetadata.service.apps.bundles.searchIndex;
 
+import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
 import static org.openmetadata.service.Entity.API_COLLCECTION;
 import static org.openmetadata.service.Entity.API_ENDPOINT;
 import static org.openmetadata.service.Entity.API_SERVICE;
@@ -46,18 +47,14 @@ import static org.openmetadata.service.socket.WebSocketManager.SEARCH_INDEX_JOB_
 import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
 import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -65,7 +62,6 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.jetbrains.annotations.NotNull;
-import org.openmetadata.common.utils.CommonUtil;
 import org.openmetadata.schema.EntityInterface;
 import org.openmetadata.schema.EntityTimeSeriesInterface;
 import org.openmetadata.schema.analytics.ReportData;
@@ -154,22 +150,16 @@ public class SearchIndexApp extends AbstractNativeApplication {
           TEST_CASE_RESULT);
 
   // Constants to replace magic numbers
-  private static final int DEFAULT_PAYLOAD_SIZE = 100;
-  private static final int DEFAULT_BATCH_SIZE = 5;
-  private static final int DEFAULT_MAX_RETRIES = 1000;
-  private static final int DEFAULT_TIMEOUT = 10000;
-  private static final int MAX_CONSUMERS = 10;
 
   private BulkSink searchIndexSink;
   @Getter private EventPublisherJob jobData;
   private final Object jobDataLock = new Object();
   private volatile boolean stopped = false;
-  private ExecutorService producerExecutor;
   private ExecutorService consumerExecutor;
-  private ExecutorService entityReaderExecutor;
+  private ExecutorService producerExecutor;
   private final BlockingQueue<IndexingTask<?>> taskQueue = new LinkedBlockingQueue<>();
   private final AtomicReference<Stats> searchIndexStats = new AtomicReference<>();
-  private final AtomicReference<Integer> batchSize = new AtomicReference<>(DEFAULT_BATCH_SIZE);
+  private final AtomicReference<Integer> batchSize = new AtomicReference<>(5);
 
   public SearchIndexApp(CollectionDAO collectionDAO, SearchRepository searchRepository) {
     super(collectionDAO, searchRepository);
@@ -245,20 +235,20 @@ public class SearchIndexApp extends AbstractNativeApplication {
           new OpenSearchIndexSink(
               searchRepository.getSearchClient(),
               jobData.getPayLoadSize(),
-              DEFAULT_PAYLOAD_SIZE,
-              DEFAULT_BATCH_SIZE,
-              DEFAULT_MAX_RETRIES,
-              DEFAULT_TIMEOUT);
+              jobData.getMaxConcurrentRequests(),
+              jobData.getMaxRetries(),
+              jobData.getInitialBackoff(),
+              jobData.getMaxBackoff());
       LOG.info("Initialized OpenSearchIndexSink.");
     } else {
       this.searchIndexSink =
           new ElasticSearchIndexSink(
               searchRepository.getSearchClient(),
               jobData.getPayLoadSize(),
-              DEFAULT_PAYLOAD_SIZE,
-              DEFAULT_BATCH_SIZE,
-              DEFAULT_MAX_RETRIES,
-              DEFAULT_TIMEOUT);
+              jobData.getMaxConcurrentRequests(),
+              jobData.getMaxRetries(),
+              jobData.getInitialBackoff(),
+              jobData.getMaxBackoff());
       LOG.info("Initialized ElasticSearchIndexSink.");
     }
   }
@@ -288,56 +278,68 @@ public class SearchIndexApp extends AbstractNativeApplication {
   }
 
   private void performReindex(JobExecutionContext jobExecutionContext)
       throws InterruptedException {
-    int numProducers = jobData.getEntities().size();
-    int numConsumers = calculateNumberOfConsumers();
+    int numProducers = jobData.getProducerThreads();
+    int numConsumers = jobData.getConsumerThreads();
     LOG.info("Starting reindexing with {} producers and {} consumers.", numProducers, numConsumers);
-    producerExecutor = Executors.newFixedThreadPool(numProducers);
     consumerExecutor = Executors.newFixedThreadPool(numConsumers);
-    entityReaderExecutor = Executors.newCachedThreadPool();
-    CountDownLatch producerLatch = new CountDownLatch(numProducers);
+    producerExecutor = Executors.newFixedThreadPool(numProducers);
 
     try {
-      for (String entityType : jobData.getEntities()) {
-        producerExecutor.submit(
-            () -> {
-              try {
-                reCreateIndexes(entityType);
-                processEntityType(entityType);
-              } catch (Exception e) {
-                LOG.error("Error processing entity type {}", entityType, e);
-              } finally {
-                producerLatch.countDown();
-              }
-            });
-      }
-
-      for (int i = 0; i < numConsumers; i++) {
-        consumerExecutor.submit(
-            () -> {
-              try {
-                consumeTasks(jobExecutionContext);
-              } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                LOG.warn("Consumer thread interrupted.");
-              }
-            });
-      }
-
-      producerLatch.await();
-      sendPoisonPills(numConsumers);
+      processEntityReindex(jobExecutionContext);
     } catch (Exception e) {
       LOG.error("Error during reindexing process.", e);
       throw e;
     } finally {
-      shutdownExecutor(producerExecutor, "ProducerExecutor", 1, TimeUnit.HOURS);
+      shutdownExecutor(producerExecutor, "ReaderExecutor", 1, TimeUnit.MINUTES);
       shutdownExecutor(consumerExecutor, "ConsumerExecutor", 20, TimeUnit.SECONDS);
-      shutdownExecutor(entityReaderExecutor, "ReaderExecutor", 20, TimeUnit.SECONDS);
     }
   }
 
-  private int calculateNumberOfConsumers() {
-    return Math.min(Runtime.getRuntime().availableProcessors(), MAX_CONSUMERS);
+  private void processEntityReindex(JobExecutionContext jobExecutionContext)
+      throws InterruptedException {
+    int numConsumers = jobData.getConsumerThreads();
+    CountDownLatch producerLatch = new CountDownLatch(getTotalLatchCount(jobData.getEntities()));
+    for (String entityType : jobData.getEntities()) {
+      try {
+        reCreateIndexes(entityType);
+        int totalEntityRecords = getTotalEntityRecords(entityType);
+        Source<?> source = createSource(entityType);
+        int noOfThreads = calculateNumberOfThreads(totalEntityRecords);
+        if (totalEntityRecords > 0) {
+          for (int i = 0; i < noOfThreads; i++) {
+            int currentOffset = i * batchSize.get();
+            producerExecutor.submit(
+                () -> {
+                  try {
+                    processReadTask(entityType, source, currentOffset);
+                  } catch (Exception e) {
+                    LOG.error("Error processing entity type {}", entityType, e);
+                  } finally {
+                    producerLatch.countDown();
+                  }
+                });
+          }
+        }
+      } catch (Exception e) {
+        LOG.error("Error processing entity type {}", entityType, e);
+      }
+    }
+
+    for (int i = 0; i < numConsumers; i++) {
+      consumerExecutor.submit(
+          () -> {
+            try {
+              consumeTasks(jobExecutionContext);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              LOG.warn("Consumer thread interrupted.");
+            }
+          });
+    }
+
+    producerLatch.await();
+    sendPoisonPills(numConsumers);
   }
 
   private void consumeTasks(JobExecutionContext jobExecutionContext) throws InterruptedException {
@@ -548,29 +550,42 @@
   private void processTask(IndexingTask<?> task, JobExecutionContext jobExecutionContext) {
     String entityType = task.entityType();
-    List<?> entities = task.entities();
+    ResultList<?> entities = task.entities();
     Map<String, Object> contextData = new HashMap<>();
     contextData.put(ENTITY_TYPE_KEY, entityType);
     try {
       if (!TIME_SERIES_ENTITIES.contains(entityType)) {
         @SuppressWarnings("unchecked")
-        List<EntityInterface> entityList = (List<EntityInterface>) entities;
+        List<EntityInterface> entityList = (List<EntityInterface>) entities.getData();
         searchIndexSink.write(entityList, contextData);
       } else {
         @SuppressWarnings("unchecked")
-        List<EntityTimeSeriesInterface> entityList = (List<EntityTimeSeriesInterface>) entities;
+        List<EntityTimeSeriesInterface> entityList =
+            (List<EntityTimeSeriesInterface>) entities.getData();
         searchIndexSink.write(entityList, contextData);
       }
 
       // After successful write, create a new StepStats for the current batch
       StepStats currentEntityStats = new StepStats();
-      currentEntityStats.setSuccessRecords(entities.size());
-      currentEntityStats.setFailedRecords(0);
+      currentEntityStats.setSuccessRecords(entities.getData().size());
+      currentEntityStats.setFailedRecords(entities.getErrors().size());
       // Do NOT set Total Records here
 
       // Update statistics in a thread-safe manner
       synchronized (jobDataLock) {
+        if (!entities.getErrors().isEmpty()) {
+          jobData.setStatus(EventPublisherJob.Status.ACTIVE_ERROR);
+          jobData.setFailure(
+              new IndexingError()
+                  .withErrorSource(IndexingError.ErrorSource.READER)
+                  .withSubmittedCount(batchSize.get())
+                  .withSuccessCount(entities.getData().size())
+                  .withFailedCount(entities.getErrors().size())
+                  .withMessage(
+                      "Issues in Reading A Batch For Entities. Check Errors Corresponding to Entities.")
+                  .withFailedEntities(entities.getErrors()));
+        }
         updateStats(entityType, currentEntityStats);
       }
 
@@ -584,7 +599,7 @@
       StepStats failedEntityStats = new StepStats();
       failedEntityStats.setSuccessRecords(0);
-      failedEntityStats.setFailedRecords(entities.size());
+      failedEntityStats.setFailedRecords(entities.getData().size());
       updateStats(entityType, failedEntityStats);
     }
     LOG.error("Unexpected error during processing task for entity {}", entityType, e);
@@ -601,14 +616,14 @@
     if (!TIME_SERIES_ENTITIES.contains(entityType)) {
       PaginatedEntitiesSource paginatedSource =
           new PaginatedEntitiesSource(entityType, batchSize.get(), fields);
-      if (!CommonUtil.nullOrEmpty(jobData.getAfterCursor())) {
+      if (!nullOrEmpty(jobData.getAfterCursor())) {
        paginatedSource.getCursor().set(jobData.getAfterCursor());
       }
       source = paginatedSource;
     } else {
       PaginatedEntityTimeSeriesSource paginatedSource =
           new PaginatedEntityTimeSeriesSource(entityType, batchSize.get(), fields);
-      if (!CommonUtil.nullOrEmpty(jobData.getAfterCursor())) {
+      if (!nullOrEmpty(jobData.getAfterCursor())) {
         paginatedSource.getCursor().set(jobData.getAfterCursor());
       }
       source = paginatedSource;
@@ -617,17 +632,14 @@
     return source;
   }
 
-  private void processEntityType(String entityType)
-      throws InterruptedException, ExecutionException {
-    int totalEntityRecords = getTotalEntityRecords(entityType);
-    if (totalEntityRecords > 0) {
-      Source<?> source = createSource(entityType);
-      int loadPerThread = calculateLoadPerThread(totalEntityRecords);
-      List<Future<?>> futures = submitReaderTasks(entityType, source, loadPerThread);
-      for (Future<?> future : futures) {
-        future.get();
-      }
+  private int getTotalLatchCount(Set<String> entities) {
+    int totalCount = 0;
+    for (String entityType : entities) {
+      int totalEntityRecords = getTotalEntityRecords(entityType);
+      int noOfThreads = calculateNumberOfThreads(totalEntityRecords);
+      totalCount += noOfThreads;
     }
+    return totalCount;
   }
 
   private int getTotalEntityRecords(String entityType) {
@@ -636,24 +648,12 @@
         .getTotalRecords();
   }
 
-  private List<Future<?>> submitReaderTasks(
-      String entityType, Source<?> source, int loadPerThread) {
-    List<Future<?>> futures = new ArrayList<>();
-    for (int i = 0; i < loadPerThread; i++) {
-      int currentOffset = i * batchSize.get();
-      Future<?> future =
-          entityReaderExecutor.submit(() -> processReadTask(entityType, source, currentOffset));
-      futures.add(future);
-    }
-    return futures;
-  }
-
   private void processReadTask(String entityType, Source<?> source, int offset) {
     try {
       Object resultList = source.readWithCursor(RestUtil.encodeCursor(String.valueOf(offset)));
       if (resultList != null) {
-        List<?> entities = extractEntities(entityType, resultList);
-        if (entities != null && !entities.isEmpty()) {
+        ResultList<?> entities = extractEntities(entityType, resultList);
+        if (!nullOrEmpty(entities.getData())) {
           LOG.info(
               "Creating Indexing Task for entityType: {}, current offset: {}", entityType, offset);
           createIndexingTask(entityType, entities, offset);
@@ -664,16 +664,29 @@
       LOG.warn("Reader thread interrupted for entityType: {}", entityType);
     } catch (SearchIndexException e) {
       LOG.error("Error while reading source for entityType: {}", entityType, e);
+      synchronized (jobDataLock) {
+        jobData.setStatus(EventPublisherJob.Status.ACTIVE_ERROR);
+        jobData.setFailure(e.getIndexingError());
+        int remainingRecords = getRemainingRecordsToProcess(entityType);
+        if (remainingRecords - batchSize.get() <= 0) {
+          updateStats(
+              entityType,
+              new StepStats().withSuccessRecords(0).withFailedRecords(remainingRecords));
+        } else {
+          updateStats(
+              entityType, new StepStats().withSuccessRecords(0).withFailedRecords(batchSize.get()));
+        }
+      }
     }
   }
 
-  private void createIndexingTask(String entityType, List<?> entities, int offset)
+  private void createIndexingTask(String entityType, ResultList<?> entities, int offset)
       throws InterruptedException {
     IndexingTask<?> task = new IndexingTask<>(entityType, entities, offset);
     taskQueue.put(task);
   }
 
-  private synchronized int calculateLoadPerThread(int totalEntityRecords) {
+  private synchronized int calculateNumberOfThreads(int totalEntityRecords) {
     int mod = totalEntityRecords % batchSize.get();
     if (mod == 0) {
       return totalEntityRecords / batchSize.get();
@@ -683,16 +696,26 @@
   }
 
   @SuppressWarnings("unchecked")
-  private List<?> extractEntities(String entityType, Object resultList) {
+  private ResultList<?> extractEntities(String entityType, Object resultList) {
     if (!TIME_SERIES_ENTITIES.contains(entityType)) {
-      return ((ResultList<? extends EntityInterface>) resultList).getData();
+      return ((ResultList<? extends EntityInterface>) resultList);
     } else {
-      return ((ResultList<? extends EntityTimeSeriesInterface>) resultList).getData();
+      return ((ResultList<? extends EntityTimeSeriesInterface>) resultList);
     }
   }
 
-  private record IndexingTask<T>(String entityType, List<?> entities, int currentEntityOffset) {
+  private synchronized int getRemainingRecordsToProcess(String entityType) {
+    StepStats entityStats =
+        ((StepStats)
+            searchIndexStats.get().getEntityStats().getAdditionalProperties().get(entityType));
+    return entityStats.getTotalRecords()
+        - entityStats.getFailedRecords()
+        - entityStats.getSuccessRecords();
+  }
+
+  private record IndexingTask<T>(
+      String entityType, ResultList<?> entities, int currentEntityOffset) {
     public static final IndexingTask<?> POISON_PILL =
-        new IndexingTask<>(null, Collections.emptyList(), -1);
+        new IndexingTask<>(null, new ResultList<>(), -1);
   }
 }
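
Note on the SearchIndexApp changes above: the per-entity producer threads and the separate entityReaderExecutor are replaced by a fixed-size producer pool that submits one offset-based read task per batch (ceil(totalRecords / batchSize) tasks per entity, each reading at offset i * batchSize), a consumer pool that drains a shared BlockingQueue, and a CountDownLatch sized by getTotalLatchCount() so poison pills are sent only after every read task has finished. The sketch below illustrates that pattern only; it is self-contained, and every name in it (ProducerConsumerSketch, Batch, readTasksFor, totalRecordsFor) is an illustrative stand-in, not OpenMetadata code.

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ProducerConsumerSketch {
  // Illustrative stand-in for IndexingTask: a batch identified by its offset.
  record Batch(String entityType, int offset) {}

  private static final Batch POISON_PILL = new Batch(null, -1);

  public static void main(String[] args) throws InterruptedException {
    List<String> entityTypes = List.of("table", "topic", "dashboard");
    int batchSize = 5;
    int producerThreads = 2;
    int consumerThreads = 2;
    BlockingQueue<Batch> queue = new LinkedBlockingQueue<>();

    ExecutorService producers = Executors.newFixedThreadPool(producerThreads);
    ExecutorService consumers = Executors.newFixedThreadPool(consumerThreads);

    // One latch count per read task, mirroring getTotalLatchCount(): ceil(total / batchSize) per entity.
    int totalReadTasks =
        entityTypes.stream().mapToInt(t -> readTasksFor(totalRecordsFor(t), batchSize)).sum();
    CountDownLatch producerLatch = new CountDownLatch(totalReadTasks);

    for (String entityType : entityTypes) {
      int tasks = readTasksFor(totalRecordsFor(entityType), batchSize);
      for (int i = 0; i < tasks; i++) {
        int offset = i * batchSize; // each read task starts at its own offset
        producers.submit(
            () -> {
              try {
                queue.put(new Batch(entityType, offset)); // "read" a page and hand it to consumers
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              } finally {
                producerLatch.countDown();
              }
            });
      }
    }

    for (int i = 0; i < consumerThreads; i++) {
      consumers.submit(
          () -> {
            try {
              while (true) {
                Batch batch = queue.take();
                if (batch == POISON_PILL) {
                  break; // poison pill: no more work will arrive
                }
                System.out.println("indexing " + batch.entityType() + " @ offset " + batch.offset());
              }
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          });
    }

    producerLatch.await(); // all read tasks have finished
    for (int i = 0; i < consumerThreads; i++) {
      queue.put(POISON_PILL); // one pill per consumer, mirroring sendPoisonPills(numConsumers)
    }

    producers.shutdown();
    consumers.shutdown();
    consumers.awaitTermination(1, TimeUnit.MINUTES);
  }

  // ceil(total / batchSize), mirroring calculateNumberOfThreads()
  private static int readTasksFor(int totalRecords, int batchSize) {
    return (totalRecords + batchSize - 1) / batchSize;
  }

  // Fake record counts; in the real app these come from the job's entity stats.
  private static int totalRecordsFor(String entityType) {
    return 12;
  }
}

The design point mirrored here is that the latch counts submitted read tasks rather than entity types, and each consumer exits after taking exactly one poison pill, so sending numConsumers pills after the latch releases stops all consumers without dropping in-flight work.
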
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java
index c631699a715..9fe3eaaf4db 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java
@@ -278,10 +278,15 @@ public class OpenMetadataOperations implements Callable<Integer> {
              description = "Flag to determine if indexes should be recreated.")
          boolean recreateIndexes,
       @Option(
-          names = {"--num-threads"},
+          names = {"--producer-threads"},
           defaultValue = "10",
           description = "Number of threads to use for processing.")
-          int numThreads,
+          int producerThreads,
+      @Option(
+          names = {"--consumer-threads"},
+          defaultValue = "10",
+          description = "Number of threads to use for processing.")
+          int consumerThreads,
       @Option(
           names = {"--back-off"},
           defaultValue = "1000",
@@ -320,7 +325,8 @@
           batchSize,
           payloadSize,
           recreateIndexes,
-          numThreads,
+          producerThreads,
+          consumerThreads,
           backOff,
           maxBackOff,
           maxRequests,
@@ -336,7 +342,8 @@
       int batchSize,
       long payloadSize,
       boolean recreateIndexes,
-      int numThreads,
+      int producerThreads,
+      int consumerThreads,
       int backOff,
       int maxBackOff,
       int maxRequests,
@@ -354,7 +361,8 @@
             .withBatchSize(batchSize)
             .withPayLoadSize(payloadSize)
             .withRecreateIndex(recreateIndexes)
-            .withNumberOfThreads(numThreads)
+            .withProducerThreads(producerThreads)
+            .withConsumerThreads(consumerThreads)
             .withInitialBackoff(backOff)
             .withMaxBackoff(maxBackOff)
             .withMaxConcurrentRequests(maxRequests)
diff --git a/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json b/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json
index 0909fbac2e9..261b6563fd3 100644
--- a/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json
+++ b/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json
@@ -46,7 +46,8 @@
     "recreateIndex": false,
     "batchSize": "100",
     "payLoadSize": 104857600,
-    "numberOfThreads": 5,
+    "producerThreads": 10,
+    "consumerThreads": 10,
     "maxConcurrentRequests": 100,
     "maxRetries": 3,
     "initialBackoff": 1000,
diff --git a/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/SearchIndexingApplication.json b/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/SearchIndexingApplication.json
index 6ed0550f1c4..d8b2ee10830 100644
--- a/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/SearchIndexingApplication.json
+++ b/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/SearchIndexingApplication.json
@@ -60,6 +60,12 @@
     "recreateIndex": false,
     "batchSize": "100",
     "payLoadSize": 104857600,
+    "producerThreads": 10,
+    "consumerThreads": 10,
+    "maxConcurrentRequests": 100,
+    "maxRetries": 3,
+    "initialBackoff": 1000,
+    "maxBackoff": 10000,
     "searchIndexMappingLanguage": "EN"
   }
 }
diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/searchIndexingAppConfig.json b/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/searchIndexingAppConfig.json
index dfc5423e68a..d05c68af785 100644
--- a/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/searchIndexingAppConfig.json
+++ b/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/searchIndexingAppConfig.json
@@ -44,11 +44,17 @@
       "existingJavaType": "java.lang.Long",
       "default": 104857600
     },
-    "numberOfThreads": {
-      "title": "Number of Threads",
+    "producerThreads": {
+      "title": "Number of Producer Threads",
       "description": "Number of threads to use for reindexing",
       "type": "integer",
-      "default": 5
+      "default": 10
+    },
+    "consumerThreads": {
+      "title": "Number of Consumer Threads",
+      "description": "Number of threads to use for reindexing",
+      "type": "integer",
+      "default": 10
     },
     "maxConcurrentRequests": {
       "title": "Max Concurrent Requests",
@@ -80,4 +86,4 @@
     }
}, "additionalProperties": false -} \ No newline at end of file +} diff --git a/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json b/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json index 6767c847238..e002fd75ff7 100644 --- a/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json +++ b/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json @@ -108,11 +108,17 @@ "existingJavaType": "java.lang.Long", "default": 104857600 }, - "numberOfThreads": { - "title": "Number of Threads", - "description": "Number of threads to use for reindexing", + "producerThreads": { + "title": "Number of Producer Threads to use", + "description": "Number of producer threads to use for reindexing", "type": "integer", - "default": 5 + "default": 10 + }, + "consumerThreads": { + "title": "Number of Consumer Threads to use", + "description": "Number of consumer threads to use for reindexing", + "type": "integer", + "default": 10 }, "maxConcurrentRequests": { "title": "Max Concurrent Requests", diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/SearchIndexingApplication.json b/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/SearchIndexingApplication.json index 039bf0539b6..96bd53944b8 100644 --- a/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/SearchIndexingApplication.json +++ b/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/SearchIndexingApplication.json @@ -16,8 +16,14 @@ "type": "integer", "default": 104857600 }, - "numberOfThreads": { - "title": "Number of Threads", + "producerThreads": { + "title": "Number of Producer Threads", + "description": "Number of threads to use for reindexing", + "type": "integer", + "default": 5 + }, + "consumerThreads": { + "title": "Number of Consumer Threads", "description": "Number of threads to use for reindexing", "type": "integer", "default": 5