mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-30 03:18:24 +00:00
feat(gms): add elasticsearch SSL support (#2189)
Co-authored-by: thomas.larsson <thomas.larsson@klarna.com>
This commit is contained in:
parent
973768fc5a
commit
7043138797
11
docker/datahub-gms/env/docker.env
vendored
11
docker/datahub-gms/env/docker.env
vendored
@ -12,3 +12,14 @@ NEO4J_HOST=http://neo4j:7474
|
||||
NEO4J_URI=bolt://neo4j
|
||||
NEO4J_USERNAME=neo4j
|
||||
NEO4J_PASSWORD=datahub
|
||||
|
||||
# Uncomment and set these to support SSL connection to Elasticsearch
|
||||
# ELASTICSEARCH_USE_SSL=true
|
||||
# ELASTICSEARCH_SSL_PROTOCOL=TLSv1.2
|
||||
# ELASTICSEARCH_SSL_SECURE_RANDOM_IMPL=
|
||||
# ELASTICSEARCH_SSL_TRUSTSTORE_FILE=
|
||||
# ELASTICSEARCH_SSL_TRUSTSTORE_TYPE=
|
||||
# ELASTICSEARCH_SSL_TRUSTSTORE_PASSWORD=
|
||||
# ELASTICSEARCH_SSL_KEYSTORE_FILE=
|
||||
# ELASTICSEARCH_SSL_KEYSTORE_TYPE=
|
||||
# ELASTICSEARCH_SSL_KEYSTORE_PASSWORD=
|
||||
|
||||
@ -5,6 +5,7 @@ dependencies {
|
||||
compile project(':metadata-dao-impl:kafka-producer')
|
||||
|
||||
compile externalDependency.elasticSearchRest
|
||||
compile externalDependency.httpClient
|
||||
compile externalDependency.gson
|
||||
compile externalDependency.kafkaClients
|
||||
compile externalDependency.kafkaAvroSerde
|
||||
|
||||
@ -0,0 +1,85 @@
|
||||
package com.linkedin.gms.factory.common;
|
||||
|
||||
import org.apache.http.ssl.SSLContextBuilder;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.SecureRandom;
|
||||
import java.security.KeyManagementException;
|
||||
import java.security.KeyStore;
|
||||
import java.security.KeyStoreException;
|
||||
import java.security.cert.CertificateException;
|
||||
|
||||
|
||||
@Configuration
|
||||
public class ElasticsearchSSLContextFactory {
|
||||
|
||||
@Value("${ELASTICSEARCH_SSL_PROTOCOL:#{null}}")
|
||||
private String sslProtocol;
|
||||
|
||||
@Value("${ELASTICSEARCH_SSL_SECURE_RANDOM_IMPL:#{null}}")
|
||||
private String sslSecureRandomImplementation;
|
||||
|
||||
@Value("${ELASTICSEARCH_SSL_TRUSTSTORE_FILE:#{null}}")
|
||||
private String sslTrustStoreFile;
|
||||
|
||||
@Value("${ELASTICSEARCH_SSL_TRUSTSTORE_TYPE:#{null}}")
|
||||
private String sslTrustStoreType;
|
||||
|
||||
@Value("${ELASTICSEARCH_SSL_TRUSTSTORE_PASSWORD:#{null}}")
|
||||
private String sslTrustStorePassword;
|
||||
|
||||
@Value("${ELASTICSEARCH_SSL_KEYSTORE_FILE:#{null}}")
|
||||
private String sslKeyStoreFile;
|
||||
|
||||
@Value("${ELASTICSEARCH_SSL_KEYSTORE_TYPE:#{null}}")
|
||||
private String sslKeyStoreType;
|
||||
|
||||
@Value("${ELASTICSEARCH_SSL_KEYSTORE_PASSWORD:#{null}}")
|
||||
private String sslKeyStorePassword;
|
||||
|
||||
@Bean(name = "elasticSearchSSLContext")
|
||||
public SSLContext createInstance() {
|
||||
final SSLContextBuilder sslContextBuilder = new SSLContextBuilder();
|
||||
if (sslProtocol != null) {
|
||||
sslContextBuilder.useProtocol(sslProtocol);
|
||||
}
|
||||
|
||||
if (sslTrustStoreFile != null && sslTrustStoreType != null && sslTrustStorePassword != null) {
|
||||
loadKeyStore(sslContextBuilder, sslTrustStoreFile, sslTrustStoreType, sslTrustStorePassword);
|
||||
}
|
||||
|
||||
if (sslKeyStoreFile != null && sslKeyStoreType != null && sslKeyStorePassword != null) {
|
||||
loadKeyStore(sslContextBuilder, sslKeyStoreFile, sslKeyStoreType, sslKeyStorePassword);
|
||||
}
|
||||
|
||||
final SSLContext sslContext;
|
||||
try {
|
||||
if (sslSecureRandomImplementation != null) {
|
||||
sslContextBuilder.setSecureRandom(SecureRandom.getInstance(sslSecureRandomImplementation));
|
||||
}
|
||||
sslContext = sslContextBuilder.build();
|
||||
} catch (NoSuchAlgorithmException | KeyManagementException e) {
|
||||
throw new RuntimeException("Failed to build SSL Context", e);
|
||||
}
|
||||
return sslContext;
|
||||
}
|
||||
|
||||
private void loadKeyStore(@Nonnull SSLContextBuilder sslContextBuilder, @Nonnull String path,
|
||||
@Nonnull String type, @Nonnull String password) {
|
||||
try (InputStream identityFile = new FileInputStream(path)) {
|
||||
final KeyStore keystore = KeyStore.getInstance(type);
|
||||
keystore.load(identityFile, password.toCharArray());
|
||||
sslContextBuilder.loadTrustMaterial(keystore, null);
|
||||
} catch (IOException | CertificateException | NoSuchAlgorithmException | KeyStoreException e) {
|
||||
throw new RuntimeException("Failed to load key store: " + path, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,13 +1,16 @@
|
||||
package com.linkedin.gms.factory.common;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.http.HttpHost;
|
||||
import org.apache.http.conn.ssl.NoopHostnameVerifier;
|
||||
import org.apache.http.impl.nio.reactor.IOReactorConfig;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.client.RestClientBuilder;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
@ -18,27 +21,34 @@ import org.springframework.context.annotation.Configuration;
|
||||
public class RestHighLevelClientFactory {
|
||||
|
||||
@Value("${ELASTICSEARCH_HOST:localhost}")
|
||||
private String elasticSearchHost;
|
||||
private String host;
|
||||
|
||||
@Value("${ELASTICSEARCH_PORT:9200}")
|
||||
private Integer elasticSearchPort;
|
||||
private Integer port;
|
||||
|
||||
@Value("${ELASTICSEARCH_THREAD_COUNT:1}")
|
||||
private Integer elasticSearchThreadCount;
|
||||
private Integer threadCount;
|
||||
|
||||
@Value("${ELASTICSEARCH_CONNECTION_REQUEST_TIMEOUT:0}")
|
||||
private Integer elasticSearchConectionRequestTimeout;
|
||||
private Integer connectionRequestTimeout;
|
||||
|
||||
@Value("${ELASTICSEARCH_USE_SSL:false}")
|
||||
private boolean useSSL;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("elasticSearchSSLContext")
|
||||
private SSLContext sslContext;
|
||||
|
||||
@Bean(name = "elasticSearchRestHighLevelClient")
|
||||
@Nonnull
|
||||
protected RestHighLevelClient createInstance() {
|
||||
try {
|
||||
RestClient restClient = loadRestHttpClient(
|
||||
elasticSearchHost,
|
||||
elasticSearchPort,
|
||||
elasticSearchThreadCount,
|
||||
elasticSearchConectionRequestTimeout
|
||||
);
|
||||
RestClient restClient;
|
||||
if (useSSL) {
|
||||
restClient = loadRestHttpsClient(host, port, threadCount, connectionRequestTimeout, sslContext);
|
||||
} else {
|
||||
restClient = loadRestHttpClient(host, port, threadCount, connectionRequestTimeout);
|
||||
}
|
||||
|
||||
return new RestHighLevelClient(restClient);
|
||||
} catch (Exception e) {
|
||||
@ -48,7 +58,7 @@ public class RestHighLevelClientFactory {
|
||||
|
||||
@Nonnull
|
||||
private static RestClient loadRestHttpClient(@Nonnull String host, int port, int threadCount,
|
||||
int connectionRequestTimeout) throws Exception {
|
||||
int connectionRequestTimeout) {
|
||||
RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "http"))
|
||||
.setHttpClientConfigCallback(httpAsyncClientBuilder ->
|
||||
httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom()
|
||||
@ -59,5 +69,20 @@ public class RestHighLevelClientFactory {
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private static RestClient loadRestHttpsClient(@Nonnull String host, int port, int threadCount,
|
||||
int connectionRequestTimeout, @Nonnull SSLContext sslContext) {
|
||||
|
||||
final RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "https"))
|
||||
.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setSSLContext(sslContext)
|
||||
.setSSLHostnameVerifier(new NoopHostnameVerifier())
|
||||
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build()));
|
||||
|
||||
builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.
|
||||
setConnectionRequestTimeout(connectionRequestTimeout));
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user