fix(elastic): allow more time for re-indexing tasks (#3794)

This commit is contained in:
Gabe Lyons 2021-12-27 10:02:28 -08:00 committed by GitHub
parent 5df5150e51
commit e7b93796b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -13,6 +13,8 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
@ -25,11 +27,11 @@ import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetMappingsRequest;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.ReindexRequest;
@Slf4j
@RequiredArgsConstructor
public class ESIndexBuilder {
@ -89,9 +91,33 @@ public class ESIndexBuilder {
String tempIndexName = indexName + "_" + System.currentTimeMillis();
createIndex(tempIndexName, mappings, finalSettings);
try {
searchClient.reindex(
new ReindexRequest().setSourceIndices(indexName).setDestIndex(tempIndexName),
RequestOptions.DEFAULT);
TaskSubmissionResponse reindexTask;
reindexTask = searchClient.submitReindexTask(new ReindexRequest().setSourceIndices(indexName).setDestIndex(tempIndexName),
RequestOptions.DEFAULT);
// wait up to 5 minutes for the task to complete
long startTime = System.currentTimeMillis();
long millisToWait60Minutes = 1000 * 60 * 60;
Boolean reindexTaskCompleted = false;
while ((System.currentTimeMillis() - startTime) < millisToWait60Minutes) {
log.info("Reindexing from {} to {} in progress...", indexName, tempIndexName);
ListTasksRequest request = new ListTasksRequest();
ListTasksResponse tasks = searchClient.tasks().list(request, RequestOptions.DEFAULT);
if (tasks.getTasks().stream().noneMatch(task -> task.getTaskId().toString().equals(reindexTask.getTask()))) {
log.info("Reindexing {} to {} task has completed, will now check if reindex was successful", indexName, tempIndexName);
reindexTaskCompleted = true;
break;
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
log.info("Trouble sleeping while reindexing {} to {}: Exception {}. Retrying...", indexName, tempIndexName, e.toString());
}
}
if (!reindexTaskCompleted) {
throw new RuntimeException(String.format("Reindex from %s to %s failed-- task exceeded 60 minute limit", indexName, tempIndexName));
}
} catch (Exception e) {
log.info("Failed to reindex {} to {}: Exception {}", indexName, tempIndexName, e.toString());
searchClient.indices().delete(new DeleteIndexRequest().indices(tempIndexName), RequestOptions.DEFAULT);