Add SSL options to ElasticSearch client - Ingestion & Server (#1548)

* Add SSL options to ElasticSearch client - Ingestion & Server

* Add SSL options to ElasticSearch client - Ingestion & Server
This commit is contained in:
Sriharsha Chintalapani 2021-12-04 01:07:55 -08:00 committed by GitHub
parent 894f54523d
commit 970f38b69f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 163 additions and 23 deletions

View File

@ -16,6 +16,8 @@
package org.openmetadata.catalog; package org.openmetadata.catalog;
import lombok.Builder;
import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotEmpty;
public class ElasticSearchConfiguration { public class ElasticSearchConfiguration {
@ -30,6 +32,16 @@ public class ElasticSearchConfiguration {
private String password; private String password;
private String scheme;
private String truststorePath;
private String truststorePassword;
private Integer connectionTimeoutSecs = 5;
private Integer socketTimeoutSecs = 60;
public String getHost() { public String getHost() {
return host; return host;
} }
@ -62,13 +74,44 @@ public class ElasticSearchConfiguration {
this.password = password; this.password = password;
} }
/** Returns the configured URI scheme for the ElasticSearch endpoint. */
public String getScheme() {
  return this.scheme;
}

/** Sets the URI scheme for the ElasticSearch endpoint. */
public void setScheme(String scheme) {
  this.scheme = scheme;
}

/** Returns the configured truststore location, or {@code null} when none was set. */
public String getTruststorePath() {
  return this.truststorePath;
}

/** Sets the truststore location. */
public void setTruststorePath(String truststorePath) {
  this.truststorePath = truststorePath;
}

/** Returns the configured truststore password, or {@code null} when none was set. */
public String getTruststorePassword() {
  return this.truststorePassword;
}

/** Sets the truststore password. */
public void setTruststorePassword(String truststorePassword) {
  this.truststorePassword = truststorePassword;
}

/** Returns the connection timeout in seconds (field default: 5). */
public Integer getConnectionTimeoutSecs() {
  return this.connectionTimeoutSecs;
}

/** Sets the connection timeout in seconds. */
public void setConnectionTimeoutSecs(Integer connectionTimeoutSecs) {
  this.connectionTimeoutSecs = connectionTimeoutSecs;
}

/** Returns the socket timeout in seconds (field default: 60). */
public Integer getSocketTimeoutSecs() {
  return this.socketTimeoutSecs;
}

/** Sets the socket timeout in seconds. */
public void setSocketTimeoutSecs(Integer socketTimeoutSecs) {
  this.socketTimeoutSecs = socketTimeoutSecs;
}
@Override @Override
public String toString() { public String toString() {
return "ElasticSearchConfiguration{" + return "ElasticSearchConfiguration{" +
"host='" + host + '\'' + "host='" + host + '\'' +
", port=" + port + ", port=" + port +
", username='" + username + '\'' + ", username='" + username + '\'' +
", password='" + password + '\'' +
'}'; '}';
} }
} }

View File

@ -47,6 +47,7 @@ import org.openmetadata.catalog.type.Column;
import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.FieldChange; import org.openmetadata.catalog.type.FieldChange;
import org.openmetadata.catalog.type.TagLabel; import org.openmetadata.catalog.type.TagLabel;
import org.openmetadata.catalog.util.ElasticSearchClientUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -81,16 +82,7 @@ public class ElasticSearchEventHandler implements EventHandler {
public void init(CatalogApplicationConfig config, Jdbi jdbi) { public void init(CatalogApplicationConfig config, Jdbi jdbi) {
ElasticSearchConfiguration esConfig = config.getElasticSearchConfiguration(); ElasticSearchConfiguration esConfig = config.getElasticSearchConfiguration();
RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(esConfig.getHost(), esConfig.getPort(), "http")); this.client = ElasticSearchClientUtils.createElasticSearchClient(esConfig);
if(StringUtils.isNotEmpty(esConfig.getUsername())){
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esConfig.getUsername(), esConfig.getPassword()));
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return httpAsyncClientBuilder;
});
}
this.client = new RestHighLevelClient(restClientBuilder);
} }
public Void process(ContainerRequestContext requestContext, public Void process(ContainerRequestContext requestContext,

View File

@ -46,6 +46,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.sort.SortOrder;
import org.openmetadata.catalog.util.ElasticSearchClientUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -72,16 +73,7 @@ public class SearchResource {
private static final Logger LOG = LoggerFactory.getLogger(SearchResource.class); private static final Logger LOG = LoggerFactory.getLogger(SearchResource.class);
public SearchResource(ElasticSearchConfiguration esConfig) { public SearchResource(ElasticSearchConfiguration esConfig) {
RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(esConfig.getHost(), esConfig.getPort(), "http")); this.client = ElasticSearchClientUtils.createElasticSearchClient(esConfig);
if(StringUtils.isNotEmpty(esConfig.getUsername())){
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esConfig.getUsername(), esConfig.getPassword()));
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return httpAsyncClientBuilder;
});
}
this.client = new RestHighLevelClient(restClientBuilder);
} }
@GET @GET

