Parallelize the search indexing process (#18445)

* Parallelize the indexing process

* Parallelize indexing

(cherry picked from commit 0114007a228936748b695cdc156f84fc5aa4f309)
Author: Sriharsha Chintalapani (2024-10-28 23:10:43 -07:00)
Committer: Teddy Crepineau
Parent: 669b09e6c5
Commit: b7f3157986


@@ -12,6 +12,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.exception.ExceptionUtils;
@@ -107,7 +112,9 @@ public class SearchIndexApp extends AbstractNativeApplication {
   private Sink searchIndexSink;
   @Getter EventPublisherJob jobData;
+  private final Object jobDataLock = new Object(); // Dedicated final lock object
   private volatile boolean stopped = false;
+  @Getter private volatile ExecutorService executorService;

   public SearchIndexApp(CollectionDAO collectionDAO, SearchRepository searchRepository) {
     super(collectionDAO, searchRepository);
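
Note: a minimal sketch of the shared-state pattern these two fields set up (class and member names here are illustrative, not from the codebase). Worker threads mutate job-level stats only while holding a dedicated final lock object, and the executor reference is volatile so a stop request issued from another thread sees the pool the job thread created:

public class SharedStateSketch {
  // Dedicated final lock: never exposed, so no outside code can contend on it.
  private final Object lock = new Object();
  // Volatile handle: stop() may run on a different thread than the one that built the pool.
  private volatile java.util.concurrent.ExecutorService executor;
  private int totalSuccess;

  void mergeStats(int success) {
    synchronized (lock) { // serialize the read-modify-write on shared counters
      totalSuccess += success;
    }
  }
}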
@@ -131,19 +138,12 @@ public class SearchIndexApp extends AbstractNativeApplication {
     try {
       initializeJob();
       LOG.info("Executing Reindexing Job with JobData : {}", jobData);
-      // Update Job Status
       jobData.setStatus(EventPublisherJob.Status.RUNNING);
-      // Make recreate as false for onDemand
       String runType =
           (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType");
-      // Schedule Run has re-create set to false
       if (!runType.equals(ON_DEMAND_JOB)) {
         jobData.setRecreateIndex(false);
       }
-      // Run ReIndexing
       performReindex(jobExecutionContext);
     } catch (Exception ex) {
       IndexingError indexingError =
@@ -157,7 +157,6 @@ public class SearchIndexApp extends AbstractNativeApplication {
       jobData.setStatus(EventPublisherJob.Status.RUNNING);
       jobData.setFailure(indexingError);
     } finally {
-      // Send update
       sendUpdates(jobExecutionContext);
     }
   }
@@ -176,7 +175,6 @@ public class SearchIndexApp extends AbstractNativeApplication {
     // Remove any Stale Jobs
     cleanUpStaleJobsFromRuns();
-    // Initialize New Job
     int totalRecords = getTotalRequestToProcess(jobData.getEntities(), collectionDAO);
     this.jobData.setStats(
         new Stats()
@@ -223,16 +221,11 @@ public class SearchIndexApp extends AbstractNativeApplication {
   public void updateRecordToDb(JobExecutionContext jobExecutionContext) {
     AppRunRecord appRecord = getJobRecord(jobExecutionContext);
-    // Update Run Record with Status
     appRecord.setStatus(AppRunRecord.Status.fromValue(jobData.getStatus().value()));
-    // Update Error
     if (jobData.getFailure() != null) {
       appRecord.setFailureContext(
           new FailureContext().withAdditionalProperty("failure", jobData.getFailure()));
     }
-    // Update Stats
     if (jobData.getStats() != null) {
       appRecord.setSuccessContext(
           new SuccessContext().withAdditionalProperty("stats", jobData.getStats()));
@@ -241,45 +234,110 @@ public class SearchIndexApp extends AbstractNativeApplication {
     pushAppStatusUpdates(jobExecutionContext, appRecord, true);
   }

-  private void performReindex(JobExecutionContext jobExecutionContext) throws SearchIndexException {
-    Map<String, Object> contextData = new HashMap<>();
-    for (Source paginatedSource : paginatedSources) {
-      List<String> entityName = new ArrayList<>();
-      reCreateIndexes(paginatedSource.getEntityType());
-      contextData.put(ENTITY_TYPE_KEY, paginatedSource.getEntityType());
-      Object resultList;
-      while (!stopped && !paginatedSource.isDone()) {
-        try {
-          resultList = paginatedSource.readNext(null);
-          if (!TIME_SERIES_ENTITIES.contains(paginatedSource.getEntityType())) {
-            entityName =
-                getEntityNameFromEntity(
-                    (ResultList<? extends EntityInterface>) resultList,
-                    paginatedSource.getEntityType());
-            contextData.put(ENTITY_NAME_LIST_KEY, entityName);
-            processEntity(
-                (ResultList<? extends EntityInterface>) resultList, contextData, paginatedSource);
-          } else {
-            entityName =
-                getEntityNameFromEntityTimeSeries(
-                    (ResultList<? extends EntityTimeSeriesInterface>) resultList,
-                    paginatedSource.getEntityType());
-            contextData.put(ENTITY_NAME_LIST_KEY, entityName);
-            processEntityTimeSeries(
-                (ResultList<? extends EntityTimeSeriesInterface>) resultList,
-                contextData,
-                paginatedSource);
-          }
-        } catch (SearchIndexException rx) {
-          jobData.setStatus(EventPublisherJob.Status.RUNNING);
-          jobData.setFailure(rx.getIndexingError());
-          paginatedSource.updateStats(
-              rx.getIndexingError().getSuccessCount(), rx.getIndexingError().getFailedCount());
-        } finally {
-          updateStats(paginatedSource.getEntityType(), paginatedSource.getStats());
-          sendUpdates(jobExecutionContext);
-        }
-      }
-    }
+  private void performReindex(JobExecutionContext jobExecutionContext) {
+    if (jobData.getStats() == null) {
+      jobData.setStats(new Stats());
+    }
+    if (executorService == null || executorService.isShutdown() || executorService.isTerminated()) {
+      int numThreads =
+          Math.min(paginatedSources.size(), Runtime.getRuntime().availableProcessors());
+      this.executorService = Executors.newFixedThreadPool(numThreads);
+      LOG.debug("Initialized new ExecutorService with {} threads.", numThreads);
+    }
+    List<Future<?>> futures = new ArrayList<>();
+    for (Source paginatedSource : paginatedSources) {
+      Future<?> future =
+          executorService.submit(
+              () -> {
+                String entityType = paginatedSource.getEntityType();
+                Map<String, Object> contextData = new HashMap<>();
+                contextData.put(ENTITY_TYPE_KEY, entityType);
+                try {
+                  reCreateIndexes(entityType);
+                  contextData.put(ENTITY_TYPE_KEY, entityType);
+                  while (!paginatedSource.isDone()) {
+                    Object resultList = paginatedSource.readNext(null);
+                    if (resultList == null) {
+                      break;
+                    }
+                    if (!TIME_SERIES_ENTITIES.contains(entityType)) {
+                      List<String> entityNames =
+                          getEntityNameFromEntity(
+                              (ResultList<? extends EntityInterface>) resultList, entityType);
+                      contextData.put(ENTITY_NAME_LIST_KEY, entityNames);
+                      processEntity(
+                          (ResultList<? extends EntityInterface>) resultList,
+                          contextData,
+                          paginatedSource);
+                    } else {
+                      List<String> entityNames =
+                          getEntityNameFromEntityTimeSeries(
+                              (ResultList<? extends EntityTimeSeriesInterface>) resultList,
+                              entityType);
+                      contextData.put(ENTITY_NAME_LIST_KEY, entityNames);
+                      processEntityTimeSeries(
+                          (ResultList<? extends EntityTimeSeriesInterface>) resultList,
+                          contextData,
+                          paginatedSource);
+                    }
+                    synchronized (jobDataLock) {
+                      updateStats(entityType, paginatedSource.getStats());
+                    }
+                    sendUpdates(jobExecutionContext);
+                  }
+                } catch (SearchIndexException e) {
+                  synchronized (jobDataLock) {
+                    jobData.setStatus(EventPublisherJob.Status.RUNNING);
+                    jobData.setFailure(e.getIndexingError());
+                    paginatedSource.updateStats(
+                        e.getIndexingError().getSuccessCount(),
+                        e.getIndexingError().getFailedCount());
+                    updateStats(entityType, paginatedSource.getStats());
+                  }
+                  sendUpdates(jobExecutionContext);
+                } catch (Exception e) {
+                  synchronized (jobDataLock) {
+                    jobData.setStatus(EventPublisherJob.Status.FAILED);
+                    jobData.setFailure(
+                        new IndexingError()
+                            .withErrorSource(IndexingError.ErrorSource.JOB)
+                            .withMessage(e.getMessage()));
+                  }
+                  sendUpdates(jobExecutionContext);
+                  LOG.error("Unexpected error during reindexing for entity {}", entityType, e);
+                }
+              });
+      futures.add(future);
+    }
+
+    executorService.shutdown();
+    try {
+      boolean allTasksCompleted = executorService.awaitTermination(1, TimeUnit.HOURS);
+      if (!allTasksCompleted) {
+        LOG.warn("Reindexing tasks did not complete within the expected time.");
+        executorService.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      LOG.error("Reindexing was interrupted", e);
+      executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+
+    for (Future<?> future : futures) {
+      try {
+        future.get();
+      } catch (InterruptedException e) {
+        LOG.error("Task was interrupted", e);
+        Thread.currentThread().interrupt();
+      } catch (ExecutionException e) {
+        LOG.error("Exception in reindexing task", e.getCause());
+      }
+    }
   }
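
For orientation, the fan-out/join shape of the new performReindex, reduced to a self-contained sketch. The pool sizing, shutdown-then-await sequence, and per-task failure inspection mirror the diff; everything else (the String sources, the println work, the class name) is placeholder:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FanOutSketch {
  public static void main(String[] args) throws InterruptedException {
    List<String> sources = List.of("table", "topic", "dashboard");
    // One thread per source, capped at the machine's core count.
    int numThreads = Math.min(sources.size(), Runtime.getRuntime().availableProcessors());
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);

    List<Future<?>> futures = new ArrayList<>();
    for (String source : sources) {
      futures.add(pool.submit(() -> System.out.println("indexing " + source)));
    }

    // Stop accepting work, wait for in-flight tasks, force shutdown on timeout.
    pool.shutdown();
    if (!pool.awaitTermination(1, TimeUnit.HOURS)) {
      pool.shutdownNow();
    }

    // Surface any task failure that would otherwise be swallowed by the executor.
    for (Future<?> f : futures) {
      try {
        f.get();
      } catch (ExecutionException e) {
        System.err.println("task failed: " + e.getCause());
      }
    }
  }
}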
@@ -347,9 +405,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
   private void sendUpdates(JobExecutionContext jobExecutionContext) {
     try {
-      // store job details in Database
       jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats());
-      // Update Record to db
       updateRecordToDb(jobExecutionContext);
       if (WebSocketManager.getInstance() != null) {
         WebSocketManager.getInstance()
@@ -362,10 +418,8 @@ public class SearchIndexApp extends AbstractNativeApplication {
   }

   public void updateStats(String entityType, StepStats currentEntityStats) {
-    // Job Level Stats
     Stats jobDataStats = jobData.getStats();
-    // Update Entity Level Stats
     StepStats entityLevelStats = jobDataStats.getEntityStats();
     if (entityLevelStats == null) {
       entityLevelStats =
@@ -373,7 +427,6 @@ public class SearchIndexApp extends AbstractNativeApplication {
     }
     entityLevelStats.withAdditionalProperty(entityType, currentEntityStats);
-    // Total Stats
     StepStats stats = jobData.getStats().getJobStats();
     if (stats == null) {
       stats =
@@ -392,7 +445,6 @@ public class SearchIndexApp extends AbstractNativeApplication {
             .mapToInt(StepStats::getFailedRecords)
             .sum());
-    // Update for the Job
     jobDataStats.setJobStats(stats);
     jobDataStats.setEntityStats(entityLevelStats);
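
The rollup in updateStats recomputes job-level totals from the per-entity stats on every call; a toy version of that aggregation, with plain ints standing in for StepStats:

import java.util.Map;

public class StatsRollupSketch {
  public static void main(String[] args) {
    // Per-entity success counts; in the real code these are StepStats per entity type.
    Map<String, Integer> successByEntity = Map.of("table", 120, "topic", 40, "dashboard", 15);
    // The job-level total is re-derived from the entity-level values, mirroring
    // the mapToInt(...).sum() calls in updateStats.
    int totalSuccess = successByEntity.values().stream().mapToInt(Integer::intValue).sum();
    System.out.println("job-level success total: " + totalSuccess);
  }
}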
@@ -405,13 +457,31 @@ public class SearchIndexApp extends AbstractNativeApplication {
     }
     IndexMapping indexType = searchRepository.getIndexMapping(entityType);
-    // Delete index
     searchRepository.deleteIndex(indexType);
-    // Create index
     searchRepository.createIndex(indexType);
   }

   public void stopJob() {
     LOG.info("Stopping reindexing job.");
     stopped = true;
+    if (executorService != null && !executorService.isShutdown()) {
+      List<Runnable> awaitingTasks = executorService.shutdownNow();
+      LOG.info(
+          "ExecutorService has been shutdown. Awaiting termination. {} tasks were awaiting execution.",
+          awaitingTasks.size());
+      try {
+        if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+          LOG.warn("ExecutorService did not terminate within the specified timeout.");
+        }
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted while waiting for ExecutorService to terminate.", e);
+        List<Runnable> stillAwaitingTasks = executorService.shutdownNow(); // Force shutdown
+        LOG.info(
+            "Forced shutdown initiated due to interruption. {} tasks were awaiting execution.",
+            stillAwaitingTasks.size());
+        Thread.currentThread().interrupt(); // Restore interrupt status
+      }
+    }
   }
 }
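
The stopJob sequence above is the standard two-phase ExecutorService shutdown: flip a cooperative flag, interrupt running tasks with shutdownNow, then wait out a grace period and restore the interrupt status. A runnable sketch under the same assumptions (class names and the printed messages are illustrative; the 60-second grace period matches the diff):

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StopSketch {
  private volatile boolean stopped = false;
  private volatile ExecutorService pool = Executors.newFixedThreadPool(2);

  void stop() {
    stopped = true; // cooperative flag for any loop that polls it
    List<Runnable> pending = pool.shutdownNow(); // interrupts running tasks
    System.out.println(pending.size() + " tasks never started");
    try {
      if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
        System.err.println("pool did not terminate in time");
      }
    } catch (InterruptedException e) {
      pool.shutdownNow();
      Thread.currentThread().interrupt(); // restore interrupt status for callers
    }
  }

  public static void main(String[] args) {
    new StopSketch().stop();
  }
}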