Add SSL options to ElasticSearch client - Ingestion & Server (#1548) (#1557)

* Add SSL options to ElasticSearch client - Ingestion & Server (#1548)

* Add SSL options to ElasticSearch client - Ingestion & Server

* Add SSL options to ElasticSearch client - Ingestion & Server

* Issue-1556: Support for ElasticSearch SSL , Bootstrap command create and delete, ingestion updates for change descriptions

* Issue-1556: Fix checkstyle issues

* Issue-1556: Fix checkstyle issues
This commit is contained in:
Sriharsha Chintalapani 2021-12-04 18:54:36 -08:00 committed by GitHub
parent ff2e92a2f6
commit 2d348e9704
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 436 additions and 160 deletions

View File

@ -55,6 +55,9 @@ USAGE: $0 [create|migrate|info|validate|drop|drop-create|repair|check-connection
validate : Checks if all the migrations have been applied on the target database
drop : Drops all the tables in the target database
drop-create : Drops and recreates all the tables in the target database.
es-drop : Drops the indexes in ElasticSearch
es-create : Creates the indexes in ElasticSearch
drop-create-all : Drops and recreates all the tables in the database. Drops and creates all the indexes in ElasticSearch.
repair : Repairs the DATABASE_CHANGE_LOG table which is used to track all the migrations on the target database.
This involves removing entries for the failed migrations and updating the checksum of migrations already applied on the target database.
check-connection : Checks if a connection can be successfully obtained for the target database
@ -71,12 +74,15 @@ fi
opt="$1"
case "${opt}" in
create | drop | migrate | info | validate | repair | check-connection )
create | drop | migrate | info | validate | repair | check-connection | es-drop | es-create )
execute "${opt}"
;;
drop-create )
execute "drop" && execute "create"
;;
drop-create-all )
execute "drop" && execute "create" && execute "es-drop" && execute "es-create"
;;
*)
printUsage
exit 1

View File

@ -13,59 +13,108 @@
package org.openmetadata.catalog;
import javax.validation.constraints.NotEmpty;
/**
 * Configuration for connecting to an ElasticSearch cluster: host/port, optional
 * basic-auth credentials, optional SSL (scheme + truststore), and client timeouts.
 */
public class ElasticSearchConfiguration {
  /** Hostname of the ElasticSearch server; required. */
  @NotEmpty
  private String host;

  /** Port of the ElasticSearch server; required. */
  @NotEmpty
  private Integer port;

  // Optional basic-auth credentials; both must be set for auth to take effect.
  private String username;
  private String password;

  // "http" or "https"; "https" enables the truststore options below.
  private String scheme;

  // Path to a JKS truststore used to validate the server certificate (https only).
  private String truststorePath;
  private String truststorePassword;

  // Client timeouts, in seconds.
  private Integer connectionTimeoutSecs = 5;
  private Integer socketTimeoutSecs = 60;

  public String getHost() {
    return host;
  }

  public void setHost(String host) {
    this.host = host;
  }

  public Integer getPort() {
    return port;
  }

  public void setPort(Integer port) {
    this.port = port;
  }

  public String getUsername() {
    return username;
  }

  public void setUsername(String username) {
    this.username = username;
  }

  public String getPassword() {
    return password;
  }

  public void setPassword(String password) {
    this.password = password;
  }

  public String getScheme() {
    return scheme;
  }

  public void setScheme(String scheme) {
    this.scheme = scheme;
  }

  public String getTruststorePath() {
    return truststorePath;
  }

  public void setTruststorePath(String truststorePath) {
    this.truststorePath = truststorePath;
  }

  public String getTruststorePassword() {
    return truststorePassword;
  }

  public void setTruststorePassword(String truststorePassword) {
    this.truststorePassword = truststorePassword;
  }

  public Integer getConnectionTimeoutSecs() {
    return connectionTimeoutSecs;
  }

  public void setConnectionTimeoutSecs(Integer connectionTimeoutSecs) {
    this.connectionTimeoutSecs = connectionTimeoutSecs;
  }

  public Integer getSocketTimeoutSecs() {
    return socketTimeoutSecs;
  }

  public void setSocketTimeoutSecs(Integer socketTimeoutSecs) {
    this.socketTimeoutSecs = socketTimeoutSecs;
  }

  @Override
  public String toString() {
    // Deliberately omits password and truststorePassword so secrets never reach logs.
    return "ElasticSearchConfiguration{" +
        "host='" + host + '\'' +
        ", port=" + port +
        ", username='" + username + '\'' +
        '}';
  }
}

View File

@ -16,18 +16,10 @@
package org.openmetadata.catalog.elasticsearch;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.script.Script;
@ -47,6 +39,7 @@ import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.FieldChange;
import org.openmetadata.catalog.util.ElasticSearchClientUtils;
import org.openmetadata.catalog.util.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -80,18 +73,7 @@ public class ElasticSearchEventHandler implements EventHandler {
public void init(CatalogApplicationConfig config, Jdbi jdbi) {
ElasticSearchConfiguration esConfig = config.getElasticSearchConfiguration();
RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(esConfig.getHost(), esConfig.getPort(),
"http"));
if(StringUtils.isNotEmpty(esConfig.getUsername())){
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esConfig.getUsername(),
esConfig.getPassword()));
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return httpAsyncClientBuilder;
});
}
client = new RestHighLevelClient(restClientBuilder);
this.client = ElasticSearchClientUtils.createElasticSearchClient(esConfig);
esIndexDefinition = new ElasticSearchIndexDefinition(client);
esIndexDefinition.createIndexes();
}

