From 54237ff5ca448e99cd8a1687ab02faaaeecf3c57 Mon Sep 17 00:00:00 2001 From: na zhang Date: Wed, 25 Oct 2017 11:27:57 -0700 Subject: [PATCH] support elasticsearch auto re-index with zero downtime via alias switch --- wherehows-backend/build.gradle | 3 + .../jobs/templates/ELASTIC_SEARCH_ETL.job | 6 +- .../main/java/wherehows/common/Constant.java | 2 + .../ELASTICSEARCH/index_mapping.json | 344 +++++++++--------- .../resources/jython/ElasticSearchIndex.py | 112 +++++- 5 files changed, 292 insertions(+), 175 deletions(-) diff --git a/wherehows-backend/build.gradle b/wherehows-backend/build.gradle index ffa5edd173..08133663cc 100644 --- a/wherehows-backend/build.gradle +++ b/wherehows-backend/build.gradle @@ -26,6 +26,9 @@ model { from("jobs") { into "jobs" } + from("../wherehows-data-model/ELASTICSEARCH") { + into "ELASTICSEARCH" + } } } } diff --git a/wherehows-backend/jobs/templates/ELASTIC_SEARCH_ETL.job b/wherehows-backend/jobs/templates/ELASTIC_SEARCH_ETL.job index 3c67d7002a..2f76075adc 100644 --- a/wherehows-backend/jobs/templates/ELASTIC_SEARCH_ETL.job +++ b/wherehows-backend/jobs/templates/ELASTIC_SEARCH_ETL.job @@ -1,6 +1,6 @@ # Push metadata to Elastic search server job.class=metadata.etl.JythonEtlJob -job.cron.expr=0 0 0/6 1/1 * ? * +job.cron.expr=0 0 0/12 1/1 * ? * job.timeout=10000 #job.cmd.params= job.disabled=1 @@ -12,3 +12,7 @@ job.jython.load=jython/ElasticSearchIndex.py elasticsearch.url=your_es_url elasticsearch.port=your_es_port elasticsearch.index=your_es_index +elasticsearch.bulk.insert.size=your_bulk_insert_chunk_size +elasticsearch.url.request.timeout=your_es_url_request_timeout +wh.db.max.retry.times=your_db_max_retry_times +wh.elasticsearch.index.mapping.file=your_index_mapping_file diff --git a/wherehows-common/src/main/java/wherehows/common/Constant.java b/wherehows-common/src/main/java/wherehows/common/Constant.java index e7257535aa..ebf69d5403 100644 --- a/wherehows-common/src/main/java/wherehows/common/Constant.java +++ b/wherehows-common/src/main/java/wherehows/common/Constant.java @@ -282,4 +282,6 @@ public class Constant { public static final String ELASTICSEARCH_BULK_INSERT_SIZE = "elasticsearch.bulk.insert.size"; public static final String ELASTICSEARCH_URL_REQUEST_TIMEOUT = "elasticsearch.url.request.timeout"; public static final String WH_DB_MAX_RETRY_TIMES = "wh.db.max.retry.times"; + public static final String WH_ELASTICSEARCH_INDEX_MAPPING_FILE = "wh.elasticsearch.index.mapping.file"; + } diff --git a/wherehows-data-model/ELASTICSEARCH/index_mapping.json b/wherehows-data-model/ELASTICSEARCH/index_mapping.json index fd862af762..6ae2f07981 100644 --- a/wherehows-data-model/ELASTICSEARCH/index_mapping.json +++ b/wherehows-data-model/ELASTICSEARCH/index_mapping.json @@ -13,92 +13,43 @@ } }, "mappings": { - "metric": { + "dataset": { "properties": { - "dashboard_name": { - "type": "string" + "fields": { + "type": "text" }, - "dimensions": { - "type": "string" + "location_prefix": { + "type": "text" }, - "metric_additive_type": { - "type": "string" + "name": { + "type": "text", + "analyzer": "keyword_analyzer" }, - "metric_category": { - "type": "string" - }, - "metric_description": { - "type": "string" - }, - "metric_display_factor": { - "type": "double" - }, - "metric_display_factor_sym": { - "type": "string" - }, - "metric_formula": { - "type": "string" - }, - "metric_good_direction": { - "type": "string" - }, - "metric_grain": { - "type": "string" - }, - "metric_group": { - "type": "string" - }, - "metric_id": { - "type": "long" - }, - "metric_level": { - "type": "string" - }, - "metric_name": { - "type": "string" - }, - "metric_name_suggest": { + "name_suggest": { "type": "completion", "analyzer": "standard" }, - "metric_ref_id": { - "type": "string" + "parent_name": { + "type": "text" }, - "metric_ref_id_type": { - "type": "string" + "properties": { + "type": "text" }, - "metric_source": { - "type": "string" + "schema": { + "type": "text" }, - "metric_source_dataset_id": { + "schema_type": { + "type": "text" + }, + "source": { + "type": "text" + }, + "static_boosting_score": { "type": "long" }, - "metric_source_type": { - "type": "string" - }, - "metric_sub_category": { - "type": "string" - }, - "metric_type": { - "type": "string" - }, - "metric_url": { - "type": "string" - }, - "owners": { - "type": "string" - }, - "scm_url": { - "type": "string" - }, - "tags": { - "type": "string" - }, - "metric_urn": { - "type": "string" - }, - "wiki_url": { - "type": "string" + "urn": { + "type": "text", + "analyzer": "keyword_analyzer" } } }, @@ -111,56 +62,99 @@ }, "properties": { "comment_type": { - "type": "string" + "type": "text" }, "dataset_id": { "type": "long" }, "text": { - "type": "string" + "type": "text" }, "user_id": { "type": "long" } } }, - "dataset": { + "flow_jobs": { "properties": { - "fields": { - "type": "string" + "app_code": { + "type": "text" }, - "location_prefix": { - "type": "string" + "app_id": { + "type": "long" }, - "name": { - "type": "string", - "analyzer": "keyword_analyzer" + "flow_group": { + "type": "text" }, - "name_suggest": { + "flow_id": { + "type": "long" + }, + "flow_level": { + "type": "long" + }, + "flow_name": { + "type": "text" + }, + "flow_name_suggest": { "type": "completion", "analyzer": "standard" }, - "parent_name": { - "type": "string" + "flow_path": { + "type": "text" }, - "properties": { - "type": "string" + "is_active": { + "type": "text" }, - "schema": { - "type": "string" + "is_scheduled": { + "type": "text" }, - "schema_type": { - "type": "string" + "jobs": { + "type": "nested", + "properties": { + "app_id": { + "type": "short" + }, + "flow_id": { + "type": "long" + }, + "is_current": { + "type": "text" + }, + "is_first": { + "type": "text" + }, + "is_last": { + "type": "text" + }, + "job_id": { + "type": "long" + }, + "job_name": { + "type": "text" + }, + "job_name_suggest": { + "type": "completion", + "analyzer": "standard" + }, + "job_path": { + "type": "text" + }, + "job_type": { + "type": "text" + }, + "job_type_id": { + "type": "short" + }, + "post_jobs": { + "type": "text" + }, + "pre_jobs": { + "type": "text" + } + } }, - "source": { - "type": "string" - }, - "static_boosting_score": { - "type": "long" - }, - "urn": { - "type": "string", - "analyzer": "keyword_analyzer" + "pre_flows": { + "type": "text" } } }, @@ -173,102 +167,108 @@ }, "properties": { "comments": { - "type": "string" + "type": "text" }, "dataset_id": { "type": "long" }, "field_name": { - "type": "string" + "type": "text" }, "parent_path": { - "type": "string" + "type": "text" }, "sort_id": { "type": "long" } } }, - "flow_jobs": { + "metric": { "properties": { - "app_code": { - "type": "string" + "dashboard_name": { + "type": "text" }, - "app_id": { + "dimensions": { + "type": "text" + }, + "metric_additive_type": { + "type": "text" + }, + "metric_category": { + "type": "text" + }, + "metric_description": { + "type": "text" + }, + "metric_display_factor": { + "type": "double" + }, + "metric_display_factor_sym": { + "type": "text" + }, + "metric_formula": { + "type": "text" + }, + "metric_good_direction": { + "type": "text" + }, + "metric_grain": { + "type": "text" + }, + "metric_group": { + "type": "text" + }, + "metric_id": { "type": "long" }, - "flow_group": { - "type": "string" + "metric_level": { + "type": "text" }, - "flow_id": { - "type": "long" + "metric_name": { + "type": "text" }, - "flow_level": { - "type": "long" - }, - "flow_name": { - "type": "string" - }, - "flow_name_suggest": { + "metric_name_suggest": { "type": "completion", "analyzer": "standard" }, - "flow_path": { - "type": "string" + "metric_ref_id": { + "type": "text" }, - "is_active": { - "type": "string" + "metric_ref_id_type": { + "type": "text" }, - "is_scheduled": { - "type": "string" + "metric_source": { + "type": "text" }, - "jobs": { - "type": "nested", - "properties": { - "app_id": { - "type": "short" - }, - "flow_id": { - "type": "long" - }, - "is_current": { - "type": "string" - }, - "is_first": { - "type": "string" - }, - "is_last": { - "type": "string" - }, - "job_id": { - "type": "long" - }, - "job_name": { - "type": "string" - }, - "job_name_suggest": { - "type": "completion", - "analyzer": "standard" - }, - "job_path": { - "type": "string" - }, - "job_type": { - "type": "string" - }, - "job_type_id": { - "type": "short" - }, - "post_jobs": { - "type": "string" - }, - "pre_jobs": { - "type": "string" - } - } + "metric_source_dataset_id": { + "type": "long" }, - "pre_flows": { - "type": "string" + "metric_source_type": { + "type": "text" + }, + "metric_sub_category": { + "type": "text" + }, + "metric_type": { + "type": "text" + }, + "metric_url": { + "type": "text" + }, + "metric_urn": { + "type": "text" + }, + "owners": { + "type": "text" + }, + "scm_url": { + "type": "text" + }, + "tags": { + "type": "text" + }, + "wiki_url": { + "type": "text" } } } diff --git a/wherehows-etl/src/main/resources/jython/ElasticSearchIndex.py b/wherehows-etl/src/main/resources/jython/ElasticSearchIndex.py index e3888710ff..460ee5262d 100644 --- a/wherehows-etl/src/main/resources/jython/ElasticSearchIndex.py +++ b/wherehows-etl/src/main/resources/jython/ElasticSearchIndex.py @@ -19,7 +19,6 @@ import sys, json import urllib2 import time - class ElasticSearchIndex(): def __init__(self, args): self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__) @@ -31,10 +30,19 @@ class ElasticSearchIndex(): else: self.elasticsearch_index = args[Constant.ELASTICSEARCH_INDEX_KEY] + self.index_mapping_file = args[Constant.WH_ELASTICSEARCH_INDEX_MAPPING_FILE] + self.bulk_chunk_size = int(args[Constant.ELASTICSEARCH_BULK_INSERT_SIZE]) # bulk insert size to elastic search engine self.es_url_request_timeout = int(args[Constant.ELASTICSEARCH_URL_REQUEST_TIMEOUT]) # url to post data to elastic search engine request time out self.max_retry_times = int(args[Constant.WH_DB_MAX_RETRY_TIMES]) # max times for db re-connection when lost during fetching source data + + self.base_url = self.elasticsearch_index_url + ':' + str(self.elasticsearch_port) + '/' + self.logger.info(self.base_url) + + self.old_index = [] + self.new_index = [] + self.databaseConnect(args) def databaseConnect(self, args): @@ -447,7 +455,7 @@ class ElasticSearchIndex(): self.wh_cursor.execute(cmd) self.wh_con.commit() - def run(self): + def es_reindex(self): try: start_dataset_time = time.time() @@ -485,6 +493,106 @@ class ElasticSearchIndex(): self.wh_con.close() + def es_http_request(self, method, url, payload): + try: + req = urllib2.Request(url=url) + req.add_header('Content-type', 'application/json') + req.get_method = lambda: method + req.add_data(payload) + + self.logger.info("Request sent to ES is: " + url + ' ' + payload) + response = urllib2.urlopen(req, timeout=self.es_url_request_timeout) + data = json.load(response) + return data + + except urllib2.HTTPError as e: + self.logger.error(str(e.code)) + self.logger.error(e.read()) + except Exception as e: + self.logger.error(str(e)) + + def create_index(self): + now = int(time.time()) + url = self.base_url + str(now) + + json_filepath = self.index_mapping_file + with open(json_filepath, 'r') as f: + req_body = json.load(f) + + data = self.es_http_request("PUT", url, json.dumps(req_body)) + if str(data['acknowledged']) != 'True': + self.logger.error(str(data)) + + self.new_index = str(now) + self.logger.info('Successfully created index : {}'.format(self.new_index)) + + def alias_switch(self): + url = self.base_url + '_aliases' + + remove = '{"remove":{"index": "%s","alias":"%s"}}' % (self.old_index, self.elasticsearch_index) + add = '{"add":{"index":"%s","alias":"%s"}}' % (self.new_index, self.elasticsearch_index) + req_body = '{"actions": [%s, %s]}' % (remove, add) + + data = self.es_http_request("POST", url, req_body) + + if str(data['acknowledged']) != 'True': + self.logger.error(str(data)) + + self.logger.info('Successfully switched alias from {} to {}'.format(self.old_index, self.new_index)) + + def create_alias(self, index): + # create a new alias + url = self.base_url + '_aliases' + req_body = '{"actions":[{"add":{"index":"%s","alias":"%s"}}]}' %(index, self.elasticsearch_index) + data = self.es_http_request("POST", url, req_body) + + if str(data['acknowledged']) != 'True': + self.logger.error(str(data)) + self.logger.info('Successfully create alias for: {}'.format(self.new_index)) + + def get_old_index(self): + # get existing index the current alias points to + url = self.base_url + self.elasticsearch_index + "/_alias" + try: + data = self.es_http_request("GET", url, '') + for key in data.keys(): + self.old_index = key + break + self.logger.info('Successfully find old index: {}'.format(self.old_index)) + except: + self.logger.info('Need to create alias for the first time') + + + def remove_old_index(self): + url = self.base_url + self.old_index + data = self.es_http_request("DELETE", url, '') + + if str(data['acknowledged']) != 'True': + self.logger.error(str(data)) + self.logger.info('Successfully removed index: {}'.format(self.old_index)) + + + def run(self): + try: + # 1, create a new index with current timestamp, using json body from model + self.create_index() + # 2, populate data + self.es_reindex() + # 3, get old index and remember it + self.get_old_index() + + if len(self.old_index) > 0: + # 4, remove existing alias, and add alias to newly created index + self.alias_switch() + # 5, remove old timestamp index + self.remove_old_index() + else: + # 6, first time add self.elasticsearch_index alias + self.create_alias(self.new_index) + except Exception as e: + self.logger.error(str(e)) + + if __name__ == "__main__": args = sys.argv[1] esi = ElasticSearchIndex(args)