mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-10 16:32:26 +00:00
fix(analytics): Fix SSL issue with analytics on frontend (#2840)
This commit is contained in:
parent
74e34dddfc
commit
acb638cb65
@ -42,8 +42,8 @@ public class AnalyticsService {
|
||||
private final Logger _logger = LoggerFactory.getLogger(AnalyticsService.class.getName());
|
||||
|
||||
private final RestHighLevelClient _elasticClient;
|
||||
private final Optional<String> _indexPrefix;
|
||||
|
||||
private static final String INDEX_NAME = "datahub_usage_event";
|
||||
private static final String FILTERED = "filtered";
|
||||
private static final String DATE_HISTOGRAM = "date_histogram";
|
||||
private static final String UNIQUE = "unique";
|
||||
@ -58,17 +58,24 @@ public class AnalyticsService {
|
||||
public static final String DATA_JOB_INDEX = "datajobindex_v2";
|
||||
public static final String DATASET_INDEX = "datasetindex_v2";
|
||||
|
||||
public AnalyticsService(final RestHighLevelClient elasticClient) {
|
||||
public AnalyticsService(final RestHighLevelClient elasticClient, final Optional<String> indexPrefix) {
|
||||
_elasticClient = elasticClient;
|
||||
_indexPrefix = indexPrefix;
|
||||
}
|
||||
|
||||
private String getIndexName(String baseIndexName) {
|
||||
return _indexPrefix.map(p -> p + "_").orElse("") + baseIndexName;
|
||||
}
|
||||
|
||||
public List<NamedLine> getTimeseriesChart(String indexName, DateRange dateRange, DateInterval granularity,
|
||||
Optional<String> dimension, // Length 1 for now
|
||||
Map<String, List<String>> filters, Optional<String> uniqueOn) {
|
||||
|
||||
String finalIndexName = getIndexName(indexName);
|
||||
_logger.debug(
|
||||
String.format("Invoked getTimeseriesChart with indexName: %s, dateRange: %s, granularity: %s, dimension: %s,",
|
||||
indexName, dateRange, granularity, dimension)
|
||||
+ String.format("filters: %s, uniqueOn: %s", filters, uniqueOn));
|
||||
finalIndexName, dateRange, granularity, dimension) + String.format("filters: %s, uniqueOn: %s", filters,
|
||||
uniqueOn));
|
||||
|
||||
AggregationBuilder filteredAgg = getFilteredAggregation(filters, ImmutableMap.of(), Optional.of(dateRange));
|
||||
|
||||
@ -84,7 +91,7 @@ public class AnalyticsService {
|
||||
filteredAgg.subAggregation(dateHistogram);
|
||||
}
|
||||
|
||||
SearchRequest searchRequest = constructSearchRequest(indexName, filteredAgg);
|
||||
SearchRequest searchRequest = constructSearchRequest(finalIndexName, filteredAgg);
|
||||
Aggregations aggregationResult = executeAndExtract(searchRequest).getAggregations();
|
||||
try {
|
||||
if (dimension.isPresent()) {
|
||||
@ -117,10 +124,10 @@ public class AnalyticsService {
|
||||
public List<NamedBar> getBarChart(String indexName, Optional<DateRange> dateRange, List<String> dimensions,
|
||||
// Length 1 or 2
|
||||
Map<String, List<String>> filters, Optional<String> uniqueOn) {
|
||||
String finalIndexName = getIndexName(indexName);
|
||||
_logger.debug(
|
||||
String.format("Invoked getBarChart with indexName: %s, dateRange: %s, dimensions: %s,",
|
||||
indexName, dateRange, dimensions)
|
||||
+ String.format("filters: %s, uniqueOn: %s", filters, uniqueOn));
|
||||
String.format("Invoked getBarChart with indexName: %s, dateRange: %s, dimensions: %s,", finalIndexName,
|
||||
dateRange, dimensions) + String.format("filters: %s, uniqueOn: %s", filters, uniqueOn));
|
||||
|
||||
assert (dimensions.size() == 1 || dimensions.size() == 2);
|
||||
AggregationBuilder filteredAgg = getFilteredAggregation(filters, ImmutableMap.of(), dateRange);
|
||||
@ -136,7 +143,7 @@ public class AnalyticsService {
|
||||
}
|
||||
filteredAgg.subAggregation(termAgg);
|
||||
|
||||
SearchRequest searchRequest = constructSearchRequest(indexName, filteredAgg);
|
||||
SearchRequest searchRequest = constructSearchRequest(finalIndexName, filteredAgg);
|
||||
Aggregations aggregationResult = executeAndExtract(searchRequest).getAggregations();
|
||||
|
||||
try {
|
||||
@ -169,10 +176,10 @@ public class AnalyticsService {
|
||||
|
||||
public List<Row> getTopNTableChart(String indexName, Optional<DateRange> dateRange, String groupBy,
|
||||
Map<String, List<String>> filters, Optional<String> uniqueOn, int maxRows) {
|
||||
String finalIndexName = getIndexName(indexName);
|
||||
_logger.debug(
|
||||
String.format("Invoked getTopNTableChart with indexName: %s, dateRange: %s, groupBy: %s",
|
||||
indexName, dateRange, groupBy)
|
||||
+ String.format("filters: %s, uniqueOn: %s", filters, uniqueOn));
|
||||
String.format("Invoked getTopNTableChart with indexName: %s, dateRange: %s, groupBy: %s", finalIndexName,
|
||||
dateRange, groupBy) + String.format("filters: %s, uniqueOn: %s", filters, uniqueOn));
|
||||
|
||||
AggregationBuilder filteredAgg = getFilteredAggregation(filters, ImmutableMap.of(), dateRange);
|
||||
|
||||
@ -183,7 +190,7 @@ public class AnalyticsService {
|
||||
}
|
||||
filteredAgg.subAggregation(termAgg);
|
||||
|
||||
SearchRequest searchRequest = constructSearchRequest(indexName, filteredAgg);
|
||||
SearchRequest searchRequest = constructSearchRequest(finalIndexName, filteredAgg);
|
||||
Aggregations aggregationResult = executeAndExtract(searchRequest).getAggregations();
|
||||
|
||||
try {
|
||||
@ -200,10 +207,14 @@ public class AnalyticsService {
|
||||
|
||||
public int getHighlights(String indexName, Optional<DateRange> dateRange, Map<String, List<String>> filters,
|
||||
Map<String, List<String>> mustNotFilters, Optional<String> uniqueOn) {
|
||||
String finalIndexName = getIndexName(indexName);
|
||||
_logger.debug(String.format("Invoked getHighlights with indexName: %s, dateRange: %s", finalIndexName, dateRange)
|
||||
+ String.format("filters: %s, uniqueOn: %s", filters, uniqueOn));
|
||||
|
||||
AggregationBuilder filteredAgg = getFilteredAggregation(filters, mustNotFilters, dateRange);
|
||||
uniqueOn.ifPresent(s -> filteredAgg.subAggregation(getUniqueQuery(s)));
|
||||
|
||||
SearchRequest searchRequest = constructSearchRequest(indexName, filteredAgg);
|
||||
SearchRequest searchRequest = constructSearchRequest(finalIndexName, filteredAgg);
|
||||
Filter aggregationResult = executeAndExtract(searchRequest);
|
||||
try {
|
||||
if (uniqueOn.isPresent()) {
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package react.analytics;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
import java.util.Optional;
|
||||
import org.apache.http.ssl.SSLContextBuilder;
|
||||
import play.Environment;
|
||||
import javax.annotation.Nonnull;
|
||||
@ -14,6 +15,8 @@ import java.security.KeyStoreException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.SecureRandom;
|
||||
import java.security.cert.CertificateException;
|
||||
import utils.ConfigUtil;
|
||||
|
||||
|
||||
/**
|
||||
* Guice module responsible for configuring & creating an instance of {@link AnalyticsService}.
|
||||
@ -33,6 +36,7 @@ public class AnalyticsServiceModule extends AbstractModule {
|
||||
private static final String ELASTIC_CLIENT_CONNECTION_REQUEST_TIMEOUT_PATH = "analytics.elastic.connectionRequestTimeout";
|
||||
private static final String ELASTIC_CLIENT_USERNAME_PATH = "analytics.elastic.username";
|
||||
private static final String ELASTIC_CLIENT_PASSWORD_PATH = "analytics.elastic.password";
|
||||
private static final String ELASTIC_INDEX_PREFIX = "analytics.elastic.indexPrefix";
|
||||
|
||||
/*
|
||||
Required SSL Config Paths
|
||||
@ -61,29 +65,32 @@ public class AnalyticsServiceModule extends AbstractModule {
|
||||
@Override
|
||||
protected void configure() {
|
||||
final SSLContext context = createSSLContext();
|
||||
final AnalyticsService backend = new AnalyticsService(ElasticClientFactory.createElasticClient(
|
||||
_configs.hasPath(ELASTIC_CLIENT_USE_SSL_PATH) && _configs.getBoolean(ELASTIC_CLIENT_USE_SSL_PATH),
|
||||
final AnalyticsService backend = new AnalyticsService(
|
||||
ElasticClientFactory.createElasticClient(
|
||||
ConfigUtil.getBoolean(_configs, ELASTIC_CLIENT_USE_SSL_PATH),
|
||||
_configs.getString(ELASTIC_CLIENT_HOST_PATH),
|
||||
_configs.getInt(ELASTIC_CLIENT_PORT_PATH),
|
||||
_configs.hasPath(ELASTIC_CLIENT_THREAD_COUNT_PATH) ? _configs.getInt(ELASTIC_CLIENT_THREAD_COUNT_PATH) : DEFAULT_THREAD_COUNT,
|
||||
_configs.hasPath(ELASTIC_CLIENT_CONNECTION_REQUEST_TIMEOUT_PATH) ? _configs.getInt(ELASTIC_CLIENT_CONNECTION_REQUEST_TIMEOUT_PATH) : DEFAULT_CONNECTION_TIMEOUT,
|
||||
_configs.hasPath(ELASTIC_CLIENT_USERNAME_PATH) ? _configs.getString(ELASTIC_CLIENT_USERNAME_PATH) : null,
|
||||
_configs.hasPath(ELASTIC_CLIENT_PASSWORD_PATH) ? _configs.getString(ELASTIC_CLIENT_PASSWORD_PATH) : null,
|
||||
ConfigUtil.getInt(_configs, ELASTIC_CLIENT_THREAD_COUNT_PATH, DEFAULT_THREAD_COUNT),
|
||||
ConfigUtil.getInt(_configs, ELASTIC_CLIENT_CONNECTION_REQUEST_TIMEOUT_PATH, DEFAULT_CONNECTION_TIMEOUT),
|
||||
ConfigUtil.getString(_configs, ELASTIC_CLIENT_USERNAME_PATH, null),
|
||||
ConfigUtil.getString(_configs, ELASTIC_CLIENT_PASSWORD_PATH, null),
|
||||
context
|
||||
));
|
||||
),
|
||||
Optional.ofNullable(ConfigUtil.getString(_configs, ELASTIC_INDEX_PREFIX, null))
|
||||
);
|
||||
bind(AnalyticsService.class).toInstance(backend);
|
||||
}
|
||||
|
||||
private SSLContext createSSLContext() {
|
||||
|
||||
final String sslProtocol = _configs.hasPath(ELASTIC_CLIENT_SSL_PROTOCOL_PATH) ? _configs.getString(ELASTIC_CLIENT_SSL_PROTOCOL_PATH) : null;
|
||||
final String sslTrustStoreFile = _configs.hasPath(ELASTIC_CLIENT_SSL_TRUST_STORE_FILE_PATH) ? _configs.getString(ELASTIC_CLIENT_SSL_TRUST_STORE_FILE_PATH) : null;
|
||||
final String sslTrustStoreType = _configs.hasPath(ELASTIC_CLIENT_SSL_TRUST_STORE_TYPE_PATH) ? _configs.getString(ELASTIC_CLIENT_SSL_TRUST_STORE_TYPE_PATH) : null;
|
||||
final String sslTrustStorePassword = _configs.hasPath(ELASTIC_CLIENT_SSL_TRUST_STORE_PASSWORD_PATH) ? _configs.getString(ELASTIC_CLIENT_SSL_TRUST_STORE_PASSWORD_PATH): null;
|
||||
final String sslKeyStoreFile = _configs.hasPath(ELASTIC_CLIENT_SSL_KEY_STORE_FILE_PATH) ? _configs.getString(ELASTIC_CLIENT_SSL_KEY_STORE_FILE_PATH) : null;
|
||||
final String sslKeyStoreType = _configs.hasPath(ELASTIC_CLIENT_SSL_KEY_STORE_TYPE_PATH) ? _configs.getString(ELASTIC_CLIENT_SSL_KEY_STORE_TYPE_PATH) : null;
|
||||
final String sslKeyStorePassword = _configs.hasPath(ELASTIC_CLIENT_SSL_KEY_STORE_PASSWORD_PATH) ? _configs.getString(ELASTIC_CLIENT_SSL_KEY_STORE_PASSWORD_PATH) : null;
|
||||
final String sslSecureRandomImplementation = _configs.hasPath(ELASTIC_CLIENT_SSL_SECURE_RANDOM_IMPL_PATH) ? _configs.getString(ELASTIC_CLIENT_SSL_SECURE_RANDOM_IMPL_PATH) : null;
|
||||
final String sslProtocol = ConfigUtil.getString(_configs, ELASTIC_CLIENT_SSL_PROTOCOL_PATH, null);
|
||||
final String sslTrustStoreFile = ConfigUtil.getString(_configs, ELASTIC_CLIENT_SSL_TRUST_STORE_FILE_PATH, null);
|
||||
final String sslTrustStoreType = ConfigUtil.getString(_configs, ELASTIC_CLIENT_SSL_TRUST_STORE_TYPE_PATH, null);
|
||||
final String sslTrustStorePassword = ConfigUtil.getString(_configs, ELASTIC_CLIENT_SSL_TRUST_STORE_PASSWORD_PATH, null);
|
||||
final String sslKeyStoreFile = ConfigUtil.getString(_configs, ELASTIC_CLIENT_SSL_KEY_STORE_FILE_PATH, null);
|
||||
final String sslKeyStoreType = ConfigUtil.getString(_configs, ELASTIC_CLIENT_SSL_KEY_STORE_TYPE_PATH, null);
|
||||
final String sslKeyStorePassword = ConfigUtil.getString(_configs, ELASTIC_CLIENT_SSL_KEY_STORE_PASSWORD_PATH, null);
|
||||
final String sslSecureRandomImplementation = ConfigUtil.getString(_configs, ELASTIC_CLIENT_SSL_SECURE_RANDOM_IMPL_PATH, null);
|
||||
|
||||
final SSLContextBuilder sslContextBuilder = new SSLContextBuilder();
|
||||
if (sslProtocol != null) {
|
||||
|
||||
@ -23,7 +23,10 @@ import react.graphql.PlayQueryContext;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
import utils.ConfigUtil;
|
||||
|
||||
|
||||
public class TrackingController extends Controller {
|
||||
|
||||
@ -86,6 +89,11 @@ public class TrackingController extends Controller {
|
||||
_producer.close();
|
||||
}
|
||||
|
||||
private void setConfig(Properties props, String key, String configKey) {
|
||||
Optional.ofNullable(ConfigUtil.getString(_config, configKey, null))
|
||||
.ifPresent(v -> props.put(key, v));
|
||||
}
|
||||
|
||||
private KafkaProducer createKafkaProducer() {
|
||||
final Properties props = new Properties();
|
||||
props.put(ProducerConfig.CLIENT_ID_CONFIG, "datahub-frontend");
|
||||
@ -97,22 +105,22 @@ public class TrackingController extends Controller {
|
||||
if (_config.hasPath(securityProtocolConfig)
|
||||
&& KAFKA_SSL_PROTOCOLS.contains(_config.getString(securityProtocolConfig))) {
|
||||
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, _config.getString(securityProtocolConfig));
|
||||
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, _config.getString("analytics.kafka.ssl.key.password"));
|
||||
setConfig(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, "analytics.kafka.ssl.key.password");
|
||||
|
||||
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, _config.getString("analytics.kafka.ssl.keystore.type"));
|
||||
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, _config.getString("analytics.kafka.ssl.keystore.location"));
|
||||
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, _config.getString("analytics.kafka.ssl.keystore.password"));
|
||||
setConfig(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "analytics.kafka.ssl.keystore.type");
|
||||
setConfig(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "analytics.kafka.ssl.keystore.location");
|
||||
setConfig(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "analytics.kafka.ssl.keystore.password");
|
||||
|
||||
props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, _config.getString("analytics.kafka.ssl.truststore.type"));
|
||||
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, _config.getString("analytics.kafka.ssl.truststore.location"));
|
||||
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, _config.getString("analytics.kafka.ssl.truststore.password"));
|
||||
setConfig(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "analytics.kafka.ssl.truststore.type");
|
||||
setConfig(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "analytics.kafka.ssl.truststore.location");
|
||||
setConfig(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "analytics.kafka.ssl.truststore.password");
|
||||
|
||||
props.put(SslConfigs.SSL_PROTOCOL_CONFIG, _config.getString("analytics.kafka.ssl.protocol"));
|
||||
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, _config.getString("analytics.kafka.ssl.endpoint.identification.algorithm"));
|
||||
setConfig(props, SslConfigs.SSL_PROTOCOL_CONFIG, "analytics.kafka.ssl.protocol");
|
||||
setConfig(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "analytics.kafka.ssl.endpoint.identification.algorithm");
|
||||
|
||||
if (_config.getString(securityProtocolConfig).equals(SecurityProtocol.SASL_SSL.name())) {
|
||||
props.put(SaslConfigs.SASL_MECHANISM, _config.getString("analytics.kafka.sasl.mechanism"));
|
||||
props.put(SaslConfigs.SASL_JAAS_CONFIG, _config.getString("analytics.kafka.sasl.jaas.config"));
|
||||
setConfig(props, SaslConfigs.SASL_MECHANISM, "analytics.kafka.sasl.mechanism");
|
||||
setConfig(props, SaslConfigs.SASL_JAAS_CONFIG, "analytics.kafka.sasl.jaas.config");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
18
datahub-frontend/app/utils/ConfigUtil.java
Normal file
18
datahub-frontend/app/utils/ConfigUtil.java
Normal file
@ -0,0 +1,18 @@
|
||||
package utils;
|
||||
|
||||
import com.typesafe.config.Config;
|
||||
|
||||
|
||||
public class ConfigUtil {
|
||||
public static boolean getBoolean(Config config, String key) {
|
||||
return config.hasPath(key) && config.getBoolean(key);
|
||||
}
|
||||
|
||||
public static int getInt(Config config, String key, int defaultValue) {
|
||||
return config.hasPath(key) ? config.getInt(key) : defaultValue;
|
||||
}
|
||||
|
||||
public static String getString(Config config, String key, String defaultValue) {
|
||||
return config.hasPath(key) ? config.getString(key) : defaultValue;
|
||||
}
|
||||
}
|
||||
@ -185,3 +185,5 @@ analytics.elastic.sslContext.sslKeyStorePassword = ${?ELASTIC_CLIENT_SSL_KEY_STO
|
||||
# Optional username + password for use with SSL
|
||||
analytics.elastic.username = ${?ELASTIC_CLIENT_USERNAME}
|
||||
analytics.elastic.password = ${?ELASTIC_CLIENT_PASSWORD}
|
||||
|
||||
analytics.elastic.indexPrefix = ${?ELASTIC_INDEX_PREFIX}
|
||||
@ -26,13 +26,19 @@ spec:
|
||||
securityContext:
|
||||
{{- toYaml .Values.podSecurityContext | nindent 8 }}
|
||||
volumes:
|
||||
{{- with .Values.extraVolumes }}
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- with .Values.global.credentialsAndCertsSecrets }}
|
||||
- name: datahub-certs-dir
|
||||
secret:
|
||||
defaultMode: 0444
|
||||
secretName: {{ .name }}
|
||||
{{- end }}
|
||||
{{- if .Values.exporters.jmx.enabled }}
|
||||
- name: config-jmx-exporter
|
||||
configMap:
|
||||
name: {{ include "datahub-frontend.fullname" . }}-config-jmx-exporter
|
||||
name: {{ include "datahub-gms.fullname" . }}-config-jmx-exporter
|
||||
{{- end }}
|
||||
{{- with .Values.extraVolumes }}
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
initContainers:
|
||||
{{- with .Values.extraInitContainers }}
|
||||
@ -82,8 +88,26 @@ spec:
|
||||
value: "{{ .Values.global.datahub.appVersion }}"
|
||||
- name: DATAHUB_PLAY_MEM_BUFFER_SIZE
|
||||
value: "{{ .Values.datahub.play.mem.buffer.size }}"
|
||||
- name: DATAHUB_ANALYTICS_ENABLED
|
||||
value: "{{ .Values.global.datahub_analytics_enabled }}"
|
||||
{{- if .Values.global.datahub_analytics_enabled }}
|
||||
- name: KAFKA_BOOTSTRAP_SERVER
|
||||
value: "{{ .Values.global.kafka.bootstrap.server }}"
|
||||
{{- if .Values.global.springKafkaConfigurationOverrides }}
|
||||
{{- range $configName, $configValue := .Values.global.springKafkaConfigurationOverrides }}
|
||||
- name: KAFKA_PROPERTIES_{{ $configName | replace "." "_" | upper }}
|
||||
value: {{ $configValue }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
{{- if .Values.global.credentialsAndCertsSecrets }}
|
||||
{{- range $envVarName, $envVarValue := .Values.global.credentialsAndCertsSecrets.secureEnv }}
|
||||
- name: KAFKA_PROPERTIES_{{ $envVarName | replace "." "_" | upper }}
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: {{ $.Values.global.credentialsAndCertsSecrets.name }}
|
||||
key: {{ $envVarValue }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
- name: ELASTIC_CLIENT_HOST
|
||||
value: "{{ .Values.global.elasticsearch.host }}"
|
||||
- name: ELASTIC_CLIENT_PORT
|
||||
@ -101,14 +125,22 @@ spec:
|
||||
name: "{{ .password.secretRef }}"
|
||||
key: "{{ .password.secretKey }}"
|
||||
{{- end }}
|
||||
{{- if .Values.global.kafka.topics }}
|
||||
- name: DATAHUB_TRACKING_TOPIC
|
||||
value: {{ .Values.global.kafka.topics.datahub_usage_event_name}}
|
||||
{{- else }}
|
||||
- name: DATAHUB_TRACKING_TOPIC
|
||||
value: "DataHubUsageEvent_v1"
|
||||
- name: DATAHUB_ANALYTICS_ENABLED
|
||||
value: "{{ .Values.global.datahub_analytics_enabled }}"
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
{{- with .Values.extraEnvs }}
|
||||
{{- toYaml . | nindent 12 }}
|
||||
{{- end }}
|
||||
volumeMounts:
|
||||
{{- with .Values.global.credentialsAndCertsSecrets }}
|
||||
- name: datahub-certs-dir
|
||||
mountPath: {{ .path | default "/mnt/certs" }}
|
||||
{{- end }}
|
||||
{{- with .Values.extraVolumeMounts }}
|
||||
{{- toYaml . | nindent 12 }}
|
||||
{{- end }}
|
||||
|
||||
@ -17,160 +17,57 @@ else
|
||||
ELASTICSEARCH_HOST_URL=$ELASTICSEARCH_USERNAME:$ELASTICSEARCH_PASSWORD@$ELASTICSEARCH_HOST
|
||||
fi
|
||||
|
||||
function get_index_name() {
|
||||
if [[ -z "$INDEX_PREFIX" ]]; then
|
||||
echo $1
|
||||
else
|
||||
echo "${INDEX_PREFIX}_$1"
|
||||
fi
|
||||
}
|
||||
|
||||
function generate_index_file() {
|
||||
jq -n \
|
||||
--slurpfile settings "$1" \
|
||||
--slurpfile mappings "$2" \
|
||||
'.settings=$settings[0] | .mappings=$mappings[0]' > "$3"
|
||||
}
|
||||
|
||||
function check_reindex() {
|
||||
initial_documents=$(curl -XGET "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$1/_count" -H 'Content-Type: application/json' | jq '.count')
|
||||
for i in $(seq 30); do
|
||||
echo $i
|
||||
reindexed_documents=$(curl -XGET "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$2/_count" -H 'Content-Type: application/json' | jq '.count')
|
||||
if [[ $reindexed_documents == "$initial_documents" ]]; then
|
||||
echo -e "\nPost-reindex document reconcialiation completed. doc_source_index_count: $initial_documents; doc_target_index_count: $reindexed_documents"
|
||||
return 0
|
||||
else
|
||||
sleep 3
|
||||
fi
|
||||
done
|
||||
|
||||
echo -e "\nPost-reindex document reconcialiation failed. doc_source_index_count: $initial_documents; doc_target_index_count: $reindexed_documents"
|
||||
return 1
|
||||
}
|
||||
|
||||
function reindex() {
|
||||
source_index=$1
|
||||
target_index="$1_$(date +%s)"
|
||||
|
||||
#create target index with latest index config
|
||||
curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$target_index" -H 'Content-Type: application/json' --data @/tmp/data
|
||||
|
||||
#reindex the documents in source index to target index.
|
||||
# One of the assumption here is that we only add properties to document when index-config is evolved.
|
||||
# In case a property is deleted from document, it will still be reindexed in target index as default behaviour and
|
||||
# it is not breaking the code. If still needs to be purged from target index, use "removed" property in POST data.
|
||||
curl -XPOST "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_reindex?pretty" -H 'Content-Type: application/json' \
|
||||
-d "{\"source\":{\"index\":\"$source_index\"},\"dest\":{\"index\":\"$target_index\"}}"
|
||||
|
||||
if check_reindex "$source_index" "$target_index"
|
||||
then
|
||||
#checking if source index is concrete index or alias
|
||||
if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_alias/$source_index") -eq 404 ]
|
||||
then
|
||||
curl -XDELETE "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$source_index"
|
||||
else
|
||||
concrete_index_name=$(curl -XGET "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_alias/$source_index" | jq 'keys[]' | head -1 | tr -d \")
|
||||
curl -XDELETE "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$concrete_index_name"
|
||||
fi
|
||||
|
||||
curl -XPOST "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_aliases" -H 'Content-Type: application/json' \
|
||||
-d "{\"actions\":[{\"remove\":{\"index\":\"*\",\"alias\":\"$source_index\"}},{\"add\":{\"index\":\"$target_index\",\"alias\":\"$source_index\"}}]}"
|
||||
|
||||
echo -e "\nReindexing to $target_index succeded"
|
||||
return 0
|
||||
else
|
||||
curl -XDELETE "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$target_index"
|
||||
echo -e "\nReindexing to $target_index failed"
|
||||
return 1
|
||||
fi
|
||||
}
|
||||
|
||||
function create_index() {
|
||||
generate_index_file "index/$2" "index/$3" /tmp/data
|
||||
|
||||
#checking if index(or alias) exists
|
||||
if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$1") -eq 404 ]
|
||||
then
|
||||
echo -e '\ncreating index' "$1"
|
||||
|
||||
curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$1" -H 'Content-Type: application/json' --data @/tmp/data
|
||||
return 0
|
||||
else
|
||||
echo -e '\ncomparing with existing version of index' "$1"
|
||||
|
||||
setting_keys_regex=$(jq '.index | keys[]' "index/$2" | xargs | sed 's/ /|/g')
|
||||
curl -XGET "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$1/_settings" | \
|
||||
jq '.. | .settings? | select(. != null)' | \
|
||||
jq --arg KEYS_REGEX "$setting_keys_regex" '.index | with_entries(select(.key | match($KEYS_REGEX))) | {"index":.}' \
|
||||
> /tmp/existing_setting
|
||||
|
||||
curl -XGET "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$1/_mapping" | \
|
||||
jq '.. | .mappings? | select(. != null)' \
|
||||
> /tmp/existing_mapping
|
||||
|
||||
generate_index_file /tmp/existing_setting /tmp/existing_mapping /tmp/existing
|
||||
|
||||
jq -S . /tmp/existing > /tmp/existing_sorted
|
||||
jq -S . /tmp/data > /tmp/data_sorted
|
||||
if diff /tmp/existing_sorted /tmp/data_sorted
|
||||
then
|
||||
echo -e "\nno changes to index $1 mappings and settings"
|
||||
return 0
|
||||
else
|
||||
echo -e "\nupdating index" "$1"
|
||||
|
||||
reindex "$1" && return 0 || return 1
|
||||
fi
|
||||
fi
|
||||
}
|
||||
|
||||
function create_datahub_usage_event_datastream() {
|
||||
if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_ilm/policy/datahub_usage_event_policy") -eq 404 ]
|
||||
if [[ -z "$INDEX_PREFIX" ]]; then
|
||||
PREFIX=''
|
||||
else
|
||||
PREFIX="${INDEX_PREFIX}_"
|
||||
fi
|
||||
|
||||
if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_ilm/policy/${PREFIX}datahub_usage_event_policy") -eq 404 ]
|
||||
then
|
||||
echo -e "\ncreating datahub_usage_event_policy"
|
||||
curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_ilm/policy/datahub_usage_event_policy" -H 'Content-Type: application/json' --data @/index/usage-event/policy.json
|
||||
sed -e "s/PREFIX/${PREFIX}/g" /index/usage-event/policy.json | tee -a /tmp/policy.json
|
||||
curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_ilm/policy/${PREFIX}datahub_usage_event_policy" -H 'Content-Type: application/json' --data @/tmp/policy.json
|
||||
else
|
||||
echo -e "\ndatahub_usage_event_policy exists"
|
||||
fi
|
||||
if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_index_template/datahub_usage_event_index_template") -eq 404 ]
|
||||
if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_index_template/${PREFIX}datahub_usage_event_index_template") -eq 404 ]
|
||||
then
|
||||
echo -e "\ncreating datahub_usage_event_index_template"
|
||||
curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_index_template/datahub_usage_event_index_template" -H 'Content-Type: application/json' --data @/index/usage-event/index_template.json
|
||||
sed -e "s/PREFIX/${PREFIX}/g" /index/usage-event/index_template.json | tee -a /tmp/index_template.json
|
||||
curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_index_template/${PREFIX}datahub_usage_event_index_template" -H 'Content-Type: application/json' --data @/tmp/index_template.json
|
||||
else
|
||||
echo -e "\ndatahub_usage_event_index_template exists"
|
||||
fi
|
||||
}
|
||||
|
||||
function create_datahub_usage_event_aws_elasticsearch() {
|
||||
if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_opendistro/_ism/policies/datahub_usage_event_policy") -eq 404 ]
|
||||
if [[ -z "$INDEX_PREFIX" ]]; then
|
||||
PREFIX=''
|
||||
else
|
||||
PREFIX="${INDEX_PREFIX}_"
|
||||
fi
|
||||
|
||||
if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_opendistro/_ism/policies/${PREFIX}datahub_usage_event_policy") -eq 404 ]
|
||||
then
|
||||
echo -e "\ncreating datahub_usage_event_policy"
|
||||
curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_opendistro/_ism/policies/datahub_usage_event_policy" -H 'Content-Type: application/json' --data @/index/usage-event/aws_es_ism_policy.json
|
||||
sed -e "s/PREFIX/${PREFIX}/g" /index/usage-event/aws_es_ism_policy.json | tee -a /tmp/aws_es_ism_policy.json
|
||||
curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_opendistro/_ism/policies/${PREFIX}datahub_usage_event_policy" -H 'Content-Type: application/json' --data @/tmp/aws_es_ism_policy.json
|
||||
else
|
||||
echo -e "\ndatahub_usage_event_policy exists"
|
||||
fi
|
||||
if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_template/datahub_usage_event_index_template") -eq 404 ]
|
||||
if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_template/${PREFIX}datahub_usage_event_index_template") -eq 404 ]
|
||||
then
|
||||
echo -e "\ncreating datahub_usage_event_index_template"
|
||||
curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_template/datahub_usage_event_index_template" -H 'Content-Type: application/json' --data @/index/usage-event/aws_es_index_template.json
|
||||
curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/datahub_usage_event-000001" -H 'Content-Type: application/json' --data "{\"aliases\":{\"datahub_usage_event\":{\"is_write_index\":true}}}"
|
||||
echo -e "\ncreating datahub_usagAe_event_index_template"
|
||||
sed -e "s/PREFIX/${PREFIX}/g" /index/usage-event/aws_es_index_template.json | tee -a /tmp/aws_es_index_template.json
|
||||
curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_template/${PREFIX}datahub_usage_event_index_template" -H 'Content-Type: application/json' --data @/tmp/aws_es_index_template.json
|
||||
curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/${PREFIX}datahub_usage_event-000001" -H 'Content-Type: application/json' --data "{\"aliases\":{\"${PREFIX}datahub_usage_event\":{\"is_write_index\":true}}}"
|
||||
else
|
||||
echo -e "\ndatahub_usage_event_index_template exists"
|
||||
fi
|
||||
}
|
||||
|
||||
create_index $(get_index_name chartdocument) chart/settings.json chart/mappings.json || exit 1
|
||||
create_index $(get_index_name corpuserinfodocument) corp-user/settings.json corp-user/mappings.json || exit 1
|
||||
create_index $(get_index_name dashboarddocument) dashboard/settings.json dashboard/mappings.json || exit 1
|
||||
create_index $(get_index_name datajobdocument) datajob/settings.json datajob/mappings.json || exit 1
|
||||
create_index $(get_index_name dataflowdocument) dataflow/settings.json dataflow/mappings.json || exit 1
|
||||
create_index $(get_index_name dataprocessdocument) data-process/settings.json data-process/mappings.json || exit 1
|
||||
create_index $(get_index_name datasetdocument) dataset/settings.json dataset/mappings.json || exit 1
|
||||
create_index $(get_index_name mlmodeldocument) ml-model/settings.json ml-model/mappings.json || exit 1
|
||||
create_index $(get_index_name tagdocument) tags/settings.json tags/mappings.json || exit 1
|
||||
create_index $(get_index_name glossaryterminfodocument) glossary/term/settings.json glossary/term/mappings.json || exit 1
|
||||
create_index $(get_index_name glossarynodeinfodocument) glossary/node/settings.json glossary/node/mappings.json || exit 1
|
||||
if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then
|
||||
if [[ $USE_AWS_ELASTICSEARCH == false ]]; then
|
||||
create_datahub_usage_event_datastream || exit 1
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
{
|
||||
"index_patterns": ["datahub_usage_event-*"],
|
||||
"index_patterns": ["PREFIXdatahub_usage_event-*"],
|
||||
"mappings": {
|
||||
"properties": {
|
||||
"@timestamp": {
|
||||
@ -20,6 +20,6 @@
|
||||
}
|
||||
},
|
||||
"settings": {
|
||||
"index.opendistro.index_state_management.rollover_alias": "datahub_usage_event"
|
||||
"index.opendistro.index_state_management.rollover_alias": "PREFIXdatahub_usage_event"
|
||||
}
|
||||
}
|
||||
@ -1,6 +1,6 @@
|
||||
{
|
||||
"policy": {
|
||||
"policy_id": "datahub_usage_event_policy",
|
||||
"policy_id": "PREFIXdatahub_usage_event_policy",
|
||||
"default_state": "Rollover",
|
||||
"schema_version": 1,
|
||||
"states": [
|
||||
@ -50,7 +50,7 @@
|
||||
],
|
||||
"ism_template": {
|
||||
"index_patterns": [
|
||||
"datahub_usage_event-*"
|
||||
"PREFIXdatahub_usage_event-*"
|
||||
],
|
||||
"priority": 100
|
||||
}
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
{
|
||||
"index_patterns": ["*datahub_usage_event*"],
|
||||
"index_patterns": ["*PREFIXdatahub_usage_event*"],
|
||||
"data_stream": { },
|
||||
"priority": 500,
|
||||
"template": {
|
||||
@ -23,7 +23,7 @@
|
||||
}
|
||||
},
|
||||
"settings": {
|
||||
"index.lifecycle.name": "datahub_usage_event_policy"
|
||||
"index.lifecycle.name": "PREFIXdatahub_usage_event_policy"
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user