View File

@ -10,6 +10,8 @@ import lombok.Getter;
import lombok.Setter;
import lombok.Value;
import lombok.experimental.SuperBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
@ -32,11 +34,8 @@ import org.slf4j.LoggerFactory;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -65,11 +64,11 @@ public class ElasticSearchIndexDefinition {
}
public enum ElasticSearchIndexType {
TABLE_SEARCH_INDEX("table_search_index", "elasticsearch/table_index_mapping.json"),
TOPIC_SEARCH_INDEX("topic_search_index", "elasticsearch/topic_index_mapping.json"),
DASHBOARD_SEARCH_INDEX("dashboard_search_index", "elasticsearch/dashboard_index_mapping.json"),
PIPELINE_SEARCH_INDEX("pipeline_search_index", "elasticsearch/pipeline_index_mapping.json"),
DBT_MODEL_SEARCH_INDEX("dbt_model_search_index", "elasticsearch/dbt_index_mapping.json");
TABLE_SEARCH_INDEX("table_search_index", "/elasticsearch/table_index_mapping.json"),
TOPIC_SEARCH_INDEX("topic_search_index", "/elasticsearch/topic_index_mapping.json"),
DASHBOARD_SEARCH_INDEX("dashboard_search_index", "/elasticsearch/dashboard_index_mapping.json"),
PIPELINE_SEARCH_INDEX("pipeline_search_index", "/elasticsearch/pipeline_index_mapping.json"),
DBT_MODEL_SEARCH_INDEX("dbt_model_search_index", "/elasticsearch/dbt_index_mapping.json");
public final String indexName;
public final String indexMappingFile;
@ -90,6 +89,16 @@ public class ElasticSearchIndexDefinition {
}
}
/**
 * Drops every ElasticSearch index known to this definition.
 * Failures are logged and swallowed so callers are never interrupted by a partial delete.
 */
