Revert "revert "test(ElasticSearch): Retry ES requests" (#3385)" (#3392)

This reverts commit b16b090f2bf78e0a751490310b95db3dfc142bbf.
This commit is contained in:
Gabe Lyons 2021-10-13 18:53:47 -07:00 committed by GitHub
parent b16b090f2b
commit aeba0e9afe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 44 additions and 10 deletions

View File

@ -29,7 +29,7 @@ jobs:
with:
python-version: "3.6"
- name: Gradle build (and test)
run: ./gradlew build -x :metadata-ingestion:build -x :metadata-ingestion:check -x docs-website:build
run: ./gradlew build -x :metadata-io:test -x :metadata-ingestion:build -x :metadata-ingestion:check -x docs-website:build
- uses: actions/upload-artifact@v2
if: always()
with:

View File

@ -107,6 +107,7 @@ project.ext.externalDependency = [
'playPac4j': 'org.pac4j:play-pac4j_2.11:7.0.1',
'postgresql': 'org.postgresql:postgresql:42.2.14',
'reflections': 'org.reflections:reflections:0.9.11',
'resilience4j': 'io.github.resilience4j:resilience4j-retry:1.7.1',
'rythmEngine': 'org.rythmengine:rythm-engine:1.3.0',
'servletApi': 'javax.servlet:javax.servlet-api:3.1.0',
'shiroCore': 'org.apache.shiro:shiro-core:1.7.1',

View File

@ -38,6 +38,7 @@ dependencies {
testCompile externalDependency.mockito
testCompile externalDependency.mockitoInline
testCompile externalDependency.iStackCommons
testCompile externalDependency.resilience4j
testCompile externalDependency.testContainers
testCompile externalDependency.testContainersJunit
testCompile externalDependency.testContainersElasticsearch

View File

@ -1,5 +1,8 @@
package com.linkedin.metadata;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
@ -14,12 +17,40 @@ import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.testng.TestException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
public class ElasticSearchTestUtils {
// request options for all requests
private static final RequestOptions OPTIONS = RequestOptions.DEFAULT;
// retry logic for ES requests
private static final Retry RETRY = Retry.of("ElasticSearchTestUtils", RetryConfig.custom()
.retryExceptions(SocketTimeoutException.class, ElasticsearchStatusException.class)
.failAfterMaxAttempts(true)
.maxAttempts(3)
.build()
);
// allow for Supplier<T> that throw exceptions
private interface ThrowingSupplier<T, E extends Exception> {
T get() throws E;
}
// We are retrying requests, otherwise concurrency tests will see exceptions like these:
// java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-1 [ACTIVE]
private static <T> T retry(ThrowingSupplier<T, Exception> func) {
return RETRY.executeSupplier(() -> {
try {
return func.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
private ElasticSearchTestUtils() {
}
@ -37,13 +68,13 @@ public class ElasticSearchTestUtils {
waitForSyncFlag(searchClient, syncFlag, indexName, true);
// flush changes for all indices in ES to disk
FlushResponse fResponse = searchClient.indices().flush(new FlushRequest(), RequestOptions.DEFAULT);
FlushResponse fResponse = retry(() -> searchClient.indices().flush(new FlushRequest(), OPTIONS));
if (fResponse.getFailedShards() > 0) {
throw new RuntimeException("Failed to flush " + fResponse.getFailedShards() + " of " + fResponse.getTotalShards() + " shards");
}
// wait for all indices to be refreshed
RefreshResponse rResponse = searchClient.indices().refresh(new RefreshRequest(), RequestOptions.DEFAULT);
RefreshResponse rResponse = retry(() -> searchClient.indices().refresh(new RefreshRequest(), OPTIONS));
if (rResponse.getFailedShards() > 0) {
throw new RuntimeException("Failed to refresh " + rResponse.getFailedShards() + " of " + rResponse.getTotalShards() + " shards");
}
@ -53,26 +84,27 @@ public class ElasticSearchTestUtils {
waitForSyncFlag(searchClient, syncFlag, indexName, false);
}
private static void addSyncFlag(RestHighLevelClient searchClient, String docId, String indexName) throws IOException {
private static void addSyncFlag(RestHighLevelClient searchClient, String docId, String indexName) {
String document = "{ }";
final IndexRequest indexRequest = new IndexRequest(indexName).id(docId).source(document, XContentType.JSON);
final UpdateRequest updateRequest = new UpdateRequest(indexName, docId).doc(document, XContentType.JSON)
.detectNoop(false)
.retryOnConflict(3)
.upsert(indexRequest);
searchClient.update(updateRequest, RequestOptions.DEFAULT);
retry(() -> searchClient.update(updateRequest, OPTIONS));
}
private static void removeSyncFlag(RestHighLevelClient searchClient, String docId, String indexName) throws IOException {
private static void removeSyncFlag(RestHighLevelClient searchClient, String docId, String indexName) {
final DeleteRequest deleteRequest = new DeleteRequest(indexName).id(docId);
searchClient.delete(deleteRequest, RequestOptions.DEFAULT);
retry(() -> searchClient.delete(deleteRequest, OPTIONS));
}
private static void waitForSyncFlag(RestHighLevelClient searchClient, String docId, String indexName, boolean toExist)
throws IOException, InterruptedException {
throws InterruptedException {
GetRequest request = new GetRequest(indexName).id(docId);
long timeout = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(10, TimeUnit.SECONDS);
while (System.currentTimeMillis() < timeout) {
GetResponse response = searchClient.get(request, RequestOptions.DEFAULT);
GetResponse response = retry(() -> searchClient.get(request, OPTIONS));
if (response.isExists() == toExist) {
return;
}