From acb638cb6580b69d49ebed1d4d9cb3816035c17d Mon Sep 17 00:00:00 2001 From: Dexter Lee Date: Thu, 8 Jul 2021 15:11:40 -0700 Subject: [PATCH] fix(analytics): Fix SSL issue with analytics on frontend (#2840) --- .../app/react/analytics/AnalyticsService.java | 39 +++-- .../analytics/AnalyticsServiceModule.java | 37 +++-- .../react/controllers/TrackingController.java | 30 ++-- datahub-frontend/app/utils/ConfigUtil.java | 18 ++ datahub-frontend/conf/application.conf | 4 +- .../templates/deployment.yaml | 46 +++++- docker/elasticsearch-setup/create-indices.sh | 155 +++--------------- .../usage-event/aws_es_index_template.json | 4 +- .../index/usage-event/aws_es_ism_policy.json | 4 +- .../index/usage-event/index_template.json | 4 +- 10 files changed, 158 insertions(+), 183 deletions(-) create mode 100644 datahub-frontend/app/utils/ConfigUtil.java diff --git a/datahub-frontend/app/react/analytics/AnalyticsService.java b/datahub-frontend/app/react/analytics/AnalyticsService.java index 35445a73d8..3f703d0066 100644 --- a/datahub-frontend/app/react/analytics/AnalyticsService.java +++ b/datahub-frontend/app/react/analytics/AnalyticsService.java @@ -42,8 +42,8 @@ public class AnalyticsService { private final Logger _logger = LoggerFactory.getLogger(AnalyticsService.class.getName()); private final RestHighLevelClient _elasticClient; + private final Optional _indexPrefix; - private static final String INDEX_NAME = "datahub_usage_event"; private static final String FILTERED = "filtered"; private static final String DATE_HISTOGRAM = "date_histogram"; private static final String UNIQUE = "unique"; @@ -58,17 +58,24 @@ public class AnalyticsService { public static final String DATA_JOB_INDEX = "datajobindex_v2"; public static final String DATASET_INDEX = "datasetindex_v2"; - public AnalyticsService(final RestHighLevelClient elasticClient) { + public AnalyticsService(final RestHighLevelClient elasticClient, final Optional indexPrefix) { _elasticClient = elasticClient; + _indexPrefix = 
indexPrefix; + } + + private String getIndexName(String baseIndexName) { + return _indexPrefix.map(p -> p + "_").orElse("") + baseIndexName; } public List getTimeseriesChart(String indexName, DateRange dateRange, DateInterval granularity, Optional dimension, // Length 1 for now Map> filters, Optional uniqueOn) { + + String finalIndexName = getIndexName(indexName); _logger.debug( String.format("Invoked getTimeseriesChart with indexName: %s, dateRange: %s, granularity: %s, dimension: %s,", - indexName, dateRange, granularity, dimension) - + String.format("filters: %s, uniqueOn: %s", filters, uniqueOn)); + finalIndexName, dateRange, granularity, dimension) + String.format("filters: %s, uniqueOn: %s", filters, + uniqueOn)); AggregationBuilder filteredAgg = getFilteredAggregation(filters, ImmutableMap.of(), Optional.of(dateRange)); @@ -84,7 +91,7 @@ public class AnalyticsService { filteredAgg.subAggregation(dateHistogram); } - SearchRequest searchRequest = constructSearchRequest(indexName, filteredAgg); + SearchRequest searchRequest = constructSearchRequest(finalIndexName, filteredAgg); Aggregations aggregationResult = executeAndExtract(searchRequest).getAggregations(); try { if (dimension.isPresent()) { @@ -117,10 +124,10 @@ public class AnalyticsService { public List getBarChart(String indexName, Optional dateRange, List dimensions, // Length 1 or 2 Map> filters, Optional uniqueOn) { + String finalIndexName = getIndexName(indexName); _logger.debug( - String.format("Invoked getBarChart with indexName: %s, dateRange: %s, dimensions: %s,", - indexName, dateRange, dimensions) - + String.format("filters: %s, uniqueOn: %s", filters, uniqueOn)); + String.format("Invoked getBarChart with indexName: %s, dateRange: %s, dimensions: %s,", finalIndexName, + dateRange, dimensions) + String.format("filters: %s, uniqueOn: %s", filters, uniqueOn)); assert (dimensions.size() == 1 || dimensions.size() == 2); AggregationBuilder filteredAgg = getFilteredAggregation(filters, 
ImmutableMap.of(), dateRange); @@ -136,7 +143,7 @@ public class AnalyticsService { } filteredAgg.subAggregation(termAgg); - SearchRequest searchRequest = constructSearchRequest(indexName, filteredAgg); + SearchRequest searchRequest = constructSearchRequest(finalIndexName, filteredAgg); Aggregations aggregationResult = executeAndExtract(searchRequest).getAggregations(); try { @@ -169,10 +176,10 @@ public class AnalyticsService { public List getTopNTableChart(String indexName, Optional dateRange, String groupBy, Map> filters, Optional uniqueOn, int maxRows) { + String finalIndexName = getIndexName(indexName); _logger.debug( - String.format("Invoked getTopNTableChart with indexName: %s, dateRange: %s, groupBy: %s", - indexName, dateRange, groupBy) - + String.format("filters: %s, uniqueOn: %s", filters, uniqueOn)); + String.format("Invoked getTopNTableChart with indexName: %s, dateRange: %s, groupBy: %s", finalIndexName, + dateRange, groupBy) + String.format("filters: %s, uniqueOn: %s", filters, uniqueOn)); AggregationBuilder filteredAgg = getFilteredAggregation(filters, ImmutableMap.of(), dateRange); @@ -183,7 +190,7 @@ public class AnalyticsService { } filteredAgg.subAggregation(termAgg); - SearchRequest searchRequest = constructSearchRequest(indexName, filteredAgg); + SearchRequest searchRequest = constructSearchRequest(finalIndexName, filteredAgg); Aggregations aggregationResult = executeAndExtract(searchRequest).getAggregations(); try { @@ -200,10 +207,14 @@ public class AnalyticsService { public int getHighlights(String indexName, Optional dateRange, Map> filters, Map> mustNotFilters, Optional uniqueOn) { + String finalIndexName = getIndexName(indexName); + _logger.debug(String.format("Invoked getHighlights with indexName: %s, dateRange: %s", finalIndexName, dateRange) + + String.format("filters: %s, uniqueOn: %s", filters, uniqueOn)); + AggregationBuilder filteredAgg = getFilteredAggregation(filters, mustNotFilters, dateRange); uniqueOn.ifPresent(s -> 
filteredAgg.subAggregation(getUniqueQuery(s))); - SearchRequest searchRequest = constructSearchRequest(indexName, filteredAgg); + SearchRequest searchRequest = constructSearchRequest(finalIndexName, filteredAgg); Filter aggregationResult = executeAndExtract(searchRequest); try { if (uniqueOn.isPresent()) { diff --git a/datahub-frontend/app/react/analytics/AnalyticsServiceModule.java b/datahub-frontend/app/react/analytics/AnalyticsServiceModule.java index dbf48f7018..1bdf49043c 100644 --- a/datahub-frontend/app/react/analytics/AnalyticsServiceModule.java +++ b/datahub-frontend/app/react/analytics/AnalyticsServiceModule.java @@ -1,6 +1,7 @@ package react.analytics; import com.google.inject.AbstractModule; +import java.util.Optional; import org.apache.http.ssl.SSLContextBuilder; import play.Environment; import javax.annotation.Nonnull; @@ -14,6 +15,8 @@ import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.security.cert.CertificateException; +import utils.ConfigUtil; + /** * Guice module responsible for configuring & creating an instance of {@link AnalyticsService}. 
@@ -33,6 +36,7 @@ public class AnalyticsServiceModule extends AbstractModule { private static final String ELASTIC_CLIENT_CONNECTION_REQUEST_TIMEOUT_PATH = "analytics.elastic.connectionRequestTimeout"; private static final String ELASTIC_CLIENT_USERNAME_PATH = "analytics.elastic.username"; private static final String ELASTIC_CLIENT_PASSWORD_PATH = "analytics.elastic.password"; + private static final String ELASTIC_INDEX_PREFIX = "analytics.elastic.indexPrefix"; /* Required SSL Config Paths @@ -61,29 +65,32 @@ public class AnalyticsServiceModule extends AbstractModule { @Override protected void configure() { final SSLContext context = createSSLContext(); - final AnalyticsService backend = new AnalyticsService(ElasticClientFactory.createElasticClient( - _configs.hasPath(ELASTIC_CLIENT_USE_SSL_PATH) && _configs.getBoolean(ELASTIC_CLIENT_USE_SSL_PATH), + final AnalyticsService backend = new AnalyticsService( + ElasticClientFactory.createElasticClient( + ConfigUtil.getBoolean(_configs, ELASTIC_CLIENT_USE_SSL_PATH), _configs.getString(ELASTIC_CLIENT_HOST_PATH), _configs.getInt(ELASTIC_CLIENT_PORT_PATH), - _configs.hasPath(ELASTIC_CLIENT_THREAD_COUNT_PATH) ? _configs.getInt(ELASTIC_CLIENT_THREAD_COUNT_PATH) : DEFAULT_THREAD_COUNT, - _configs.hasPath(ELASTIC_CLIENT_CONNECTION_REQUEST_TIMEOUT_PATH) ? _configs.getInt(ELASTIC_CLIENT_CONNECTION_REQUEST_TIMEOUT_PATH) : DEFAULT_CONNECTION_TIMEOUT, - _configs.hasPath(ELASTIC_CLIENT_USERNAME_PATH) ? _configs.getString(ELASTIC_CLIENT_USERNAME_PATH) : null, - _configs.hasPath(ELASTIC_CLIENT_PASSWORD_PATH) ? 
_configs.getString(ELASTIC_CLIENT_PASSWORD_PATH) : null, + ConfigUtil.getInt(_configs, ELASTIC_CLIENT_THREAD_COUNT_PATH, DEFAULT_THREAD_COUNT), + ConfigUtil.getInt(_configs, ELASTIC_CLIENT_CONNECTION_REQUEST_TIMEOUT_PATH, DEFAULT_CONNECTION_TIMEOUT), + ConfigUtil.getString(_configs, ELASTIC_CLIENT_USERNAME_PATH, null), + ConfigUtil.getString(_configs, ELASTIC_CLIENT_PASSWORD_PATH, null), context - )); + ), + Optional.ofNullable(ConfigUtil.getString(_configs, ELASTIC_INDEX_PREFIX, null)) + ); bind(AnalyticsService.class).toInstance(backend); } private SSLContext createSSLContext() { - final String sslProtocol = _configs.hasPath(ELASTIC_CLIENT_SSL_PROTOCOL_PATH) ? _configs.getString(ELASTIC_CLIENT_SSL_PROTOCOL_PATH) : null; - final String sslTrustStoreFile = _configs.hasPath(ELASTIC_CLIENT_SSL_TRUST_STORE_FILE_PATH) ? _configs.getString(ELASTIC_CLIENT_SSL_TRUST_STORE_FILE_PATH) : null; - final String sslTrustStoreType = _configs.hasPath(ELASTIC_CLIENT_SSL_TRUST_STORE_TYPE_PATH) ? _configs.getString(ELASTIC_CLIENT_SSL_TRUST_STORE_TYPE_PATH) : null; - final String sslTrustStorePassword = _configs.hasPath(ELASTIC_CLIENT_SSL_TRUST_STORE_PASSWORD_PATH) ? _configs.getString(ELASTIC_CLIENT_SSL_TRUST_STORE_PASSWORD_PATH): null; - final String sslKeyStoreFile = _configs.hasPath(ELASTIC_CLIENT_SSL_KEY_STORE_FILE_PATH) ? _configs.getString(ELASTIC_CLIENT_SSL_KEY_STORE_FILE_PATH) : null; - final String sslKeyStoreType = _configs.hasPath(ELASTIC_CLIENT_SSL_KEY_STORE_TYPE_PATH) ? _configs.getString(ELASTIC_CLIENT_SSL_KEY_STORE_TYPE_PATH) : null; - final String sslKeyStorePassword = _configs.hasPath(ELASTIC_CLIENT_SSL_KEY_STORE_PASSWORD_PATH) ? _configs.getString(ELASTIC_CLIENT_SSL_KEY_STORE_PASSWORD_PATH) : null; - final String sslSecureRandomImplementation = _configs.hasPath(ELASTIC_CLIENT_SSL_SECURE_RANDOM_IMPL_PATH) ? 
_configs.getString(ELASTIC_CLIENT_SSL_SECURE_RANDOM_IMPL_PATH) : null; + final String sslProtocol = ConfigUtil.getString(_configs, ELASTIC_CLIENT_SSL_PROTOCOL_PATH, null); + final String sslTrustStoreFile = ConfigUtil.getString(_configs, ELASTIC_CLIENT_SSL_TRUST_STORE_FILE_PATH, null); + final String sslTrustStoreType = ConfigUtil.getString(_configs, ELASTIC_CLIENT_SSL_TRUST_STORE_TYPE_PATH, null); + final String sslTrustStorePassword = ConfigUtil.getString(_configs, ELASTIC_CLIENT_SSL_TRUST_STORE_PASSWORD_PATH, null); + final String sslKeyStoreFile = ConfigUtil.getString(_configs, ELASTIC_CLIENT_SSL_KEY_STORE_FILE_PATH, null); + final String sslKeyStoreType = ConfigUtil.getString(_configs, ELASTIC_CLIENT_SSL_KEY_STORE_TYPE_PATH, null); + final String sslKeyStorePassword = ConfigUtil.getString(_configs, ELASTIC_CLIENT_SSL_KEY_STORE_PASSWORD_PATH, null); + final String sslSecureRandomImplementation = ConfigUtil.getString(_configs, ELASTIC_CLIENT_SSL_SECURE_RANDOM_IMPL_PATH, null); final SSLContextBuilder sslContextBuilder = new SSLContextBuilder(); if (sslProtocol != null) { diff --git a/datahub-frontend/app/react/controllers/TrackingController.java b/datahub-frontend/app/react/controllers/TrackingController.java index 5fdf297c30..8d0cae0f38 100644 --- a/datahub-frontend/app/react/controllers/TrackingController.java +++ b/datahub-frontend/app/react/controllers/TrackingController.java @@ -23,7 +23,10 @@ import react.graphql.PlayQueryContext; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Properties; +import utils.ConfigUtil; + public class TrackingController extends Controller { @@ -86,6 +89,11 @@ public class TrackingController extends Controller { _producer.close(); } + private void setConfig(Properties props, String key, String configKey) { + Optional.ofNullable(ConfigUtil.getString(_config, configKey, null)) + .ifPresent(v -> props.put(key, v)); + } + private KafkaProducer 
createKafkaProducer() { final Properties props = new Properties(); props.put(ProducerConfig.CLIENT_ID_CONFIG, "datahub-frontend"); @@ -97,22 +105,22 @@ public class TrackingController extends Controller { if (_config.hasPath(securityProtocolConfig) && KAFKA_SSL_PROTOCOLS.contains(_config.getString(securityProtocolConfig))) { props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, _config.getString(securityProtocolConfig)); - props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, _config.getString("analytics.kafka.ssl.key.password")); + setConfig(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, "analytics.kafka.ssl.key.password"); - props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, _config.getString("analytics.kafka.ssl.keystore.type")); - props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, _config.getString("analytics.kafka.ssl.keystore.location")); - props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, _config.getString("analytics.kafka.ssl.keystore.password")); + setConfig(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "analytics.kafka.ssl.keystore.type"); + setConfig(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "analytics.kafka.ssl.keystore.location"); + setConfig(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "analytics.kafka.ssl.keystore.password"); - props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, _config.getString("analytics.kafka.ssl.truststore.type")); - props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, _config.getString("analytics.kafka.ssl.truststore.location")); - props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, _config.getString("analytics.kafka.ssl.truststore.password")); + setConfig(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "analytics.kafka.ssl.truststore.type"); + setConfig(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "analytics.kafka.ssl.truststore.location"); + setConfig(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "analytics.kafka.ssl.truststore.password"); - props.put(SslConfigs.SSL_PROTOCOL_CONFIG, 
_config.getString("analytics.kafka.ssl.protocol")); - props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, _config.getString("analytics.kafka.ssl.endpoint.identification.algorithm")); + setConfig(props, SslConfigs.SSL_PROTOCOL_CONFIG, "analytics.kafka.ssl.protocol"); + setConfig(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "analytics.kafka.ssl.endpoint.identification.algorithm"); if (_config.getString(securityProtocolConfig).equals(SecurityProtocol.SASL_SSL.name())) { - props.put(SaslConfigs.SASL_MECHANISM, _config.getString("analytics.kafka.sasl.mechanism")); - props.put(SaslConfigs.SASL_JAAS_CONFIG, _config.getString("analytics.kafka.sasl.jaas.config")); + setConfig(props, SaslConfigs.SASL_MECHANISM, "analytics.kafka.sasl.mechanism"); + setConfig(props, SaslConfigs.SASL_JAAS_CONFIG, "analytics.kafka.sasl.jaas.config"); } } diff --git a/datahub-frontend/app/utils/ConfigUtil.java b/datahub-frontend/app/utils/ConfigUtil.java new file mode 100644 index 0000000000..616513224b --- /dev/null +++ b/datahub-frontend/app/utils/ConfigUtil.java @@ -0,0 +1,18 @@ +package utils; + +import com.typesafe.config.Config; + + +public class ConfigUtil { + public static boolean getBoolean(Config config, String key) { + return config.hasPath(key) && config.getBoolean(key); + } + + public static int getInt(Config config, String key, int defaultValue) { + return config.hasPath(key) ? config.getInt(key) : defaultValue; + } + + public static String getString(Config config, String key, String defaultValue) { + return config.hasPath(key) ? 
config.getString(key) : defaultValue; + } +} diff --git a/datahub-frontend/conf/application.conf b/datahub-frontend/conf/application.conf index d7d29f9730..58329ba6ce 100644 --- a/datahub-frontend/conf/application.conf +++ b/datahub-frontend/conf/application.conf @@ -184,4 +184,6 @@ analytics.elastic.sslContext.sslKeyStorePassword = ${?ELASTIC_CLIENT_SSL_KEY_STO # Optional username + password for use with SSL analytics.elastic.username = ${?ELASTIC_CLIENT_USERNAME} -analytics.elastic.password = ${?ELASTIC_CLIENT_PASSWORD} \ No newline at end of file +analytics.elastic.password = ${?ELASTIC_CLIENT_PASSWORD} + +analytics.elastic.indexPrefix = ${?ELASTIC_INDEX_PREFIX} \ No newline at end of file diff --git a/datahub-kubernetes/datahub/charts/datahub-frontend/templates/deployment.yaml b/datahub-kubernetes/datahub/charts/datahub-frontend/templates/deployment.yaml index 6174cd94aa..e3b531f451 100644 --- a/datahub-kubernetes/datahub/charts/datahub-frontend/templates/deployment.yaml +++ b/datahub-kubernetes/datahub/charts/datahub-frontend/templates/deployment.yaml @@ -26,14 +26,20 @@ spec: securityContext: {{- toYaml .Values.podSecurityContext | nindent 8 }} volumes: + {{- with .Values.global.credentialsAndCertsSecrets }} + - name: datahub-certs-dir + secret: + defaultMode: 0444 + secretName: {{ .name }} + {{- end }} + {{- if .Values.exporters.jmx.enabled }} + - name: config-jmx-exporter + configMap: + name: {{ include "datahub-frontend.fullname" . }}-config-jmx-exporter + {{- end }} {{- with .Values.extraVolumes }} {{- toYaml . | nindent 8 }} {{- end }} - {{- if .Values.exporters.jmx.enabled }} - - name: config-jmx-exporter - configMap: - name: {{ include "datahub-frontend.fullname" . }}-config-jmx-exporter - {{- end }} initContainers: {{- with .Values.extraInitContainers }} {{- toYaml . 
| nindent 8 }} @@ -82,8 +88,26 @@ spec: value: "{{ .Values.global.datahub.appVersion }}" - name: DATAHUB_PLAY_MEM_BUFFER_SIZE value: "{{ .Values.datahub.play.mem.buffer.size }}" + - name: DATAHUB_ANALYTICS_ENABLED + value: "{{ .Values.global.datahub_analytics_enabled }}" + {{- if .Values.global.datahub_analytics_enabled }} - name: KAFKA_BOOTSTRAP_SERVER value: "{{ .Values.global.kafka.bootstrap.server }}" + {{- if .Values.global.springKafkaConfigurationOverrides }} + {{- range $configName, $configValue := .Values.global.springKafkaConfigurationOverrides }} + - name: KAFKA_PROPERTIES_{{ $configName | replace "." "_" | upper }} + value: {{ $configValue }} + {{- end }} + {{- end }} + {{- if .Values.global.credentialsAndCertsSecrets }} + {{- range $envVarName, $envVarValue := .Values.global.credentialsAndCertsSecrets.secureEnv }} + - name: KAFKA_PROPERTIES_{{ $envVarName | replace "." "_" | upper }} + valueFrom: + secretKeyRef: + name: {{ $.Values.global.credentialsAndCertsSecrets.name }} + key: {{ $envVarValue }} + {{- end }} + {{- end }} - name: ELASTIC_CLIENT_HOST value: "{{ .Values.global.elasticsearch.host }}" - name: ELASTIC_CLIENT_PORT @@ -101,14 +125,22 @@ spec: name: "{{ .password.secretRef }}" key: "{{ .password.secretKey }}" {{- end }} + {{- if .Values.global.kafka.topics }} + - name: DATAHUB_TRACKING_TOPIC + value: {{ .Values.global.kafka.topics.datahub_usage_event_name}} + {{- else }} - name: DATAHUB_TRACKING_TOPIC value: "DataHubUsageEvent_v1" - - name: DATAHUB_ANALYTICS_ENABLED - value: "{{ .Values.global.datahub_analytics_enabled }}" + {{- end }} + {{- end }} {{- with .Values.extraEnvs }} {{- toYaml . | nindent 12 }} {{- end }} volumeMounts: + {{- with .Values.global.credentialsAndCertsSecrets }} + - name: datahub-certs-dir + mountPath: {{ .path | default "/mnt/certs" }} + {{- end }} {{- with .Values.extraVolumeMounts }} {{- toYaml . 
| nindent 12 }} {{- end }} diff --git a/docker/elasticsearch-setup/create-indices.sh b/docker/elasticsearch-setup/create-indices.sh index 0c7d1ae0ba..c20114c717 100755 --- a/docker/elasticsearch-setup/create-indices.sh +++ b/docker/elasticsearch-setup/create-indices.sh @@ -17,160 +17,57 @@ else ELASTICSEARCH_HOST_URL=$ELASTICSEARCH_USERNAME:$ELASTICSEARCH_PASSWORD@$ELASTICSEARCH_HOST fi -function get_index_name() { - if [[ -z "$INDEX_PREFIX" ]]; then - echo $1 - else - echo "${INDEX_PREFIX}_$1" - fi -} - -function generate_index_file() { - jq -n \ - --slurpfile settings "$1" \ - --slurpfile mappings "$2" \ - '.settings=$settings[0] | .mappings=$mappings[0]' > "$3" -} - -function check_reindex() { - initial_documents=$(curl -XGET "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$1/_count" -H 'Content-Type: application/json' | jq '.count') - for i in $(seq 30); do - echo $i - reindexed_documents=$(curl -XGET "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$2/_count" -H 'Content-Type: application/json' | jq '.count') - if [[ $reindexed_documents == "$initial_documents" ]]; then - echo -e "\nPost-reindex document reconcialiation completed. doc_source_index_count: $initial_documents; doc_target_index_count: $reindexed_documents" - return 0 - else - sleep 3 - fi - done - - echo -e "\nPost-reindex document reconcialiation failed. doc_source_index_count: $initial_documents; doc_target_index_count: $reindexed_documents" - return 1 -} - -function reindex() { - source_index=$1 - target_index="$1_$(date +%s)" - - #create target index with latest index config - curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$target_index" -H 'Content-Type: application/json' --data @/tmp/data - - #reindex the documents in source index to target index. - # One of the assumption here is that we only add properties to document when index-config is evolved. 
- # In case a property is deleted from document, it will still be reindexed in target index as default behaviour and - # it is not breaking the code. If still needs to be purged from target index, use "removed" property in POST data. - curl -XPOST "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_reindex?pretty" -H 'Content-Type: application/json' \ - -d "{\"source\":{\"index\":\"$source_index\"},\"dest\":{\"index\":\"$target_index\"}}" - - if check_reindex "$source_index" "$target_index" - then - #checking if source index is concrete index or alias - if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_alias/$source_index") -eq 404 ] - then - curl -XDELETE "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$source_index" - else - concrete_index_name=$(curl -XGET "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_alias/$source_index" | jq 'keys[]' | head -1 | tr -d \") - curl -XDELETE "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$concrete_index_name" - fi - - curl -XPOST "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_aliases" -H 'Content-Type: application/json' \ - -d "{\"actions\":[{\"remove\":{\"index\":\"*\",\"alias\":\"$source_index\"}},{\"add\":{\"index\":\"$target_index\",\"alias\":\"$source_index\"}}]}" - - echo -e "\nReindexing to $target_index succeded" - return 0 - else - curl -XDELETE "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$target_index" - echo -e "\nReindexing to $target_index failed" - return 1 - fi -} - -function create_index() { - generate_index_file "index/$2" "index/$3" /tmp/data - - #checking if index(or alias) exists - if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$1") -eq 404 ] - then - echo -e '\ncreating index' "$1" - - curl -XPUT 
"$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$1" -H 'Content-Type: application/json' --data @/tmp/data - return 0 - else - echo -e '\ncomparing with existing version of index' "$1" - - setting_keys_regex=$(jq '.index | keys[]' "index/$2" | xargs | sed 's/ /|/g') - curl -XGET "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$1/_settings" | \ - jq '.. | .settings? | select(. != null)' | \ - jq --arg KEYS_REGEX "$setting_keys_regex" '.index | with_entries(select(.key | match($KEYS_REGEX))) | {"index":.}' \ - > /tmp/existing_setting - - curl -XGET "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/$1/_mapping" | \ - jq '.. | .mappings? | select(. != null)' \ - > /tmp/existing_mapping - - generate_index_file /tmp/existing_setting /tmp/existing_mapping /tmp/existing - - jq -S . /tmp/existing > /tmp/existing_sorted - jq -S . /tmp/data > /tmp/data_sorted - if diff /tmp/existing_sorted /tmp/data_sorted - then - echo -e "\nno changes to index $1 mappings and settings" - return 0 - else - echo -e "\nupdating index" "$1" - - reindex "$1" && return 0 || return 1 - fi - fi -} - function create_datahub_usage_event_datastream() { - if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_ilm/policy/datahub_usage_event_policy") -eq 404 ] + if [[ -z "$INDEX_PREFIX" ]]; then + PREFIX='' + else + PREFIX="${INDEX_PREFIX}_" + fi + + if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_ilm/policy/${PREFIX}datahub_usage_event_policy") -eq 404 ] then echo -e "\ncreating datahub_usage_event_policy" - curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_ilm/policy/datahub_usage_event_policy" -H 'Content-Type: application/json' --data @/index/usage-event/policy.json + sed -e "s/PREFIX/${PREFIX}/g" /index/usage-event/policy.json | tee -a /tmp/policy.json + curl -XPUT 
"$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_ilm/policy/${PREFIX}datahub_usage_event_policy" -H 'Content-Type: application/json' --data @/tmp/policy.json else echo -e "\ndatahub_usage_event_policy exists" fi - if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_index_template/datahub_usage_event_index_template") -eq 404 ] + if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_index_template/${PREFIX}datahub_usage_event_index_template") -eq 404 ] then echo -e "\ncreating datahub_usage_event_index_template" - curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_index_template/datahub_usage_event_index_template" -H 'Content-Type: application/json' --data @/index/usage-event/index_template.json + sed -e "s/PREFIX/${PREFIX}/g" /index/usage-event/index_template.json | tee -a /tmp/index_template.json + curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_index_template/${PREFIX}datahub_usage_event_index_template" -H 'Content-Type: application/json' --data @/tmp/index_template.json else echo -e "\ndatahub_usage_event_index_template exists" fi } function create_datahub_usage_event_aws_elasticsearch() { - if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_opendistro/_ism/policies/datahub_usage_event_policy") -eq 404 ] + if [[ -z "$INDEX_PREFIX" ]]; then + PREFIX='' + else + PREFIX="${INDEX_PREFIX}_" + fi + + if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_opendistro/_ism/policies/${PREFIX}datahub_usage_event_policy") -eq 404 ] then echo -e "\ncreating datahub_usage_event_policy" - curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_opendistro/_ism/policies/datahub_usage_event_policy" -H 
'Content-Type: application/json' --data @/index/usage-event/aws_es_ism_policy.json + sed -e "s/PREFIX/${PREFIX}/g" /index/usage-event/aws_es_ism_policy.json | tee -a /tmp/aws_es_ism_policy.json + curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_opendistro/_ism/policies/${PREFIX}datahub_usage_event_policy" -H 'Content-Type: application/json' --data @/tmp/aws_es_ism_policy.json else echo -e "\ndatahub_usage_event_policy exists" fi - if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_template/datahub_usage_event_index_template") -eq 404 ] + if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_template/${PREFIX}datahub_usage_event_index_template") -eq 404 ] then - echo -e "\ncreating datahub_usage_event_index_template" - curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_template/datahub_usage_event_index_template" -H 'Content-Type: application/json' --data @/index/usage-event/aws_es_index_template.json - curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/datahub_usage_event-000001" -H 'Content-Type: application/json' --data "{\"aliases\":{\"datahub_usage_event\":{\"is_write_index\":true}}}" + echo -e "\ncreating datahub_usage_event_index_template" + sed -e "s/PREFIX/${PREFIX}/g" /index/usage-event/aws_es_index_template.json | tee -a /tmp/aws_es_index_template.json + curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_template/${PREFIX}datahub_usage_event_index_template" -H 'Content-Type: application/json' --data @/tmp/aws_es_index_template.json + curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/${PREFIX}datahub_usage_event-000001" -H 'Content-Type: application/json' --data "{\"aliases\":{\"${PREFIX}datahub_usage_event\":{\"is_write_index\":true}}}" else echo -e 
"\ndatahub_usage_event_index_template exists" fi } -create_index $(get_index_name chartdocument) chart/settings.json chart/mappings.json || exit 1 -create_index $(get_index_name corpuserinfodocument) corp-user/settings.json corp-user/mappings.json || exit 1 -create_index $(get_index_name dashboarddocument) dashboard/settings.json dashboard/mappings.json || exit 1 -create_index $(get_index_name datajobdocument) datajob/settings.json datajob/mappings.json || exit 1 -create_index $(get_index_name dataflowdocument) dataflow/settings.json dataflow/mappings.json || exit 1 -create_index $(get_index_name dataprocessdocument) data-process/settings.json data-process/mappings.json || exit 1 -create_index $(get_index_name datasetdocument) dataset/settings.json dataset/mappings.json || exit 1 -create_index $(get_index_name mlmodeldocument) ml-model/settings.json ml-model/mappings.json || exit 1 -create_index $(get_index_name tagdocument) tags/settings.json tags/mappings.json || exit 1 -create_index $(get_index_name glossaryterminfodocument) glossary/term/settings.json glossary/term/mappings.json || exit 1 -create_index $(get_index_name glossarynodeinfodocument) glossary/node/settings.json glossary/node/mappings.json || exit 1 if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then if [[ $USE_AWS_ELASTICSEARCH == false ]]; then create_datahub_usage_event_datastream || exit 1 diff --git a/gms/impl/src/main/resources/index/usage-event/aws_es_index_template.json b/gms/impl/src/main/resources/index/usage-event/aws_es_index_template.json index 6bf5e01253..21e98e4e96 100644 --- a/gms/impl/src/main/resources/index/usage-event/aws_es_index_template.json +++ b/gms/impl/src/main/resources/index/usage-event/aws_es_index_template.json @@ -1,5 +1,5 @@ { - "index_patterns": ["datahub_usage_event-*"], + "index_patterns": ["PREFIXdatahub_usage_event-*"], "mappings": { "properties": { "@timestamp": { @@ -20,6 +20,6 @@ } }, "settings": { - "index.opendistro.index_state_management.rollover_alias": 
"datahub_usage_event" + "index.opendistro.index_state_management.rollover_alias": "PREFIXdatahub_usage_event" } } \ No newline at end of file diff --git a/gms/impl/src/main/resources/index/usage-event/aws_es_ism_policy.json b/gms/impl/src/main/resources/index/usage-event/aws_es_ism_policy.json index 913b16cb48..2bfb5db1ec 100644 --- a/gms/impl/src/main/resources/index/usage-event/aws_es_ism_policy.json +++ b/gms/impl/src/main/resources/index/usage-event/aws_es_ism_policy.json @@ -1,6 +1,6 @@ { "policy": { - "policy_id": "datahub_usage_event_policy", + "policy_id": "PREFIXdatahub_usage_event_policy", "default_state": "Rollover", "schema_version": 1, "states": [ @@ -50,7 +50,7 @@ ], "ism_template": { "index_patterns": [ - "datahub_usage_event-*" + "PREFIXdatahub_usage_event-*" ], "priority": 100 } diff --git a/gms/impl/src/main/resources/index/usage-event/index_template.json b/gms/impl/src/main/resources/index/usage-event/index_template.json index dce67e1b7b..44f6e64471 100644 --- a/gms/impl/src/main/resources/index/usage-event/index_template.json +++ b/gms/impl/src/main/resources/index/usage-event/index_template.json @@ -1,5 +1,5 @@ { - "index_patterns": ["*datahub_usage_event*"], + "index_patterns": ["*PREFIXdatahub_usage_event*"], "data_stream": { }, "priority": 500, "template": { @@ -23,7 +23,7 @@ } }, "settings": { - "index.lifecycle.name": "datahub_usage_event_policy" + "index.lifecycle.name": "PREFIXdatahub_usage_event_policy" } } } \ No newline at end of file