From aeba0e9afe20c4c2a0acc5be4573da5d9b831870 Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Wed, 13 Oct 2021 18:53:47 -0700 Subject: [PATCH] Revert "revert "test(ElasticSearch): Retry ES requests" (#3385)" (#3392) This reverts commit b16b090f2bf78e0a751490310b95db3dfc142bbf. --- .github/workflows/build-and-test.yml | 2 +- build.gradle | 1 + metadata-io/build.gradle | 1 + .../metadata/ElasticSearchTestUtils.java | 50 +++++++++++++++---- 4 files changed, 44 insertions(+), 10 deletions(-) diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 6a491125a4..1d0c113277 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -29,7 +29,7 @@ jobs: with: python-version: "3.6" - name: Gradle build (and test) - run: ./gradlew build -x :metadata-ingestion:build -x :metadata-ingestion:check -x docs-website:build + run: ./gradlew build -x :metadata-io:test -x :metadata-ingestion:build -x :metadata-ingestion:check -x docs-website:build - uses: actions/upload-artifact@v2 if: always() with: diff --git a/build.gradle b/build.gradle index d270d68b12..60c04c60d3 100644 --- a/build.gradle +++ b/build.gradle @@ -107,6 +107,7 @@ project.ext.externalDependency = [ 'playPac4j': 'org.pac4j:play-pac4j_2.11:7.0.1', 'postgresql': 'org.postgresql:postgresql:42.2.14', 'reflections': 'org.reflections:reflections:0.9.11', + 'resilience4j': 'io.github.resilience4j:resilience4j-retry:1.7.1', 'rythmEngine': 'org.rythmengine:rythm-engine:1.3.0', 'servletApi': 'javax.servlet:javax.servlet-api:3.1.0', 'shiroCore': 'org.apache.shiro:shiro-core:1.7.1', diff --git a/metadata-io/build.gradle b/metadata-io/build.gradle index 8e44c4c9a7..a058370560 100644 --- a/metadata-io/build.gradle +++ b/metadata-io/build.gradle @@ -38,6 +38,7 @@ dependencies { testCompile externalDependency.mockito testCompile externalDependency.mockitoInline testCompile externalDependency.iStackCommons + testCompile externalDependency.resilience4j 
testCompile externalDependency.testContainers testCompile externalDependency.testContainersJunit testCompile externalDependency.testContainersElasticsearch diff --git a/metadata-io/src/test/java/com/linkedin/metadata/ElasticSearchTestUtils.java b/metadata-io/src/test/java/com/linkedin/metadata/ElasticSearchTestUtils.java index 18779204f3..c8db02ab3e 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/ElasticSearchTestUtils.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/ElasticSearchTestUtils.java @@ -1,5 +1,8 @@ package com.linkedin.metadata; +import io.github.resilience4j.retry.Retry; +import io.github.resilience4j.retry.RetryConfig; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; @@ -14,12 +17,40 @@ import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType; import org.testng.TestException; -import java.io.IOException; +import java.net.SocketTimeoutException; import java.util.UUID; import java.util.concurrent.TimeUnit; public class ElasticSearchTestUtils { + // request options for all requests + private static final RequestOptions OPTIONS = RequestOptions.DEFAULT; + + // retry logic for ES requests + private static final Retry RETRY = Retry.of("ElasticSearchTestUtils", RetryConfig.custom() + .retryExceptions(SocketTimeoutException.class, ElasticsearchStatusException.class) + .failAfterMaxAttempts(true) + .maxAttempts(3) + .build() + ); + + // allow for Supplier<T> that throw exceptions + private interface ThrowingSupplier<T, E extends Exception> { + T get() throws E; + } + + // We are retrying requests, otherwise concurrency tests will see exceptions like these: + // java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-1 [ACTIVE] + private static <T> T retry(ThrowingSupplier<T, ? extends Exception> func) { 
+ return RETRY.executeSupplier(() -> { + try { + return func.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + private ElasticSearchTestUtils() { } @@ -37,13 +68,13 @@ public class ElasticSearchTestUtils { waitForSyncFlag(searchClient, syncFlag, indexName, true); // flush changes for all indices in ES to disk - FlushResponse fResponse = searchClient.indices().flush(new FlushRequest(), RequestOptions.DEFAULT); + FlushResponse fResponse = retry(() -> searchClient.indices().flush(new FlushRequest(), OPTIONS)); if (fResponse.getFailedShards() > 0) { throw new RuntimeException("Failed to flush " + fResponse.getFailedShards() + " of " + fResponse.getTotalShards() + " shards"); } // wait for all indices to be refreshed - RefreshResponse rResponse = searchClient.indices().refresh(new RefreshRequest(), RequestOptions.DEFAULT); + RefreshResponse rResponse = retry(() -> searchClient.indices().refresh(new RefreshRequest(), OPTIONS)); if (rResponse.getFailedShards() > 0) { throw new RuntimeException("Failed to refresh " + rResponse.getFailedShards() + " of " + rResponse.getTotalShards() + " shards"); } @@ -53,26 +84,27 @@ public class ElasticSearchTestUtils { waitForSyncFlag(searchClient, syncFlag, indexName, false); } - private static void addSyncFlag(RestHighLevelClient searchClient, String docId, String indexName) throws IOException { + private static void addSyncFlag(RestHighLevelClient searchClient, String docId, String indexName) { String document = "{ }"; final IndexRequest indexRequest = new IndexRequest(indexName).id(docId).source(document, XContentType.JSON); final UpdateRequest updateRequest = new UpdateRequest(indexName, docId).doc(document, XContentType.JSON) .detectNoop(false) + .retryOnConflict(3) .upsert(indexRequest); - searchClient.update(updateRequest, RequestOptions.DEFAULT); + retry(() -> searchClient.update(updateRequest, OPTIONS)); } - private static void removeSyncFlag(RestHighLevelClient searchClient, String docId, String 
indexName) throws IOException { + private static void removeSyncFlag(RestHighLevelClient searchClient, String docId, String indexName) { final DeleteRequest deleteRequest = new DeleteRequest(indexName).id(docId); - searchClient.delete(deleteRequest, RequestOptions.DEFAULT); + retry(() -> searchClient.delete(deleteRequest, OPTIONS)); } private static void waitForSyncFlag(RestHighLevelClient searchClient, String docId, String indexName, boolean toExist) - throws IOException, InterruptedException { + throws InterruptedException { GetRequest request = new GetRequest(indexName).id(docId); long timeout = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(10, TimeUnit.SECONDS); while (System.currentTimeMillis() < timeout) { - GetResponse response = searchClient.get(request, RequestOptions.DEFAULT); + GetResponse response = retry(() -> searchClient.get(request, OPTIONS)); if (response.isExists() == toExist) { return; }