Fix #12083 - Backoff retry when polling for pipeline service client status (#12105)

* Fix #12083

* Fix #12083

---------

Co-authored-by: Sriharsha Chintalapani <harshach@users.noreply.github.com>
This commit is contained in:
Pere Miquel Brull 2023-06-26 06:52:21 +02:00 committed by GitHub
parent 7d1b123efe
commit e13a5de5ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 45 additions and 4 deletions

View File

@ -22,7 +22,7 @@ You can check the latest release [here](/releases/all-releases).
- We will be adding support for NoSQL DB entities and Services with APIs - We will be adding support for NoSQL DB entities and Services with APIs
- Support for Long Entity Names such as S3 paths - Support for Long Entity Names such as S3 paths
- Import/Export support at all entities - Import/Export support at all entities
- Tag Propgation using Import/Export - Tag Propagation using Import/Export
- Thumbs up & down to capture popularity of the Entities - Thumbs up & down to capture popularity of the Entities
{% /tile %} {% /tile %}

View File

@ -227,6 +227,10 @@
<groupId>org.flywaydb</groupId> <groupId>org.flywaydb</groupId>
<artifactId>flyway-mysql</artifactId> <artifactId>flyway-mysql</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
</dependency>
<!-- Dependencies for secret store manager providers --> <!-- Dependencies for secret store manager providers -->

View File

@ -4,9 +4,13 @@ import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJob
import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler.JOB_CONTEXT_METER_REGISTRY; import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler.JOB_CONTEXT_METER_REGISTRY;
import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler.JOB_CONTEXT_PIPELINE_SERVICE_CLIENT; import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler.JOB_CONTEXT_PIPELINE_SERVICE_CLIENT;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Counter;
import io.micrometer.prometheus.PrometheusMeterRegistry; import io.micrometer.prometheus.PrometheusMeterRegistry;
import java.time.Duration;
import java.util.Map; import java.util.Map;
import java.util.function.Supplier;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.openmetadata.sdk.PipelineServiceClient; import org.openmetadata.sdk.PipelineServiceClient;
@ -22,6 +26,9 @@ public class PipelineServiceStatusJob implements Job {
private static final String UNHEALTHY_TAG_NAME = "unhealthy"; private static final String UNHEALTHY_TAG_NAME = "unhealthy";
private static final String CLUSTER_TAG_NAME = "clusterName"; private static final String CLUSTER_TAG_NAME = "clusterName";
private static final Integer MAX_ATTEMPTS = 3;
private static final Integer BACKOFF_TIME_SECONDS = 5;
@Override @Override
public void execute(JobExecutionContext jobExecutionContext) { public void execute(JobExecutionContext jobExecutionContext) {
@ -39,11 +46,35 @@ public class PipelineServiceStatusJob implements Job {
} }
} }
private String getServiceStatus(PipelineServiceClient pipelineServiceClient) {
RetryConfig retryConfig =
RetryConfig.<String>custom()
.maxAttempts(MAX_ATTEMPTS)
.waitDuration(Duration.ofMillis(BACKOFF_TIME_SECONDS * 1_000L))
.retryOnResult(response -> !HEALTHY_STATUS.equals(response))
.failAfterMaxAttempts(false)
.build();
Retry retry = Retry.of("getServiceStatus", retryConfig);
Supplier<String> responseSupplier =
() -> {
try {
Response response = pipelineServiceClient.getServiceStatus();
Map<String, String> responseMap = (Map<String, String>) response.getEntity();
return responseMap.get(STATUS_KEY) == null ? "unhealthy" : responseMap.get(STATUS_KEY);
} catch (Exception e) {
throw new RuntimeException(e);
}
};
return retry.executeSupplier(responseSupplier);
}
private void registerStatusMetric( private void registerStatusMetric(
PipelineServiceClient pipelineServiceClient, PrometheusMeterRegistry meterRegistry, String clusterName) { PipelineServiceClient pipelineServiceClient, PrometheusMeterRegistry meterRegistry, String clusterName) {
Response response = pipelineServiceClient.getServiceStatus(); String status = getServiceStatus(pipelineServiceClient);
Map<String, String> responseMap = (Map<String, String>) response.getEntity(); if (!HEALTHY_STATUS.equals(status)) {
if (responseMap.get(STATUS_KEY) == null || !HEALTHY_STATUS.equals(responseMap.get(STATUS_KEY))) {
publishUnhealthyCounter(meterRegistry, clusterName); publishUnhealthyCounter(meterRegistry, clusterName);
} }
} }

View File

@ -105,6 +105,7 @@
<commons-lang.version>2.6</commons-lang.version> <commons-lang.version>2.6</commons-lang.version>
<lombok.version>1.18.26</lombok.version> <lombok.version>1.18.26</lombok.version>
<tomcat-jdbc.version>10.1.7</tomcat-jdbc.version> <tomcat-jdbc.version>10.1.7</tomcat-jdbc.version>
<resilience4j.version>1.7.0</resilience4j.version>
<!-- We need ElasticSearch client to be compatible with both ElasticSearch and AWS OpenSearch <!-- We need ElasticSearch client to be compatible with both ElasticSearch and AWS OpenSearch
This compatibility broken in 7.14, so lets keep this version pinned to 7.13.x This compatibility broken in 7.14, so lets keep this version pinned to 7.13.x
--> -->
@ -458,6 +459,11 @@
<version>${org.junit.jupiter.version}</version> <version>${org.junit.jupiter.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
<version>${resilience4j.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.reflections</groupId> <groupId>org.reflections</groupId>
<artifactId>reflections</artifactId> <artifactId>reflections</artifactId>