diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/ElasticSearchConfiguration.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/ElasticSearchConfiguration.java index 85eda25df78..cd7927c5e14 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/ElasticSearchConfiguration.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/ElasticSearchConfiguration.java @@ -16,6 +16,8 @@ package org.openmetadata.catalog; +import lombok.Builder; + import javax.validation.constraints.NotEmpty; public class ElasticSearchConfiguration { @@ -30,6 +32,16 @@ public class ElasticSearchConfiguration { private String password; + private String scheme; + + private String truststorePath; + + private String truststorePassword; + + private Integer connectionTimeoutSecs = 5; + + private Integer socketTimeoutSecs = 60; + public String getHost() { return host; } @@ -62,13 +74,44 @@ public class ElasticSearchConfiguration { this.password = password; } + public String getScheme() { + return scheme; + } + + public void setScheme(String scheme) { + this.scheme = scheme; + } + + public String getTruststorePath() { return truststorePath; } + + public void setTruststorePath(String truststorePath) { + this.truststorePath = truststorePath; + } + + public String getTruststorePassword() { return truststorePassword; } + + public void setTruststorePassword(String truststorePassword) { + this.truststorePassword = truststorePassword; + } + + public Integer getConnectionTimeoutSecs() { return connectionTimeoutSecs; } + + public void setConnectionTimeoutSecs(Integer connectionTimeoutSecs) { + this.connectionTimeoutSecs = connectionTimeoutSecs; + } + + public Integer getSocketTimeoutSecs() { return socketTimeoutSecs; } + + public void setSocketTimeoutSecs(Integer socketTimeoutSecs) { + this.socketTimeoutSecs = socketTimeoutSecs; + } + @Override public String toString() { return "ElasticSearchConfiguration{" + "host='" + host + '\'' + ", port=" + port + ", username='" + username + '\'' + - ", password='" + password + '\'' + '}'; } } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ElasticSearchEventHandler.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ElasticSearchEventHandler.java index 691c65d9456..b738b3aab2a 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ElasticSearchEventHandler.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ElasticSearchEventHandler.java @@ -47,6 +47,7 @@ import org.openmetadata.catalog.type.Column; import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.FieldChange; import org.openmetadata.catalog.type.TagLabel; +import org.openmetadata.catalog.util.ElasticSearchClientUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,16 +82,7 @@ public class ElasticSearchEventHandler implements EventHandler { public void init(CatalogApplicationConfig config, Jdbi jdbi) { ElasticSearchConfiguration esConfig = config.getElasticSearchConfiguration(); - RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(esConfig.getHost(), esConfig.getPort(), "http")); - if(StringUtils.isNotEmpty(esConfig.getUsername())){ - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esConfig.getUsername(), esConfig.getPassword())); - restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { - httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); - return httpAsyncClientBuilder; - }); - } - this.client = new RestHighLevelClient(restClientBuilder); + this.client = ElasticSearchClientUtils.createElasticSearchClient(esConfig); } public Void process(ContainerRequestContext requestContext, diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java index 947993f7e66..eca630fcba1 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java @@ -46,6 +46,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; +import org.openmetadata.catalog.util.ElasticSearchClientUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,16 +73,7 @@ public class SearchResource { private static final Logger LOG = LoggerFactory.getLogger(SearchResource.class); public SearchResource(ElasticSearchConfiguration esConfig) { - RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(esConfig.getHost(), esConfig.getPort(), "http")); - if(StringUtils.isNotEmpty(esConfig.getUsername())){ - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esConfig.getUsername(), esConfig.getPassword())); - restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { - httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); - return httpAsyncClientBuilder; - }); - } - this.client = new RestHighLevelClient(restClientBuilder); + this.client = ElasticSearchClientUtils.createElasticSearchClient(esConfig); } @GET diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/ElasticSearchClientUtils.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/ElasticSearchClientUtils.java new file mode 100644 index 00000000000..19f4a84cd71 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/ElasticSearchClientUtils.java @@ -0,0 +1,83 @@ +package org.openmetadata.catalog.util; + +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.ssl.SSLContexts; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.openmetadata.catalog.ElasticSearchConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; + +public final class ElasticSearchClientUtils { + private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchClientUtils.class); + + public static RestHighLevelClient createElasticSearchClient(ElasticSearchConfiguration esConfig) { + try { + RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(esConfig.getHost(), esConfig.getPort(), + esConfig.getScheme())); + + if (StringUtils.isNotEmpty(esConfig.getUsername()) && StringUtils.isNotEmpty(esConfig.getUsername())) { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esConfig.getUsername(), + esConfig.getPassword())); + SSLContext sslContext = createSSLContext(esConfig); + restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { + httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + //httpAsyncClientBuilder.setSSLHostnameVerifier((s, sslSession) -> true); + if (sslContext != null) { + httpAsyncClientBuilder.setSSLContext(sslContext); + } + return httpAsyncClientBuilder; + }); + } + restClientBuilder.setRequestConfigCallback( + requestConfigBuilder -> requestConfigBuilder + .setConnectTimeout(esConfig.getConnectionTimeoutSecs() * 1000) + .setSocketTimeout(esConfig.getSocketTimeoutSecs() * 1000)); + return new RestHighLevelClient(restClientBuilder); + } catch (Exception e) { + throw new RuntimeException("Failed to create elastic search client ", e); + } + } + + private static SSLContext createSSLContext(ElasticSearchConfiguration elasticSearchConfiguration) + throws KeyStoreException { + if (elasticSearchConfiguration.getScheme().equals("https")) { + if (elasticSearchConfiguration.getTruststorePath() != null) { + Path trustStorePath = Paths.get(elasticSearchConfiguration.getTruststorePath()); + KeyStore truststore = KeyStore.getInstance("jks"); + try (InputStream is = Files.newInputStream(trustStorePath)) { + truststore.load(is, elasticSearchConfiguration.getTruststorePassword().toCharArray()); + SSLContextBuilder sslBuilder = SSLContexts.custom() + .loadTrustMaterial(truststore, null); + return sslBuilder.build(); + } catch (IOException | NoSuchAlgorithmException | CertificateException | + KeyStoreException | KeyManagementException e) { + throw new RuntimeException("Failed to crete SSLContext to for ElasticSearch Client", e); + } + } + } + return null; + } + +} diff --git a/conf/openmetadata-security.yaml b/conf/openmetadata-security.yaml index 48cbbd375dd..fb00d95295d 100644 --- a/conf/openmetadata-security.yaml +++ b/conf/openmetadata-security.yaml @@ -137,6 +137,7 @@ authenticationConfiguration: elasticsearch: host: localhost port: 9200 + scheme: "http" eventHandlerConfiguration: diff --git a/conf/openmetadata.yaml b/conf/openmetadata.yaml index f767642bf97..e2d71ebf406 100644 --- a/conf/openmetadata.yaml +++ b/conf/openmetadata.yaml @@ -118,6 +118,7 @@ database: elasticsearch: host: localhost port: 9200 + scheme: "http" eventHandlerConfiguration: eventHandlerClassNames: diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py index 86e612b76af..0c57db27697 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py @@ -14,10 +14,12 @@ # limitations under the License. import json import logging +import ssl import time from typing import List, Optional from elasticsearch import Elasticsearch +from elasticsearch.connection import create_ssl_context from metadata.config.common import ConfigModel from metadata.generated.schema.entity.data.chart import Chart @@ -64,6 +66,11 @@ class ElasticSearchConfig(ConfigModel): topic_index_name: str = "topic_search_index" dashboard_index_name: str = "dashboard_search_index" pipeline_index_name: str = "pipeline_search_index" + scheme: str = "http" + use_ssl: bool = False + verify_certs: bool = False + timeout: int = 30 + ca_certs: Optional[str] = None class ElasticsearchSink(Sink): @@ -95,12 +102,25 @@ class ElasticsearchSink(Sink): http_auth = None if self.config.es_username: http_auth = (self.config.es_username, self.config.es_password) + + ssl_context = None + if self.config.scheme == "https" and not self.config.verify_certs: + ssl_context = create_ssl_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + self.elasticsearch_client = Elasticsearch( [ {"host": self.config.es_host, "port": self.config.es_port}, ], http_auth=http_auth, + scheme=self.config.scheme, + use_ssl=self.config.use_ssl, + verify_certs=self.config.verify_certs, + ssl_context=ssl_context, + ca_certs=self.config.ca_certs, ) + if self.config.index_tables: self._check_or_create_index( self.config.table_index_name, TABLE_ELASTICSEARCH_INDEX_MAPPING @@ -134,7 +154,9 @@ class ElasticsearchSink(Sink): "properties": es_mapping_dict["mappings"]["properties"] } self.elasticsearch_client.indices.put_mapping( - index=index_name, body=json.dumps(es_mapping_update_dict) + index=index_name, + body=json.dumps(es_mapping_update_dict), + request_timeout=self.config.timeout, ) else: logger.warning( @@ -142,7 +164,9 @@ class ElasticsearchSink(Sink): + "The index doesn't exist for a newly created ES. It's OK on first run." ) # create new index with mapping - self.elasticsearch_client.indices.create(index=index_name, body=es_mapping) + self.elasticsearch_client.indices.create( + index=index_name, body=es_mapping, request_timeout=self.config.timeout + ) def write_record(self, record: Record) -> None: if isinstance(record, Table): @@ -151,6 +175,7 @@ class ElasticsearchSink(Sink): index=self.config.table_index_name, id=str(table_doc.table_id), body=table_doc.json(), + request_timeout=self.config.timeout, ) if isinstance(record, Topic): topic_doc = self._create_topic_es_doc(record) @@ -158,6 +183,7 @@ class ElasticsearchSink(Sink): index=self.config.topic_index_name, id=str(topic_doc.topic_id), body=topic_doc.json(), + request_timeout=self.config.timeout, ) if isinstance(record, Dashboard): dashboard_doc = self._create_dashboard_es_doc(record) @@ -165,6 +191,7 @@ class ElasticsearchSink(Sink): index=self.config.dashboard_index_name, id=str(dashboard_doc.dashboard_id), body=dashboard_doc.json(), + request_timeout=self.config.timeout, ) if isinstance(record, Pipeline): pipeline_doc = self._create_pipeline_es_doc(record) @@ -172,6 +199,7 @@ class ElasticsearchSink(Sink): index=self.config.pipeline_index_name, id=str(pipeline_doc.pipeline_id), body=pipeline_doc.json(), + request_timeout=self.config.timeout, ) if hasattr(record.name, "__root__"):