View File

@ -0,0 +1,83 @@
package org.openmetadata.catalog.util;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.openmetadata.catalog.ElasticSearchConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
/**
 * Factory for the {@link RestHighLevelClient} built from an {@link ElasticSearchConfiguration}.
 *
 * <p>The client is configured with, independently of one another:
 * <ul>
 *   <li>basic authentication, when both a username and a password are configured;</li>
 *   <li>a custom {@link SSLContext}, when the scheme is {@code https} and a truststore path is configured;</li>
 *   <li>connect and socket timeouts taken from the configuration.</li>
 * </ul>
 */
public final class ElasticSearchClientUtils {
  private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchClientUtils.class);

  private static final String HTTPS_SCHEME = "https";
  private static final String TRUSTSTORE_TYPE = "jks";
  // Fallbacks used only when a timeout is explicitly configured as null.
  private static final int DEFAULT_CONNECTION_TIMEOUT_SECS = 5;
  private static final int DEFAULT_SOCKET_TIMEOUT_SECS = 60;
  private static final int MILLIS_PER_SECOND = 1000;

  private ElasticSearchClientUtils() {
    // Static utility class; not meant to be instantiated.
  }

  /**
   * Builds a {@link RestHighLevelClient} for the given configuration.
   *
   * @param esConfig host, port, scheme, optional credentials, optional truststore and timeouts
   * @return a new client; the caller owns it and is responsible for closing it
   * @throws RuntimeException if the client cannot be created (the original failure is the cause)
   */
  public static RestHighLevelClient createElasticSearchClient(ElasticSearchConfiguration esConfig) {
    try {
      RestClientBuilder restClientBuilder = RestClient.builder(
          new HttpHost(esConfig.getHost(), esConfig.getPort(), esConfig.getScheme()));

      // Credentials and TLS trust material are resolved independently so that an https endpoint
      // without basic auth still gets its truststore applied.
      CredentialsProvider credentialsProvider = createCredentialsProvider(esConfig);
      SSLContext sslContext = createSSLContext(esConfig);
      if (credentialsProvider != null || sslContext != null) {
        restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
          if (credentialsProvider != null) {
            httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
          }
          if (sslContext != null) {
            httpAsyncClientBuilder.setSSLContext(sslContext);
          }
          return httpAsyncClientBuilder;
        });
      }

      int connectTimeoutMillis =
          toMillis(esConfig.getConnectionTimeoutSecs(), DEFAULT_CONNECTION_TIMEOUT_SECS);
      int socketTimeoutMillis =
          toMillis(esConfig.getSocketTimeoutSecs(), DEFAULT_SOCKET_TIMEOUT_SECS);
      restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
          .setConnectTimeout(connectTimeoutMillis)
          .setSocketTimeout(socketTimeoutMillis));

      return new RestHighLevelClient(restClientBuilder);
    } catch (Exception e) {
      throw new RuntimeException("Failed to create elastic search client ", e);
    }
  }

  /**
   * Returns a provider holding the configured username/password, or {@code null} when basic
   * authentication is not fully configured.
   */
  private static CredentialsProvider createCredentialsProvider(ElasticSearchConfiguration esConfig) {
    if (StringUtils.isEmpty(esConfig.getUsername())) {
      return null;
    }
    if (StringUtils.isEmpty(esConfig.getPassword())) {
      LOG.warn("ElasticSearch username is configured without a password; connecting without credentials");
      return null;
    }
    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY,
        new UsernamePasswordCredentials(esConfig.getUsername(), esConfig.getPassword()));
    return credentialsProvider;
  }

  /**
   * Builds an {@link SSLContext} trusting the certificates in the configured JKS truststore.
   *
   * @return the context, or {@code null} when the scheme is not https or no truststore path is
   *     configured (the HTTP client then keeps its default SSL settings)
   * @throws RuntimeException if the truststore cannot be read or the context cannot be built
   */
  private static SSLContext createSSLContext(ElasticSearchConfiguration elasticSearchConfiguration) {
    // Constant-first comparison: the scheme may be unset.
    if (!HTTPS_SCHEME.equalsIgnoreCase(elasticSearchConfiguration.getScheme())
        || StringUtils.isEmpty(elasticSearchConfiguration.getTruststorePath())) {
      return null;
    }

    Path trustStorePath = Paths.get(elasticSearchConfiguration.getTruststorePath());
    String truststorePassword = elasticSearchConfiguration.getTruststorePassword();
    // KeyStore.load accepts a null password (the integrity check is then skipped).
    char[] passwordChars = truststorePassword == null ? null : truststorePassword.toCharArray();

    try (InputStream is = Files.newInputStream(trustStorePath)) {
      KeyStore truststore = KeyStore.getInstance(TRUSTSTORE_TYPE);
      truststore.load(is, passwordChars);
      SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(truststore, null);
      return sslBuilder.build();
    } catch (IOException | NoSuchAlgorithmException | CertificateException
        | KeyStoreException | KeyManagementException e) {
      throw new RuntimeException("Failed to create SSLContext for ElasticSearch client", e);
    }
  }

  /** Converts a possibly-null timeout in seconds to milliseconds, failing fast on int overflow. */
  private static int toMillis(Integer seconds, int defaultSeconds) {
    int effectiveSeconds = seconds != null ? seconds : defaultSeconds;
    return Math.multiplyExact(effectiveSeconds, MILLIS_PER_SECOND);
  }
}