public void dropIndexes() {
  try {
    for (ElasticSearchIndexType indexType : ElasticSearchIndexType.values()) {
      deleteIndex(indexType);
    }
  } catch (Exception e) {
    LOG.error("Failed to delete Elastic Search indexes due to", e);
  }
}
public boolean checkIndexExistsOrCreate(ElasticSearchIndexType indexType) {
boolean exists = elasticSearchIndexes.get(indexType) == ElasticSearchIndexStatus.CREATED;
if (!exists) {
@ -119,16 +128,26 @@ public class ElasticSearchIndexDefinition {
return true;
}
/**
 * Deletes the given index from ElasticSearch.
 *
 * @param elasticSearchIndexType the index to drop
 * @return true if the delete request completed, false if an I/O error occurred
 */
private boolean deleteIndex(ElasticSearchIndexType elasticSearchIndexType) {
  try {
    DeleteIndexRequest request = new DeleteIndexRequest(elasticSearchIndexType.indexName);
    AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT);
    // Parameterized SLF4J logging: no string concatenation when INFO is disabled.
    LOG.info("{} Deleted {}", elasticSearchIndexType.indexName, deleteIndexResponse.isAcknowledged());
    return true;
  } catch (IOException e) {
    LOG.error("Failed to delete Elastic Search indexes due to", e);
    return false;
  }
}
// Records the lifecycle status of the given index (e.g. CREATED) in the local status map.
private void setIndexStatus(ElasticSearchIndexType indexType, ElasticSearchIndexStatus elasticSearchIndexStatus) {
elasticSearchIndexes.put(indexType, elasticSearchIndexStatus);
}
/**
 * Reads the JSON index-mapping resource for the given index type from the classpath.
 *
 * @param elasticSearchIndexType index whose mapping file should be loaded
 * @return the mapping file contents, decoded as UTF-8
 * @throws IOException if the resource is missing or cannot be read
 */
public String getIndexMapping(ElasticSearchIndexType elasticSearchIndexType)
    throws URISyntaxException, IOException {
  // try-with-resources closes the classpath stream (the original leaked it);
  // UTF-8 is explicit so the result does not depend on the platform default charset.
  try (InputStream in =
      ElasticSearchIndexDefinition.class.getResourceAsStream(elasticSearchIndexType.indexMappingFile)) {
    if (in == null) {
      throw new IOException("Index mapping file not found: " + elasticSearchIndexType.indexMappingFile);
    }
    return new String(in.readAllBytes(), java.nio.charset.StandardCharsets.UTF_8);
  }
}
public ElasticSearchIndexType getIndexMappingByEntityType(String type) {
@ -151,6 +170,7 @@ public class ElasticSearchIndexDefinition {
@SuperBuilder
@Data
class ElasticSearchIndex {
String name;
@JsonProperty("display_name")
String displayName;
String fqdn;
@ -227,8 +247,6 @@ class ESChangeDescription {
@Value
@JsonInclude(JsonInclude.Include.NON_NULL)
class TableESIndex extends ElasticSearchIndex {
@JsonProperty("table_name")
String tableName;
@JsonProperty("table_id")
String tableId;
String database;
@ -280,7 +298,7 @@ class TableESIndex extends ElasticSearchIndex {
}
}
TableESIndexBuilder tableESIndexBuilder = internalBuilder().tableId(tableId)
.tableName(tableName)
.name(tableName)
.displayName(tableName)
.description(description)
.fqdn(table.getFullyQualifiedName())
@ -373,8 +391,6 @@ class TableESIndex extends ElasticSearchIndex {
@Value
@JsonInclude(JsonInclude.Include.NON_NULL)
class TopicESIndex extends ElasticSearchIndex {
@JsonProperty("topic_name")
String topicName;
@JsonProperty("topic_id")
String topicId;
@ -389,7 +405,7 @@ class TopicESIndex extends ElasticSearchIndex {
}
TopicESIndexBuilder topicESIndexBuilder = internalBuilder().topicId(topic.getId().toString())
.topicName(topic.getName())
.name(topic.getName())
.displayName(topic.getDisplayName())
.description(topic.getDescription())
.fqdn(topic.getFullyQualifiedName())
@ -437,8 +453,6 @@ class TopicESIndex extends ElasticSearchIndex {
@Value
@JsonInclude(JsonInclude.Include.NON_NULL)
class DashboardESIndex extends ElasticSearchIndex {
@JsonProperty("dashboard_name")
String dashboardName;
@JsonProperty("dashboard_id")
String dashboardId;
@JsonProperty("chart_names")
@ -476,7 +490,7 @@ class DashboardESIndex extends ElasticSearchIndex {
}
DashboardESIndexBuilder dashboardESIndexBuilder = internalBuilder().dashboardId(dashboard.getId().toString())
.dashboardName(dashboard.getDisplayName())
.name(dashboard.getDisplayName())
.displayName(dashboard.getDisplayName())
.description(dashboard.getDescription())
.fqdn(dashboard.getFullyQualifiedName())
@ -532,8 +546,6 @@ class DashboardESIndex extends ElasticSearchIndex {
@Value
@JsonInclude(JsonInclude.Include.NON_NULL)
class PipelineESIndex extends ElasticSearchIndex {
@JsonProperty("pipeline_name")
String pipelineName;
@JsonProperty("pipeine_id")
String pipelineId;
@JsonProperty("task_names")
@ -559,7 +571,7 @@ class PipelineESIndex extends ElasticSearchIndex {
}
PipelineESIndexBuilder pipelineESIndexBuilder = internalBuilder().pipelineId(pipeline.getId().toString())
.pipelineName(pipeline.getDisplayName())
.name(pipeline.getDisplayName())
.displayName(pipeline.getDisplayName())
.description(pipeline.getDescription())
.fqdn(pipeline.getFullyQualifiedName())
@ -607,8 +619,6 @@ class PipelineESIndex extends ElasticSearchIndex {
@Value
@JsonInclude(JsonInclude.Include.NON_NULL)
class DbtModelESIndex extends ElasticSearchIndex {
@JsonProperty("dbt_model_name")
String dbtModelName;
@JsonProperty("dbt_model_id")
String dbtModelId;
@JsonProperty("column_names")
@ -637,7 +647,7 @@ class DbtModelESIndex extends ElasticSearchIndex {
}
DbtModelESIndexBuilder dbtModelESIndexBuilder = internalBuilder().dbtModelId(dbtModel.getId().toString())
.dbtModelName(dbtModel.getName())
.name(dbtModel.getName())
.displayName(dbtModel.getName())
.description(dbtModel.getDescription())
.fqdn(dbtModel.getFullyQualifiedName())

View File

@ -43,6 +43,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.openmetadata.catalog.util.ElasticSearchClientUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -69,16 +70,7 @@ public class SearchResource {
private static final Logger LOG = LoggerFactory.getLogger(SearchResource.class);
public SearchResource(ElasticSearchConfiguration esConfig) {
RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(esConfig.getHost(), esConfig.getPort(), "http"));
if(StringUtils.isNotEmpty(esConfig.getUsername())){
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esConfig.getUsername(), esConfig.getPassword()));
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return httpAsyncClientBuilder;
});
}
this.client = new RestHighLevelClient(restClientBuilder);
this.client = ElasticSearchClientUtils.createElasticSearchClient(esConfig);
}
@GET
@ -204,7 +196,7 @@ public class SearchResource {
private SearchSourceBuilder buildTableSearchBuilder(String query, int from, int size) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
HighlightBuilder.Field highlightTableName =
new HighlightBuilder.Field("table_name");
new HighlightBuilder.Field("name");
highlightTableName.highlighterType("unified");
HighlightBuilder.Field highlightDescription =
new HighlightBuilder.Field("description");
@ -223,7 +215,7 @@ public class SearchResource {
hb.preTags("<span class=\"text-highlighter\">");
hb.postTags("</span>");
searchSourceBuilder.query(QueryBuilders.queryStringQuery(query)
.field("table_name", 5.0f)
.field("name", 5.0f)
.field("description")
.field("column_names")
.field("column_descriptions")
@ -253,7 +245,7 @@ public class SearchResource {
hb.preTags("<span class=\"text-highlighter\">");
hb.postTags("</span>");
searchSourceBuilder.query(QueryBuilders.queryStringQuery(query)
.field("topic_name", 5.0f)
.field("name", 5.0f)
.field("description")
.lenient(true))
.aggregation(AggregationBuilders.terms("Service").field("service_type"))
@ -290,7 +282,7 @@ public class SearchResource {
hb.preTags("<span class=\"text-highlighter\">");
hb.postTags("</span>");
searchSourceBuilder.query(QueryBuilders.queryStringQuery(query)
.field("dashboard_name", 5.0f)
.field("name", 5.0f)
.field("description")
.field("chart_names")
.field("chart_descriptions")
@ -309,7 +301,7 @@ public class SearchResource {
private SearchSourceBuilder buildPipelineSearchBuilder(String query, int from, int size) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
HighlightBuilder.Field highlightPipelineName =
new HighlightBuilder.Field("pipeline_name");
new HighlightBuilder.Field("name");
highlightPipelineName.highlighterType("unified");
HighlightBuilder.Field highlightDescription =
new HighlightBuilder.Field("description");
@ -347,7 +339,7 @@ public class SearchResource {
private SearchSourceBuilder buildDbtModelSearchBuilder(String query, int from, int size) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
HighlightBuilder.Field highlightTableName =
new HighlightBuilder.Field("dbt_model_name");
new HighlightBuilder.Field("name");
highlightTableName.highlighterType("unified");
HighlightBuilder.Field highlightDescription =
new HighlightBuilder.Field("description");
@ -366,7 +358,7 @@ public class SearchResource {
hb.preTags("<span class=\"text-highlighter\">");
hb.postTags("</span>");
searchSourceBuilder.query(QueryBuilders.queryStringQuery(query)
.field("dbt_model_name", 5.0f)
.field("name", 5.0f)
.field("description")
.field("column_names")
.field("column_descriptions")

View File

@ -0,0 +1,87 @@
package org.openmetadata.catalog.util;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.openmetadata.catalog.ElasticSearchConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
/**
 * Utility for building {@link RestHighLevelClient} instances from an
 * {@link ElasticSearchConfiguration}: basic-auth credentials, optional SSL
 * (https scheme + JKS truststore), and connect/socket timeouts.
 */
public final class ElasticSearchClientUtils {
  private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchClientUtils.class);

  private ElasticSearchClientUtils() {
  }

  /**
   * Creates a high-level REST client for the configured cluster.
   *
   * @throws RuntimeException wrapping any failure while building the client or SSL context
   */
  public static RestHighLevelClient createElasticSearchClient(ElasticSearchConfiguration esConfig) {
    try {
      RestClientBuilder restClientBuilder = RestClient.builder(
          new HttpHost(esConfig.getHost(), esConfig.getPort(), esConfig.getScheme()));

      CredentialsProvider provider = null;
      // Bug fix: the original checked getUsername() twice; basic auth needs BOTH
      // username and password to be present.
      if (StringUtils.isNotEmpty(esConfig.getUsername()) && StringUtils.isNotEmpty(esConfig.getPassword())) {
        provider = new BasicCredentialsProvider();
        provider.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials(esConfig.getUsername(), esConfig.getPassword()));
      }

      // Bug fix: SSL was only configured when credentials were set; https without
      // basic auth must still get the truststore-backed context.
      SSLContext sslContext = createSSLContext(esConfig);
      CredentialsProvider credentialsProvider = provider; // effectively-final for the lambda
      if (credentialsProvider != null || sslContext != null) {
        restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
          if (credentialsProvider != null) {
            httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
          }
          if (sslContext != null) {
            httpAsyncClientBuilder.setSSLContext(sslContext);
          }
          return httpAsyncClientBuilder;
        });
      }

      // Timeouts are configured in seconds; the HTTP client expects milliseconds.
      restClientBuilder.setRequestConfigCallback(
          requestConfigBuilder -> requestConfigBuilder
              .setConnectTimeout(esConfig.getConnectionTimeoutSecs() * 1000)
              .setSocketTimeout(esConfig.getSocketTimeoutSecs() * 1000));
      return new RestHighLevelClient(restClientBuilder);
    } catch (Exception e) {
      throw new RuntimeException("Failed to create elastic search client ", e);
    }
  }

  /**
   * Builds an {@link SSLContext} trusting the certificates in the configured JKS
   * truststore. Returns null when the scheme is not https or no truststore is set,
   * in which case the JVM default trust settings apply.
   */
  private static SSLContext createSSLContext(ElasticSearchConfiguration elasticSearchConfiguration)
      throws KeyStoreException {
    // "https".equals(...) is null-safe, unlike getScheme().equals("https").
    if ("https".equals(elasticSearchConfiguration.getScheme())
        && elasticSearchConfiguration.getTruststorePath() != null) {
      Path trustStorePath = Paths.get(elasticSearchConfiguration.getTruststorePath());
      KeyStore truststore = KeyStore.getInstance("jks");
      try (InputStream is = Files.newInputStream(trustStorePath)) {
        truststore.load(is, elasticSearchConfiguration.getTruststorePassword().toCharArray());
        return SSLContexts.custom().loadTrustMaterial(truststore, null).build();
      } catch (IOException | NoSuchAlgorithmException | CertificateException
          | KeyStoreException | KeyManagementException e) {
        throw new RuntimeException("Failed to create SSLContext for ElasticSearch Client", e);
      }
    }
    return null;
  }
}

View File

@ -20,9 +20,11 @@ import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.elasticsearch.client.RestHighLevelClient;
import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.MigrationVersion;
import org.openmetadata.catalog.ElasticSearchConfiguration;
import org.openmetadata.catalog.elasticsearch.ElasticSearchIndexDefinition;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
@ -65,6 +67,11 @@ public final class TablesInitializer {
OPTIONS.addOption(null, DISABLE_VALIDATE_ON_MIGRATE, false,
"Disable flyway validation checks while running " +
"migrate");
OPTIONS.addOption(null, SchemaMigrationOption.ES_CREATE.toString(), false,
"Creates all the indexes in the elastic search");
OPTIONS.addOption(null, SchemaMigrationOption.ES_DROP.toString(), false,
"Drop all the indexes in the elastic search");
}
private TablesInitializer() {
@ -103,6 +110,7 @@ public final class TablesInitializer {
ObjectMapper objectMapper = new YAMLMapper();
Map<String, Object> conf = objectMapper.readValue(new File(confFilePath), Map.class);
Map<String, Object> dbConf = (Map<String, Object>) conf.get("database");
Map<String, Object> esConf = (Map<String, Object>) conf.get("elasticsearch");
if (dbConf == null) {
throw new RuntimeException("No database in config file");
}
@ -115,14 +123,18 @@ public final class TablesInitializer {
}
String scriptRootPath = commandLine.getOptionValue(OPTION_SCRIPT_ROOT_PATH);
Flyway flyway = get(jdbcUrl, user, password, scriptRootPath, !disableValidateOnMigrate);
ObjectMapper oMapper = new ObjectMapper();
ElasticSearchConfiguration esConfig = oMapper.convertValue(esConf, ElasticSearchConfiguration.class);
RestHighLevelClient client = ElasticSearchClientUtils.createElasticSearchClient(
esConfig);
try {
execute(flyway, schemaMigrationOptionSpecified);
execute(flyway, client, schemaMigrationOptionSpecified);
System.out.printf("\"%s\" option successful%n", schemaMigrationOptionSpecified.toString());
} catch (Exception e) {
System.err.printf("\"%s\" option failed : %s%n", schemaMigrationOptionSpecified.toString(), e);
System.exit(1);
}
System.exit(0);
}
static Flyway get(String url, String user, String password, String scriptRootPath, boolean validateOnMigrate) {
@ -143,49 +155,60 @@ public final class TablesInitializer {
.load();
}
private static void execute(Flyway flyway, SchemaMigrationOption schemaMigrationOption) throws SQLException {
private static void execute(Flyway flyway, RestHighLevelClient client,
SchemaMigrationOption schemaMigrationOption) throws SQLException {
ElasticSearchIndexDefinition esIndexDefinition;
switch (schemaMigrationOption) {
case CREATE:
try (Connection connection = flyway.getConfiguration().getDataSource().getConnection()) {
DatabaseMetaData databaseMetaData = connection.getMetaData();
try (ResultSet resultSet = databaseMetaData.getTables(connection.getCatalog(), connection.getSchema(),
case CREATE:
try (Connection connection = flyway.getConfiguration().getDataSource().getConnection()) {
DatabaseMetaData databaseMetaData = connection.getMetaData();
try (ResultSet resultSet = databaseMetaData.getTables(connection.getCatalog(), connection.getSchema(),
"",
null)) {
// If the database has any entity like views, tables etc, resultSet.next() would return true here
if (resultSet.next()) {
throw new SQLException("Please use an empty database or use \"migrate\" if you are already running a " +
// If the database has any entity like views, tables etc, resultSet.next() would return true here
if (resultSet.next()) {
throw new SQLException("Please use an empty database or use \"migrate\" if you are already running a " +
"previous version.");
}
} catch (SQLException e) {
throw new SQLException("Unable the obtain the state of the target database", e);
}
} catch (SQLException e) {
throw new SQLException("Unable the obtain the state of the target database", e);
}
}
flyway.migrate();
break;
case MIGRATE:
flyway.migrate();
break;
case INFO:
System.out.println(dumpToAsciiTable(flyway.info().all()));
break;
case VALIDATE:
flyway.validate();
break;
case DROP:
flyway.clean();
break;
case CHECK_CONNECTION:
try {
flyway.getConfiguration().getDataSource().getConnection();
} catch (Exception e) {
throw new SQLException(e);
}
break;
case REPAIR:
flyway.repair();
break;
default:
throw new SQLException("SchemaMigrationHelper unable to execute the option : " +
flyway.migrate();
break;
case MIGRATE:
flyway.migrate();
break;
case INFO:
System.out.println(dumpToAsciiTable(flyway.info().all()));
break;
case VALIDATE:
flyway.validate();
break;
case DROP:
flyway.clean();
System.out.println("DONE");
break;
case CHECK_CONNECTION:
try {
flyway.getConfiguration().getDataSource().getConnection();
} catch (Exception e) {
throw new SQLException(e);
}
break;
case REPAIR:
flyway.repair();
break;
case ES_CREATE:
esIndexDefinition = new ElasticSearchIndexDefinition(client);
esIndexDefinition.createIndexes();
break;
case ES_DROP:
esIndexDefinition = new ElasticSearchIndexDefinition(client);
esIndexDefinition.dropIndexes();
break;
default:
throw new SQLException("SchemaMigrationHelper unable to execute the option : " +
schemaMigrationOption.toString());
}
}
@ -202,7 +225,9 @@ public final class TablesInitializer {
VALIDATE("validate"),
INFO("info"),
DROP("drop"),
REPAIR("repair");
REPAIR("repair"),
ES_DROP("es-drop"),
ES_CREATE("es-create");
private final String value;

View File

@ -1,6 +1,6 @@
{
"properties": {
"dashboard_name": {
"name": {
"type":"text"
},
"display_name": {

View File

@ -1,6 +1,6 @@
{
"properties": {
"dbt_model_name": {
"name": {
"type":"text"
},
"display_name": {

View File

@ -1,6 +1,6 @@
{
"properties": {
"pipeline_name": {
"name": {
"type":"text"
},
"display_name": {

View File

@ -1,6 +1,6 @@
{
"properties": {
"table_name": {
"name": {
"type":"text"
},
"display_name": {

View File

@ -1,6 +1,6 @@
{
"properties": {
"topic_name": {
"name": {
"type":"text"
},
"display_name": {

View File

@ -133,6 +133,7 @@ authenticationConfiguration:
elasticsearch:
host: localhost
port: 9200
scheme: "http"
airflowConfiguration:
apiEndpoint: "http://localhost:8080"

View File

@ -114,6 +114,7 @@ database:
elasticsearch:
host: localhost
port: 9200
scheme: "http"
eventHandlerConfiguration:
eventHandlerClassNames:

View File

@ -23,6 +23,20 @@ class Table(BaseModel):
fullyQualifiedName: str
class FieldChange(BaseModel):
    """A single field added, updated, or removed in an entity change event."""

    name: str
    # NOTE(review): values appear to be carried as serialized strings; absent for
    # pure adds/deletes — confirm against the server's changeEvent schema.
    newValue: Optional[str]
    oldValue: Optional[str]
class ChangeDescription(BaseModel):
    """Who/when/what of one change applied to an entity (mirrors the server's changeDescription)."""

    updatedBy: str
    # epoch timestamp of the update
    updatedAt: int
    # NOTE(review): typed str here though the server sends lists of field changes;
    # presumably serialized before indexing — confirm against the ES sink.
    fieldsAdded: Optional[str]
    fieldsDeleted: Optional[str]
    fieldsUpdated: Optional[str]
class TableESDocument(BaseModel):
"""Elastic Search Mapping doc"""
@ -50,6 +64,8 @@ class TableESDocument(BaseModel):
tier: Optional[str] = None
owner: str
followers: List[str]
change_descriptions: Optional[List[ChangeDescription]] = None
doc_as_upsert: bool = True
class TopicESDocument(BaseModel):
@ -69,6 +85,8 @@ class TopicESDocument(BaseModel):
tier: Optional[str] = None
owner: str
followers: List[str]
change_descriptions: Optional[List[ChangeDescription]] = None
doc_as_upsert: bool = True
class DashboardESDocument(BaseModel):
@ -96,6 +114,8 @@ class DashboardESDocument(BaseModel):
weekly_percentile_rank: int
daily_stats: int
daily_percentile_rank: int
change_descriptions: Optional[List[ChangeDescription]] = None
doc_as_upsert: bool = True
class PipelineESDocument(BaseModel):
@ -117,6 +137,8 @@ class PipelineESDocument(BaseModel):
tier: Optional[str] = None
owner: str
followers: List[str]
change_descriptions: Optional[List[ChangeDescription]] = None
doc_as_upsert: bool = True
class DbtModelESDocument(BaseModel):
@ -140,6 +162,8 @@ class DbtModelESDocument(BaseModel):
tier: Optional[str] = None
owner: str
followers: List[str]
change_descriptions: Optional[List[ChangeDescription]] = None
doc_as_upsert: bool = True
class DashboardOwner(BaseModel):

View File

@ -34,6 +34,7 @@ from metadata.generated.schema.entity.services.storageService import StorageServ
from metadata.generated.schema.entity.tags.tagCategory import Tag
from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.type.entityHistory import EntityVersionHistory
from metadata.ingestion.ometa.auth_provider import AuthenticationProvider
from metadata.ingestion.ometa.client import REST, APIError, ClientConfig
from metadata.ingestion.ometa.mixins.lineageMixin import OMetaLineageMixin
@ -398,6 +399,20 @@ class OpenMetadata(OMetaLineageMixin, OMetaTableMixin, Generic[T, C]):
after = resp["paging"]["after"] if "after" in resp["paging"] else None
return EntityList(entities=entities, total=total, after=after)
def list_versions(self, entity_id: str, entity: Type[T]) -> EntityVersionHistory:
    """
    Fetch the version history of a single entity.

    :param entity_id: ID of the entity instance to look up
    :param entity: entity class, used to resolve the API suffix
    :return: the raw response dict when ``_use_raw_data`` is set,
             otherwise an ``EntityVersionHistory`` model
    """
    suffix = self.get_suffix(entity)
    path = f"/{entity_id}/versions"
    resp = self.client.get(f"{suffix}{path}")
    if self._use_raw_data:
        return resp
    return EntityVersionHistory(**resp)
def list_services(self, entity: Type[T]) -> List[EntityList[T]]:
"""
Service listing does not implement paging

View File

@ -11,10 +11,13 @@
import json
import logging
import ssl
import time
from typing import List, Optional
from dateutil import parser
from elasticsearch import Elasticsearch
from elasticsearch.connection import create_ssl_context
from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.data.chart import Chart
@ -32,6 +35,7 @@ from metadata.generated.schema.type import entityReference
from metadata.ingestion.api.common import Record, WorkflowContext
from metadata.ingestion.api.sink import Sink, SinkStatus
from metadata.ingestion.models.table_metadata import (
ChangeDescription,
DashboardESDocument,
DbtModelESDocument,
PipelineESDocument,
@ -66,6 +70,11 @@ class ElasticSearchConfig(ConfigModel):
dashboard_index_name: str = "dashboard_search_index"
pipeline_index_name: str = "pipeline_search_index"
dbt_index_name: str = "dbt_model_search_index"
scheme: str = "http"
use_ssl: bool = False
verify_certs: bool = False
timeout: int = 30
ca_certs: Optional[str] = None
class ElasticsearchSink(Sink):
@ -97,12 +106,25 @@ class ElasticsearchSink(Sink):
http_auth = None
if self.config.es_username:
http_auth = (self.config.es_username, self.config.es_password)
ssl_context = None
if self.config.scheme == "https" and not self.config.verify_certs:
ssl_context = create_ssl_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
self.elasticsearch_client = Elasticsearch(
[
{"host": self.config.es_host, "port": self.config.es_port},
],
http_auth=http_auth,
scheme=self.config.scheme,
use_ssl=self.config.use_ssl,
verify_certs=self.config.verify_certs,
ssl_context=ssl_context,
ca_certs=self.config.ca_certs,
)
if self.config.index_tables:
self._check_or_create_index(
self.config.table_index_name, TABLE_ELASTICSEARCH_INDEX_MAPPING
@ -140,7 +162,9 @@ class ElasticsearchSink(Sink):
"properties": es_mapping_dict["mappings"]["properties"]
}
self.elasticsearch_client.indices.put_mapping(
index=index_name, body=json.dumps(es_mapping_update_dict)
index=index_name,
body=json.dumps(es_mapping_update_dict),
request_timeout=self.config.timeout,
)
else:
logger.warning(
@ -148,7 +172,9 @@ class ElasticsearchSink(Sink):
+ "The index doesn't exist for a newly created ES. It's OK on first run."
)
# create new index with mapping
self.elasticsearch_client.indices.create(index=index_name, body=es_mapping)
self.elasticsearch_client.indices.create(
index=index_name, body=es_mapping, request_timeout=self.config.timeout
)
def write_record(self, record: Record) -> None:
if isinstance(record, Table):
@ -157,6 +183,7 @@ class ElasticsearchSink(Sink):
index=self.config.table_index_name,
id=str(table_doc.table_id),
body=table_doc.json(),
request_timeout=self.config.timeout,
)
if isinstance(record, Topic):
topic_doc = self._create_topic_es_doc(record)
@ -164,6 +191,7 @@ class ElasticsearchSink(Sink):
index=self.config.topic_index_name,
id=str(topic_doc.topic_id),
body=topic_doc.json(),
request_timeout=self.config.timeout,
)
if isinstance(record, Dashboard):
dashboard_doc = self._create_dashboard_es_doc(record)
@ -171,6 +199,7 @@ class ElasticsearchSink(Sink):
index=self.config.dashboard_index_name,
id=str(dashboard_doc.dashboard_id),
body=dashboard_doc.json(),
request_timeout=self.config.timeout,
)
if isinstance(record, Pipeline):
pipeline_doc = self._create_pipeline_es_doc(record)
@ -178,6 +207,7 @@ class ElasticsearchSink(Sink):
index=self.config.pipeline_index_name,
id=str(pipeline_doc.pipeline_id),
body=pipeline_doc.json(),
request_timeout=self.config.timeout,
)
if isinstance(record, DbtModel):
dbt_model_doc = self._create_dbt_model_es_doc(record)
@ -185,6 +215,7 @@ class ElasticsearchSink(Sink):
index=self.config.dbt_index_name,
id=str(dbt_model_doc.dbt_model_id),
body=dbt_model_doc.json(),
request_timeout=self.config.timeout,
)
if hasattr(record.name, "__root__"):
@ -229,6 +260,8 @@ class ElasticsearchSink(Sink):
table_type = None
if hasattr(table.tableType, "name"):
table_type = table.tableType.name
change_descriptions = self._get_change_descriptions(Table, table.id.__root__)
table_doc = TableESDocument(
table_id=str(table.id.__root__),
database=str(database_entity.name.__root__),
@ -253,6 +286,7 @@ class ElasticsearchSink(Sink):
fqdn=fqdn,
owner=table_owner,
followers=table_followers,
change_descriptions=change_descriptions,
)
return table_doc
@ -279,6 +313,7 @@ class ElasticsearchSink(Sink):
tier = topic_tag.tagFQN
else:
tags.add(topic_tag.tagFQN)
change_descriptions = self._get_change_descriptions(Topic, topic.id.__root__)
topic_doc = TopicESDocument(
topic_id=str(topic.id.__root__),
service=service_entity.name,
@ -293,8 +328,9 @@ class ElasticsearchSink(Sink):
fqdn=fqdn,
owner=topic_owner,
followers=topic_followers,
change_descriptions=change_descriptions,
)
print(topic_doc.json())
return topic_doc
def _create_dashboard_es_doc(self, dashboard: Dashboard):
@ -329,6 +365,9 @@ class ElasticsearchSink(Sink):
if len(chart.tags) > 0:
for col_tag in chart.tags:
tags.add(col_tag.tagFQN)
change_descriptions = self._get_change_descriptions(
Dashboard, dashboard.id.__root__
)
dashboard_doc = DashboardESDocument(
dashboard_id=str(dashboard.id.__root__),
service=service_entity.name,
@ -351,6 +390,7 @@ class ElasticsearchSink(Sink):
weekly_percentile_rank=dashboard.usageSummary.weeklyStats.percentileRank,
daily_stats=dashboard.usageSummary.dailyStats.count,
daily_percentile_rank=dashboard.usageSummary.dailyStats.percentileRank,
change_descriptions=change_descriptions,
)
return dashboard_doc
@ -386,7 +426,9 @@ class ElasticsearchSink(Sink):
if tags in task and len(task.tags) > 0:
for col_tag in task.tags:
tags.add(col_tag.tagFQN)
change_descriptions = self._get_change_descriptions(
Pipeline, pipeline.id.__root__
)
pipeline_doc = PipelineESDocument(
pipeline_id=str(pipeline.id.__root__),
service=service_entity.name,
@ -403,6 +445,7 @@ class ElasticsearchSink(Sink):
fqdn=fqdn,
owner=pipeline_owner,
followers=pipeline_followers,
change_descriptions=change_descriptions,
)
return pipeline_doc
@ -446,6 +489,9 @@ class ElasticsearchSink(Sink):
dbt_node_type = None
if hasattr(dbt_model.dbtNodeType, "name"):
dbt_node_type = dbt_model.dbtNodeType.name
change_descriptions = self._get_change_descriptions(
Dashboard, dbt_model.id.__root__
)
dbt_model_doc = DbtModelESDocument(
dbt_model_id=str(dbt_model.id.__root__),
database=str(database_entity.name.__root__),
@ -465,6 +511,7 @@ class ElasticsearchSink(Sink):
schema_description=None,
owner=dbt_model_owner,
followers=dbt_model_followers,
change_descriptions=change_descriptions,
)
return dbt_model_doc
@ -507,6 +554,28 @@ class ElasticsearchSink(Sink):
tags,
)
def _get_change_descriptions(self, entity_type, entity_id):
    """Build a list of ChangeDescription documents from an entity's version history.

    Fetches every stored version of the entity via the metadata client, then
    converts each version JSON into a ChangeDescription carrying who changed it,
    when (as an epoch timestamp), and — when present — the added/deleted/updated
    field lists from the version's changeDescription payload.
    """
    history = self.metadata.list_versions(entity_id, entity_type)
    descriptions = []
    for raw_version in history.versions:
        data = json.loads(raw_version)
        # updatedAt arrives as a date string; normalize to a UNIX timestamp
        updated_at = parser.parse(data["updatedAt"])
        description = ChangeDescription(
            updatedBy=data["updatedBy"], updatedAt=updated_at.timestamp()
        )
        if "changeDescription" in data:
            change = data["changeDescription"]
            description.fieldsAdded = change["fieldsAdded"]
            description.fieldsDeleted = change["fieldsDeleted"]
            description.fieldsUpdated = change["fieldsUpdated"]
        descriptions.append(description)
    return descriptions
def get_status(self):
    """Return the accumulated SinkStatus for this sink (records written / failures)."""
    return self.status

View File

@ -16,7 +16,7 @@ TABLE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
{
"mappings":{
"properties": {
"table_name": {
"name": {
"type":"text"
},
"display_name": {
@ -85,6 +85,9 @@ TABLE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
},
"daily_stats": {
"type": "long"
},
"change_descriptions": {
"type": "nested"
}
}
}
@ -97,7 +100,7 @@ TOPIC_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
{
"mappings":{
"properties": {
"topic_name": {
"name": {
"type":"text"
},
"display_name": {
@ -139,6 +142,9 @@ TOPIC_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
},
"suggest": {
"type": "completion"
},
"change_descriptions": {
"type": "nested"
}
}
}
@ -151,7 +157,7 @@ DASHBOARD_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
{
"mappings":{
"properties": {
"dashboard_name": {
"name": {
"type":"text"
},
"display_name": {
@ -217,6 +223,9 @@ DASHBOARD_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
},
"daily_stats": {
"type": "long"
},
"change_descriptions": {
"type": "nested"
}
}
}
@ -229,7 +238,7 @@ PIPELINE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
{
"mappings":{
"properties": {
"pipeline_name": {
"name": {
"type":"text"
},
"display_name": {
@ -277,6 +286,9 @@ PIPELINE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
},
"suggest": {
"type": "completion"
},
"change_descriptions": {
"type": "nested"
}
}
}
@ -290,7 +302,7 @@ DBT_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
{
"mappings":{
"properties": {
"dbt_model_name": {
"name": {
"type":"text"
},
"display_name": {
@ -341,6 +353,9 @@ DBT_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
},
"suggest": {
"type": "completion"
},
"change_descriptions": {
"type": "nested"
}
}
}