mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-27 18:45:50 +00:00
fix(elasticsearch): build in resilience against IO exceptions on httpclient (#6680)
* fix(elasticsearch): build in resilience against IO exceptions on http client
This commit is contained in:
parent
b7a2ce5c00
commit
f85fd157e9
@ -2,14 +2,28 @@ package com.linkedin.gms.factory.common;
|
|||||||
|
|
||||||
import com.linkedin.gms.factory.auth.AwsRequestSigningApacheInterceptor;
|
import com.linkedin.gms.factory.auth.AwsRequestSigningApacheInterceptor;
|
||||||
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
|
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
|
||||||
|
import java.io.IOException;
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
|
import javax.net.ssl.HostnameVerifier;
|
||||||
import javax.net.ssl.SSLContext;
|
import javax.net.ssl.SSLContext;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.http.HttpHost;
|
import org.apache.http.HttpHost;
|
||||||
import org.apache.http.HttpRequestInterceptor;
|
import org.apache.http.HttpRequestInterceptor;
|
||||||
|
import org.apache.http.config.RegistryBuilder;
|
||||||
|
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
|
||||||
import org.apache.http.conn.ssl.NoopHostnameVerifier;
|
import org.apache.http.conn.ssl.NoopHostnameVerifier;
|
||||||
|
import org.apache.http.conn.util.PublicSuffixMatcherLoader;
|
||||||
|
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
|
||||||
|
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
|
||||||
import org.apache.http.impl.nio.reactor.IOReactorConfig;
|
import org.apache.http.impl.nio.reactor.IOReactorConfig;
|
||||||
|
import org.apache.http.nio.conn.NHttpClientConnectionManager;
|
||||||
|
import org.apache.http.nio.conn.NoopIOSessionStrategy;
|
||||||
|
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
|
||||||
|
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
|
||||||
|
import org.apache.http.nio.reactor.IOReactorException;
|
||||||
|
import org.apache.http.nio.reactor.IOReactorExceptionHandler;
|
||||||
|
import org.apache.http.ssl.SSLContexts;
|
||||||
import org.elasticsearch.client.RestClient;
|
import org.elasticsearch.client.RestClient;
|
||||||
import org.elasticsearch.client.RestClientBuilder;
|
import org.elasticsearch.client.RestClientBuilder;
|
||||||
import org.elasticsearch.client.RestHighLevelClient;
|
import org.elasticsearch.client.RestHighLevelClient;
|
||||||
@ -70,25 +84,37 @@ public class RestHighLevelClientFactory {
|
|||||||
|
|
||||||
@Bean(name = "elasticSearchRestHighLevelClient")
|
@Bean(name = "elasticSearchRestHighLevelClient")
|
||||||
@Nonnull
|
@Nonnull
|
||||||
protected RestHighLevelClient createInstance() {
|
public RestHighLevelClient createInstance(RestClientBuilder restClientBuilder) {
|
||||||
RestClientBuilder restClientBuilder;
|
|
||||||
if (useSSL) {
|
|
||||||
restClientBuilder = loadRestHttpsClient(host, port, pathPrefix, threadCount, connectionRequestTimeout, sslContext, username,
|
|
||||||
password, opensearchUseAwsIamAuth, region);
|
|
||||||
} else {
|
|
||||||
restClientBuilder = loadRestHttpClient(host, port, pathPrefix, threadCount, connectionRequestTimeout, username,
|
|
||||||
password, opensearchUseAwsIamAuth, region);
|
|
||||||
}
|
|
||||||
|
|
||||||
return new RestHighLevelClient(restClientBuilder);
|
return new RestHighLevelClient(restClientBuilder);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public RestClientBuilder loadRestClient() {
|
||||||
|
final RestClientBuilder builder = createBuilder(useSSL ? "https" : "http");
|
||||||
|
|
||||||
|
builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
|
||||||
|
if (useSSL) {
|
||||||
|
httpAsyncClientBuilder.setSSLContext(sslContext).setSSLHostnameVerifier(new NoopHostnameVerifier());
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
httpAsyncClientBuilder.setConnectionManager(createConnectionManager());
|
||||||
|
} catch (IOReactorException e) {
|
||||||
|
throw new IllegalStateException("Unable to start ElasticSearch client. Please verify connection configuration.");
|
||||||
|
}
|
||||||
|
httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build());
|
||||||
|
|
||||||
|
setCredentials(httpAsyncClientBuilder);
|
||||||
|
|
||||||
|
return httpAsyncClientBuilder;
|
||||||
|
});
|
||||||
|
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
private static RestClientBuilder loadRestHttpClient(@Nonnull String host, int port, String pathPrefix, int threadCount,
|
private RestClientBuilder createBuilder(String scheme) {
|
||||||
int connectionRequestTimeout) {
|
final RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, scheme));
|
||||||
RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "http"))
|
|
||||||
.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder
|
|
||||||
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build()));
|
|
||||||
|
|
||||||
if (!StringUtils.isEmpty(pathPrefix)) {
|
if (!StringUtils.isEmpty(pathPrefix)) {
|
||||||
builder.setPathPrefix(pathPrefix);
|
builder.setPathPrefix(pathPrefix);
|
||||||
@ -100,76 +126,62 @@ public class RestHighLevelClientFactory {
|
|||||||
return builder;
|
return builder;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
/**
|
||||||
private static RestClientBuilder loadRestHttpClient(@Nonnull String host, int port, String pathPrefix, int threadCount,
|
* Needed to override ExceptionHandler behavior for cases where IO error would have put client in unrecoverable state
|
||||||
int connectionRequestTimeout, String username, String password, boolean opensearchUseAwsIamAuth, String region) {
|
* We don't utilize system properties in the client builder, so setting defaults pulled from
|
||||||
RestClientBuilder builder = loadRestHttpClient(host, port, pathPrefix, threadCount, connectionRequestTimeout);
|
* {@link HttpAsyncClientBuilder#build()}.
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private NHttpClientConnectionManager createConnectionManager() throws IOReactorException {
|
||||||
|
SSLContext sslContext = SSLContexts.createDefault();
|
||||||
|
HostnameVerifier hostnameVerifier = new DefaultHostnameVerifier(PublicSuffixMatcherLoader.getDefault());
|
||||||
|
SchemeIOSessionStrategy sslStrategy =
|
||||||
|
new SSLIOSessionStrategy(sslContext, null, null, hostnameVerifier);
|
||||||
|
|
||||||
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
|
IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(threadCount).build();
|
||||||
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
|
DefaultConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
|
||||||
httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build());
|
IOReactorExceptionHandler ioReactorExceptionHandler = new IOReactorExceptionHandler() {
|
||||||
|
@Override
|
||||||
if (username != null && password != null) {
|
public boolean handle(IOException ex) {
|
||||||
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
log.error("IO Exception caught during ElasticSearch connection.", ex);
|
||||||
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
|
return true;
|
||||||
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
|
|
||||||
}
|
|
||||||
if (opensearchUseAwsIamAuth) {
|
|
||||||
HttpRequestInterceptor interceptor = getAwsRequestSigningInterceptor(region);
|
|
||||||
httpAsyncClientBuilder.addInterceptorLast(interceptor);
|
|
||||||
}
|
|
||||||
|
|
||||||
return httpAsyncClientBuilder;
|
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
|
||||||
return builder;
|
@Override
|
||||||
|
public boolean handle(RuntimeException ex) {
|
||||||
|
log.error("Runtime Exception caught during ElasticSearch connection.", ex);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
ioReactor.setExceptionHandler(ioReactorExceptionHandler);
|
||||||
|
|
||||||
|
return new PoolingNHttpClientConnectionManager(ioReactor,
|
||||||
|
RegistryBuilder.<SchemeIOSessionStrategy>create()
|
||||||
|
.register("http", NoopIOSessionStrategy.INSTANCE)
|
||||||
|
.register("https", sslStrategy)
|
||||||
|
.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
private void setCredentials(HttpAsyncClientBuilder httpAsyncClientBuilder) {
|
||||||
private static RestClientBuilder loadRestHttpsClient(@Nonnull String host, int port, String pathPrefix, int threadCount,
|
if (username != null && password != null) {
|
||||||
int connectionRequestTimeout, @Nonnull SSLContext sslContext, String username, String password,
|
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
||||||
boolean opensearchUseAwsIamAuth, String region) {
|
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
|
||||||
|
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
|
||||||
final RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "https"));
|
}
|
||||||
|
if (opensearchUseAwsIamAuth) {
|
||||||
if (!StringUtils.isEmpty(pathPrefix)) {
|
HttpRequestInterceptor interceptor = getAwsRequestSigningInterceptor(region);
|
||||||
builder.setPathPrefix(pathPrefix);
|
httpAsyncClientBuilder.addInterceptorLast(interceptor);
|
||||||
}
|
}
|
||||||
|
|
||||||
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
|
|
||||||
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
|
|
||||||
httpAsyncClientBuilder.setSSLContext(sslContext).setSSLHostnameVerifier(new NoopHostnameVerifier())
|
|
||||||
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build());
|
|
||||||
|
|
||||||
if (username != null && password != null) {
|
|
||||||
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
|
||||||
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
|
|
||||||
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
|
|
||||||
} else if (opensearchUseAwsIamAuth) {
|
|
||||||
HttpRequestInterceptor interceptor = getAwsRequestSigningInterceptor(region);
|
|
||||||
httpAsyncClientBuilder.addInterceptorLast(interceptor);
|
|
||||||
}
|
|
||||||
|
|
||||||
return httpAsyncClientBuilder;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
builder.setRequestConfigCallback(
|
|
||||||
requestConfigBuilder -> requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeout));
|
|
||||||
|
|
||||||
return builder;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static HttpRequestInterceptor getAwsRequestSigningInterceptor(String region) {
|
private HttpRequestInterceptor getAwsRequestSigningInterceptor(String region) {
|
||||||
|
|
||||||
if (region == null) {
|
if (region == null) {
|
||||||
throw new NullPointerException("Region must not be null when opensearchUseAwsIamAuth is enabled");
|
throw new IllegalArgumentException("Region must not be null when opensearchUseAwsIamAuth is enabled");
|
||||||
}
|
}
|
||||||
Aws4Signer signer = Aws4Signer.create();
|
Aws4Signer signer = Aws4Signer.create();
|
||||||
// Uses default AWS credentials
|
// Uses default AWS credentials
|
||||||
HttpRequestInterceptor interceptor = new AwsRequestSigningApacheInterceptor("es", signer,
|
return new AwsRequestSigningApacheInterceptor("es", signer,
|
||||||
DefaultCredentialsProvider.create(), region);
|
DefaultCredentialsProvider.create(), region);
|
||||||
return interceptor;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user