From b7f3157986b0bc36dace8837df04f1392c64bfdc Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Mon, 28 Oct 2024 23:10:43 -0700 Subject: [PATCH] Parallelize the search indexing process (#18445) * Parallelize the indexing process * Parallelize indexing (cherry picked from commit 0114007a228936748b695cdc156f84fc5aa4f309) --- .../bundles/searchIndex/SearchIndexApp.java | 190 ++++++++++++------ 1 file changed, 130 insertions(+), 60 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index a76a6672ef7..a6c1ccdf8a2 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -12,6 +12,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.exception.ExceptionUtils; @@ -107,7 +112,9 @@ public class SearchIndexApp extends AbstractNativeApplication { private Sink searchIndexSink; @Getter EventPublisherJob jobData; + private final Object jobDataLock = new Object(); // Dedicated final lock object private volatile boolean stopped = false; + @Getter private volatile ExecutorService executorService; public SearchIndexApp(CollectionDAO collectionDAO, SearchRepository searchRepository) { super(collectionDAO, searchRepository); @@ -131,19 +138,12 @@ public class SearchIndexApp extends AbstractNativeApplication { try { initializeJob(); LOG.info("Executing Reindexing Job with JobData : {}", jobData); - // Update Job Status jobData.setStatus(EventPublisherJob.Status.RUNNING); - - // Make recreate as false for onDemand String runType = (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType"); - - // Schedule Run has re-create set to false if (!runType.equals(ON_DEMAND_JOB)) { jobData.setRecreateIndex(false); } - - // Run ReIndexing performReindex(jobExecutionContext); } catch (Exception ex) { IndexingError indexingError = @@ -157,7 +157,6 @@ public class SearchIndexApp extends AbstractNativeApplication { jobData.setStatus(EventPublisherJob.Status.RUNNING); jobData.setFailure(indexingError); } finally { - // Send update sendUpdates(jobExecutionContext); } } @@ -176,7 +175,6 @@ public class SearchIndexApp extends AbstractNativeApplication { // Remove any Stale Jobs cleanUpStaleJobsFromRuns(); - // Initialize New Job int totalRecords = getTotalRequestToProcess(jobData.getEntities(), collectionDAO); this.jobData.setStats( new Stats() @@ -223,16 +221,11 @@ public class SearchIndexApp extends AbstractNativeApplication { public void updateRecordToDb(JobExecutionContext jobExecutionContext) { AppRunRecord appRecord = getJobRecord(jobExecutionContext); - // Update Run Record with Status appRecord.setStatus(AppRunRecord.Status.fromValue(jobData.getStatus().value())); - - // Update Error if (jobData.getFailure() != null) { appRecord.setFailureContext( new FailureContext().withAdditionalProperty("failure", jobData.getFailure())); } - - // Update Stats if (jobData.getStats() != null) { appRecord.setSuccessContext( new SuccessContext().withAdditionalProperty("stats", jobData.getStats())); @@ -241,45 +234,110 @@ public class SearchIndexApp extends AbstractNativeApplication { pushAppStatusUpdates(jobExecutionContext, appRecord, true); } - private void performReindex(JobExecutionContext jobExecutionContext) throws SearchIndexException { - Map contextData = new HashMap<>(); - for (Source paginatedSource : paginatedSources) { - List entityName = new ArrayList<>(); - reCreateIndexes(paginatedSource.getEntityType()); - contextData.put(ENTITY_TYPE_KEY, paginatedSource.getEntityType()); - Object resultList; - while (!stopped && !paginatedSource.isDone()) { - try { - resultList = paginatedSource.readNext(null); - if (!TIME_SERIES_ENTITIES.contains(paginatedSource.getEntityType())) { - entityName = - getEntityNameFromEntity( - (ResultList) resultList, - paginatedSource.getEntityType()); - contextData.put(ENTITY_NAME_LIST_KEY, entityName); - processEntity( - (ResultList) resultList, contextData, paginatedSource); - } else { - entityName = - getEntityNameFromEntityTimeSeries( - (ResultList) resultList, - paginatedSource.getEntityType()); - contextData.put(ENTITY_NAME_LIST_KEY, entityName); - processEntityTimeSeries( - (ResultList) resultList, - contextData, - paginatedSource); - } + private void performReindex(JobExecutionContext jobExecutionContext) { + if (jobData.getStats() == null) { + jobData.setStats(new Stats()); + } - } catch (SearchIndexException rx) { - jobData.setStatus(EventPublisherJob.Status.RUNNING); - jobData.setFailure(rx.getIndexingError()); - paginatedSource.updateStats( - rx.getIndexingError().getSuccessCount(), rx.getIndexingError().getFailedCount()); - } finally { - updateStats(paginatedSource.getEntityType(), paginatedSource.getStats()); - sendUpdates(jobExecutionContext); - } + if (executorService == null || executorService.isShutdown() || executorService.isTerminated()) { + int numThreads = + Math.min(paginatedSources.size(), Runtime.getRuntime().availableProcessors()); + this.executorService = Executors.newFixedThreadPool(numThreads); + LOG.debug("Initialized new ExecutorService with {} threads.", numThreads); + } + List> futures = new ArrayList<>(); + + for (Source paginatedSource : paginatedSources) { + Future future = + executorService.submit( + () -> { + String entityType = paginatedSource.getEntityType(); + Map contextData = new HashMap<>(); + contextData.put(ENTITY_TYPE_KEY, entityType); + try { + reCreateIndexes(entityType); + contextData.put(ENTITY_TYPE_KEY, entityType); + + while (!paginatedSource.isDone()) { + Object resultList = paginatedSource.readNext(null); + if (resultList == null) { + break; + } + + if (!TIME_SERIES_ENTITIES.contains(entityType)) { + List entityNames = + getEntityNameFromEntity( + (ResultList) resultList, entityType); + contextData.put(ENTITY_NAME_LIST_KEY, entityNames); + processEntity( + (ResultList) resultList, + contextData, + paginatedSource); + } else { + List entityNames = + getEntityNameFromEntityTimeSeries( + (ResultList) resultList, + entityType); + contextData.put(ENTITY_NAME_LIST_KEY, entityNames); + processEntityTimeSeries( + (ResultList) resultList, + contextData, + paginatedSource); + } + synchronized (jobDataLock) { + updateStats(entityType, paginatedSource.getStats()); + } + sendUpdates(jobExecutionContext); + } + + } catch (SearchIndexException e) { + synchronized (jobDataLock) { + jobData.setStatus(EventPublisherJob.Status.RUNNING); + jobData.setFailure(e.getIndexingError()); + paginatedSource.updateStats( + e.getIndexingError().getSuccessCount(), + e.getIndexingError().getFailedCount()); + updateStats(entityType, paginatedSource.getStats()); + } + sendUpdates(jobExecutionContext); + } catch (Exception e) { + synchronized (jobDataLock) { + jobData.setStatus(EventPublisherJob.Status.FAILED); + jobData.setFailure( + new IndexingError() + .withErrorSource(IndexingError.ErrorSource.JOB) + .withMessage(e.getMessage())); + } + sendUpdates(jobExecutionContext); + LOG.error("Unexpected error during reindexing for entity {}", entityType, e); + } + }); + + futures.add(future); + } + + executorService.shutdown(); + + try { + boolean allTasksCompleted = executorService.awaitTermination(1, TimeUnit.HOURS); + if (!allTasksCompleted) { + LOG.warn("Reindexing tasks did not complete within the expected time."); + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + LOG.error("Reindexing was interrupted", e); + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + + for (Future future : futures) { + try { + future.get(); + } catch (InterruptedException e) { + LOG.error("Task was interrupted", e); + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + LOG.error("Exception in reindexing task", e.getCause()); } } } @@ -347,9 +405,7 @@ public class SearchIndexApp extends AbstractNativeApplication { private void sendUpdates(JobExecutionContext jobExecutionContext) { try { - // store job details in Database jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats()); - // Update Record to db updateRecordToDb(jobExecutionContext); if (WebSocketManager.getInstance() != null) { WebSocketManager.getInstance() @@ -362,10 +418,8 @@ public class SearchIndexApp extends AbstractNativeApplication { } public void updateStats(String entityType, StepStats currentEntityStats) { - // Job Level Stats Stats jobDataStats = jobData.getStats(); - // Update Entity Level Stats StepStats entityLevelStats = jobDataStats.getEntityStats(); if (entityLevelStats == null) { entityLevelStats = @@ -373,7 +427,6 @@ public class SearchIndexApp extends AbstractNativeApplication { } entityLevelStats.withAdditionalProperty(entityType, currentEntityStats); - // Total Stats StepStats stats = jobData.getStats().getJobStats(); if (stats == null) { stats = @@ -392,7 +445,6 @@ public class SearchIndexApp extends AbstractNativeApplication { .mapToInt(StepStats::getFailedRecords) .sum()); - // Update for the Job jobDataStats.setJobStats(stats); jobDataStats.setEntityStats(entityLevelStats); @@ -405,13 +457,31 @@ public class SearchIndexApp extends AbstractNativeApplication { } IndexMapping indexType = searchRepository.getIndexMapping(entityType); - // Delete index searchRepository.deleteIndex(indexType); - // Create index searchRepository.createIndex(indexType); } public void stopJob() { + LOG.info("Stopping reindexing job."); stopped = true; + if (executorService != null && !executorService.isShutdown()) { + List awaitingTasks = executorService.shutdownNow(); + LOG.info( + "ExecutorService has been shutdown. Awaiting termination. {} tasks were awaiting execution.", + awaitingTasks.size()); + + try { + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + LOG.warn("ExecutorService did not terminate within the specified timeout."); + } + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for ExecutorService to terminate.", e); + List stillAwaitingTasks = executorService.shutdownNow(); // Force shutdown + LOG.info( + "Forced shutdown initiated due to interruption. {} tasks were awaiting execution.", + stillAwaitingTasks.size()); + Thread.currentThread().interrupt(); // Restore interrupt status + } + } } }