View File

@ -137,6 +137,7 @@ authenticationConfiguration:
elasticsearch: elasticsearch:
host: localhost host: localhost
port: 9200 port: 9200
scheme: "http"
eventHandlerConfiguration: eventHandlerConfiguration:

View File

@ -118,6 +118,7 @@ database:
elasticsearch: elasticsearch:
host: localhost host: localhost
port: 9200 port: 9200
scheme: "http"
eventHandlerConfiguration: eventHandlerConfiguration:
eventHandlerClassNames: eventHandlerClassNames:

View File

@ -14,10 +14,12 @@
# limitations under the License. # limitations under the License.
import json import json
import logging import logging
import ssl
import time import time
from typing import List, Optional from typing import List, Optional
from elasticsearch import Elasticsearch from elasticsearch import Elasticsearch
from elasticsearch.connection import create_ssl_context
from metadata.config.common import ConfigModel from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.data.chart import Chart from metadata.generated.schema.entity.data.chart import Chart
@ -64,6 +66,11 @@ class ElasticSearchConfig(ConfigModel):
topic_index_name: str = "topic_search_index" topic_index_name: str = "topic_search_index"
dashboard_index_name: str = "dashboard_search_index" dashboard_index_name: str = "dashboard_search_index"
pipeline_index_name: str = "pipeline_search_index" pipeline_index_name: str = "pipeline_search_index"
scheme: str = "http"
use_ssl: bool = False
verify_certs: bool = False
timeout: int = 30
ca_certs: Optional[str] = None
class ElasticsearchSink(Sink): class ElasticsearchSink(Sink):
@ -95,12 +102,25 @@ class ElasticsearchSink(Sink):
http_auth = None http_auth = None
if self.config.es_username: if self.config.es_username:
http_auth = (self.config.es_username, self.config.es_password) http_auth = (self.config.es_username, self.config.es_password)
ssl_context = None
if self.config.scheme == "https" and not self.config.verify_certs:
ssl_context = create_ssl_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
self.elasticsearch_client = Elasticsearch( self.elasticsearch_client = Elasticsearch(
[ [
{"host": self.config.es_host, "port": self.config.es_port}, {"host": self.config.es_host, "port": self.config.es_port},
], ],
http_auth=http_auth, http_auth=http_auth,
scheme=self.config.scheme,
use_ssl=self.config.use_ssl,
verify_certs=self.config.verify_certs,
ssl_context=ssl_context,
ca_certs=self.config.ca_certs,
) )
if self.config.index_tables: if self.config.index_tables:
self._check_or_create_index( self._check_or_create_index(
self.config.table_index_name, TABLE_ELASTICSEARCH_INDEX_MAPPING self.config.table_index_name, TABLE_ELASTICSEARCH_INDEX_MAPPING
@ -134,7 +154,9 @@ class ElasticsearchSink(Sink):
"properties": es_mapping_dict["mappings"]["properties"] "properties": es_mapping_dict["mappings"]["properties"]
} }
self.elasticsearch_client.indices.put_mapping( self.elasticsearch_client.indices.put_mapping(
index=index_name, body=json.dumps(es_mapping_update_dict) index=index_name,
body=json.dumps(es_mapping_update_dict),
request_timeout=self.config.timeout,
) )
else: else:
logger.warning( logger.warning(
@ -142,7 +164,9 @@ class ElasticsearchSink(Sink):
+ "The index doesn't exist for a newly created ES. It's OK on first run." + "The index doesn't exist for a newly created ES. It's OK on first run."
) )
# create new index with mapping # create new index with mapping
self.elasticsearch_client.indices.create(index=index_name, body=es_mapping) self.elasticsearch_client.indices.create(
index=index_name, body=es_mapping, request_timeout=self.config.timeout
)
def write_record(self, record: Record) -> None: def write_record(self, record: Record) -> None:
if isinstance(record, Table): if isinstance(record, Table):
@ -151,6 +175,7 @@ class ElasticsearchSink(Sink):
index=self.config.table_index_name, index=self.config.table_index_name,
id=str(table_doc.table_id), id=str(table_doc.table_id),
body=table_doc.json(), body=table_doc.json(),
request_timeout=self.config.timeout,
) )
if isinstance(record, Topic): if isinstance(record, Topic):
topic_doc = self._create_topic_es_doc(record) topic_doc = self._create_topic_es_doc(record)
@ -158,6 +183,7 @@ class ElasticsearchSink(Sink):
index=self.config.topic_index_name, index=self.config.topic_index_name,
id=str(topic_doc.topic_id), id=str(topic_doc.topic_id),
body=topic_doc.json(), body=topic_doc.json(),
request_timeout=self.config.timeout,
) )
if isinstance(record, Dashboard): if isinstance(record, Dashboard):
dashboard_doc = self._create_dashboard_es_doc(record) dashboard_doc = self._create_dashboard_es_doc(record)
@ -165,6 +191,7 @@ class ElasticsearchSink(Sink):
index=self.config.dashboard_index_name, index=self.config.dashboard_index_name,
id=str(dashboard_doc.dashboard_id), id=str(dashboard_doc.dashboard_id),
body=dashboard_doc.json(), body=dashboard_doc.json(),
request_timeout=self.config.timeout,
) )
if isinstance(record, Pipeline): if isinstance(record, Pipeline):
pipeline_doc = self._create_pipeline_es_doc(record) pipeline_doc = self._create_pipeline_es_doc(record)
@ -172,6 +199,7 @@ class ElasticsearchSink(Sink):
index=self.config.pipeline_index_name, index=self.config.pipeline_index_name,
id=str(pipeline_doc.pipeline_id), id=str(pipeline_doc.pipeline_id),
body=pipeline_doc.json(), body=pipeline_doc.json(),
request_timeout=self.config.timeout,
) )
if hasattr(record.name, "__root__"): if hasattr(record.name, "__root__"):