diff --git a/data-model/ELASTICSEARCH/index_mapping.json b/data-model/ELASTICSEARCH/index_mapping.json
new file mode 100644
index 0000000000..3c129cb4a1
--- /dev/null
+++ b/data-model/ELASTICSEARCH/index_mapping.json
@@ -0,0 +1,245 @@
+{
+  "mappings": {
+    "metric": {
+      "properties": {
+        "dashboard_name": {
+          "type": "string"
+        },
+        "dimensions": {
+          "type": "string"
+        },
+        "metric_additive_type": {
+          "type": "string"
+        },
+        "metric_category": {
+          "type": "string"
+        },
+        "metric_description": {
+          "type": "string"
+        },
+        "metric_display_factor": {
+          "type": "double"
+        },
+        "metric_display_factor_sym": {
+          "type": "string"
+        },
+        "metric_formula": {
+          "type": "string"
+        },
+        "metric_good_direction": {
+          "type": "string"
+        },
+        "metric_grain": {
+          "type": "string"
+        },
+        "metric_group": {
+          "type": "string"
+        },
+        "metric_id": {
+          "type": "long"
+        },
+        "metric_level": {
+          "type": "string"
+        },
+        "metric_name": {
+          "type": "string"
+        },
+        "metric_ref_id": {
+          "type": "string"
+        },
+        "metric_ref_id_type": {
+          "type": "string"
+        },
+        "metric_source": {
+          "type": "string"
+        },
+        "metric_source_dataset_id": {
+          "type": "long"
+        },
+        "metric_source_type": {
+          "type": "string"
+        },
+        "metric_sub_category": {
+          "type": "string"
+        },
+        "metric_type": {
+          "type": "string"
+        },
+        "metric_url": {
+          "type": "string"
+        },
+        "owners": {
+          "type": "string"
+        },
+        "scm_url": {
+          "type": "string"
+        },
+        "tags": {
+          "type": "string"
+        },
+        "urn": {
+          "type": "string"
+        },
+        "wiki_url": {
+          "type": "string"
+        }
+      }
+    },
+    "field": {
+      "_parent": {
+        "type": "dataset"
+      },
+      "_routing": {
+        "required": true
+      },
+      "properties": {
+        "comments": {
+          "type": "string"
+        },
+        "dataset_id": {
+          "type": "long"
+        },
+        "field_name": {
+          "type": "string"
+        },
+        "parent_path": {
+          "type": "string"
+        },
+        "sort_id": {
+          "type": "long"
+        }
+      }
+    },
+    "comment": {
+      "_parent": {
+        "type": "dataset"
+      },
+      "_routing": {
+        "required": true
+      },
+      "properties": {
+        "comment_type": {
+          "type": "string"
+        },
+        "dataset_id": {
+          "type": "long"
+        },
+        "text": {
+          "type": "string"
+        },
+        "user_id": {
+          "type": "long"
+        }
+      }
+    },
+    "dataset": {
+      "properties": {
+        "fields": {
+          "type": "string"
+        },
+        "location_prefix": {
+          "type": "string"
+        },
+        "name": {
+          "type": "string"
+        },
+        "parent_name": {
+          "type": "string"
+        },
+        "properties": {
+          "type": "string"
+        },
+        "schema": {
+          "type": "string"
+        },
+        "schema_type": {
+          "type": "string"
+        },
+        "source": {
+          "type": "string"
+        },
+        "static_boosting_score": {
+          "type": "long"
+        },
+        "urn": {
+          "type": "string"
+        }
+      }
+    },
+    "flow_jobs": {
+      "properties": {
+        "app_code": {
+          "type": "string"
+        },
+        "app_id": {
+          "type": "long"
+        },
+        "flow_group": {
+          "type": "string"
+        },
+        "flow_id": {
+          "type": "long"
+        },
+        "flow_level": {
+          "type": "long"
+        },
+        "flow_name": {
+          "type": "string"
+        },
+        "flow_path": {
+          "type": "string"
+        },
+        "is_active": {
+          "type": "string"
+        },
+        "is_scheduled": {
+          "type": "string"
+        },
+        "jobs": {
+          "type": "nested",
+          "properties": {
+            "app_id": {
+              "type": "short"
+            },
+            "flow_id": {
+              "type": "long"
+            },
+            "is_current": {
+              "type": "string"
+            },
+            "is_first": {
+              "type": "string"
+            },
+            "is_last": {
+              "type": "string"
+            },
+            "job_id": {
+              "type": "long"
+            },
+            "job_name": {
+              "type": "string"
+            },
+            "job_path": {
+              "type": "string"
+            },
+            "job_type": {
+              "type": "string"
+            },
+            "job_type_id": {
+              "type": "short"
+            },
+            "post_jobs": {
+              "type": "string"
+            },
+            "pre_jobs": {
+              "type": "string"
+            }
+          }
+        },
+        "pre_flows": {
+          "type": "string"
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
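In the mapping above, `field` and `comment` are declared as children of `dataset` (`_parent`) with required routing, so every index action for those types must carry the parent dataset id. A minimal sketch of the bulk action/source pair this implies; the ids and values here are made up for illustration:

```python
# Illustrative only: the shape of a _bulk pair for the child type "field".
# The "parent" value routes the child doc to the same shard as its dataset.
import json

action = {"index": {"_id": 123, "parent": 456}}  # 456 = id of the parent dataset doc
source = {"dataset_id": 456, "sort_id": 0, "field_name": "member_id", "parent_path": ""}

# The bulk API expects one JSON object per line, newline-terminated.
bulk_body = json.dumps(action) + '\n' + json.dumps(source) + '\n'
print(bulk_body)
```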
diff --git a/data-model/ELASTICSEARCH/index_mappings.md b/data-model/ELASTICSEARCH/index_mappings.md
index 63d575f5e5..b0df1d0e62 100644
--- a/data-model/ELASTICSEARCH/index_mappings.md
+++ b/data-model/ELASTICSEARCH/index_mappings.md
@@ -1,43 +1,46 @@
-curl -XPUT '$YOUR_INDEX_URL:9200/wherehows' -d '
-{
-  "mappings": {
-    "dataset": {},
-    "comment": {
-      "_parent": {
-        "type": "dataset"
-      }
-    },
-    "field": {
-      "_parent": {
-        "type": "dataset"
-      }
-    }
-  }
-}
-'
-curl -XPUT '$YOUR_INDEX_URL:9200/wherehows/flow_jobs/_mapping' -d '
+https://www.elastic.co/guide/index.html
+
+```
+export ELASTICSEARCH_SERVER_URL=http://localhost:9200
+```
+
+create the index and put the mappings (double quotes so the shell expands the variable)
+```
+curl -XPUT "$ELASTICSEARCH_SERVER_URL/wherehows_v1" --data @index_mapping.json
+```
+
+create an index alias
+Aliases let us keep operating Elasticsearch without a huge operational burden:
+an alias can be switched transparently from one index to another on a running cluster.
+Here wherehows is the alias over the versioned indexes; a newer index such as wherehows_v2
+can be created and populated, and then the wherehows alias is repointed at it, so the
+alias serves as the stable public interface to the index.
+```
+curl -XPUT "$ELASTICSEARCH_SERVER_URL/_aliases" -d '
 {
-  "flow_jobs": {
-    "properties": {
-      "jobs": {
-        "type": "nested",
-        "properties": {
-          "job_name": { "type": "string" },
-          "job_path": { "type": "string" },
-          "job_type": { "type": "string" },
-          "pre_jobs": { "type": "string" },
-          "post_jobs": { "type": "string" },
-          "is_current": { "type": "string" },
-          "is_first": { "type": "string" },
-          "is_last": { "type": "string" },
-          "job_type_id": { "type": "short" },
-          "app_id": { "type": "short" },
-          "flow_id": { "type": "long" },
-          "job_id": { "type": "long" }
-        }
+  "actions": [
+    {
+      "add": {
+        "index": "wherehows_v1",
+        "alias": "wherehows"
       }
     }
-  }
-}
-'
\ No newline at end of file
+  ]
+}'
+```
+
+query the index/type mappings
+```
+$ELASTICSEARCH_SERVER_URL/wherehows/_mapping/dataset
+$ELASTICSEARCH_SERVER_URL/wherehows/_mapping/comment
+$ELASTICSEARCH_SERVER_URL/wherehows/_mapping/flow_jobs
+$ELASTICSEARCH_SERVER_URL/wherehows/_mapping/field
+$ELASTICSEARCH_SERVER_URL/wherehows/_mapping/metric
+
+$ELASTICSEARCH_SERVER_URL/wherehows/_mapping/
+```
+
+delete an index (target the concrete index, not the alias)
+```
+curl -XDELETE "$ELASTICSEARCH_SERVER_URL/wherehows_v1"
+```
\ No newline at end of file
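The rebuild-and-repoint flow the README describes can be done in one `_aliases` call, which Elasticsearch applies atomically, so readers never observe a moment with no index behind the alias. A minimal sketch using `urllib2` (the same client the ETL script uses; the helper name and index names are illustrative):

```python
import json
import urllib2

def swap_alias(es_url, alias, old_index, new_index):
    # Remove and add in a single _aliases request so the switch is atomic.
    body = json.dumps({
        "actions": [
            {"remove": {"index": old_index, "alias": alias}},
            {"add": {"index": new_index, "alias": alias}}
        ]
    })
    req = urllib2.Request(es_url + '/_aliases', body,
                          {'Content-type': 'application/json'})
    # urllib2 issues a POST when data is supplied, which _aliases accepts.
    return json.load(urllib2.urlopen(req))

# swap_alias('http://localhost:9200', 'wherehows', 'wherehows_v1', 'wherehows_v2')
```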
diff --git a/metadata-etl/src/main/resources/jython/ElasticSearchIndex.py b/metadata-etl/src/main/resources/jython/ElasticSearchIndex.py
index 497daa39f7..516ff6ef13 100644
--- a/metadata-etl/src/main/resources/jython/ElasticSearchIndex.py
+++ b/metadata-etl/src/main/resources/jython/ElasticSearchIndex.py
@@ -27,6 +27,13 @@ class ElasticSearchIndex():
     self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
     self.elasticsearch_index_url = args[Constant.WH_ELASTICSEARCH_URL_KEY]
     self.elasticsearch_port = args[Constant.WH_ELASTICSEARCH_PORT_KEY]
+
+    if Constant.WH_ELASTICSEARCH_INDEX_KEY not in args:
+      self.elasticsearch_index = "wherehows"
+    else:
+      self.elasticsearch_index = args[Constant.WH_ELASTICSEARCH_INDEX_KEY]
+
+
     self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
                                  args[Constant.WH_DB_USERNAME_KEY],
                                  args[Constant.WH_DB_PASSWORD_KEY],
@@ -39,6 +46,7 @@ class ElasticSearchIndex():
     req.add_header('Content-type', 'application/json')
     req.get_method = lambda: "PUT"
     req.add_data('\n'.join(params) + '\n')
+    self.logger.info(url)
     response = urllib2.urlopen(req)
     data = json.load(response)
     if str(data['errors']) != 'False':
@@ -61,7 +69,7 @@ class ElasticSearchIndex():
       SELECT d.field_id, d.dataset_id, f.comment FROM dict_dataset_field_comment d
       LEFT JOIN field_comments f ON d.comment_id = f.id
       WHERE d.field_id = %d
       """
-    url = self.elasticsearch_index_url + ':' + str(self.elasticsearch_port) + '/wherehows/field/_bulk'
+    url = self.elasticsearch_index_url + ':' + str(self.elasticsearch_port) + '/' + self.elasticsearch_index + '/field/_bulk'
     params = []
     self.wh_cursor.execute(sql)
     comment_cursor = self.wh_con.cursor(1)
@@ -80,17 +88,15 @@ class ElasticSearchIndex():
         comment_result = comment_cursor.fetchone()
 
       params.append('{ "index": { "_id": ' + str(row['field_id']) + ', "parent": ' + str(row['dataset_id']) + ' }}')
-      if len(comments) > 0:
-        params.append(
-          """{ "comments": %s, "dataset_id": %d, "sort_id": %d, "field_name": "%s", "parent_path": "%s"}"""
-          % (json.dumps(comments) if comments else '', row['dataset_id'] if row['dataset_id'] else 0,
-             row['sort_id'] if row['sort_id'] else 0,
-             row['field_name'] if row['field_name'] else '', row['parent_path'] if row['parent_path'] else ''))
-      else:
-        params.append(
-          """{ "comments": "", "dataset_id": %d, "sort_id": %d, "field_name": "%s", "parent_path": "%s"}"""
-          % (row['dataset_id'] if row['dataset_id'] else 0, row['sort_id'] if row['sort_id'] else 0,
-             row['field_name'] if row['field_name'] else '', row['parent_path'] if row['parent_path'] else ''))
+      comments_detail = {
+        'comments': comments,
+        'dataset_id': row['dataset_id'],
+        'sort_id': row['sort_id'],
+        'field_name': row['field_name'],
+        'parent_path': row['parent_path']
+      }
+      params.append(json.dumps(comments_detail))
+
       if row_count % 1000 == 0:
         self.bulk_insert(params, url)
         self.logger.info('dataset field ' + str(row_count))
@@ -113,7 +119,8 @@ class ElasticSearchIndex():
       SELECT * FROM comments
       """
-    url = self.elasticsearch_index_url + ':' + str(self.elasticsearch_port) + '/wherehows/comment/_bulk'
+    url = self.elasticsearch_index_url + ':' + str(self.elasticsearch_port) + '/' + self.elasticsearch_index + '/comment/_bulk'
+
     params = []
     self.wh_cursor.execute(sql)
     row_count = 1
@@ -122,10 +129,15 @@
     while result:
       row = dict(zip(description, result))
       params.append('{ "index": { "_id": ' + str(row['id']) + ', "parent": ' + str(row['dataset_id']) + ' }}')
-      params.append(
-        """{ "text": %s, "user_id": %d, "dataset_id": %d, "comment_type": "%s"}"""
-        % (json.dumps(row['text']) if row['text'] else '', row['user_id'] if row['user_id'] else 0,
-           row['dataset_id'] if row['dataset_id'] else 0, row['comment_type'] if row['comment_type'] else ''))
+
+      text_detail = {
+        'text': row['text'],
+        'user_id': row['user_id'],
+        'dataset_id': row['dataset_id'],
+        'comment_type': row['comment_type']
+      }
+      params.append(json.dumps(text_detail))
+
       if row_count % 1000 == 0:
         self.bulk_insert(params, url)
         self.logger.info('comment ' + str(row_count))
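The two hunks above drop the manual NULL substitution (`row['x'] if row['x'] else ...`) and hand whole dicts to `json.dumps`, which serializes Python `None` as JSON `null`; the string and numeric fields in the mapping accept `null`. A quick illustration of the behavior the refactor relies on:

```python
import json

# A comment row with NULL text and comment_type, as the cursor would return it.
row = {'text': None, 'user_id': 42, 'dataset_id': 7, 'comment_type': None}
print(json.dumps(row, sort_keys=True))
# {"comment_type": null, "dataset_id": 7, "text": null, "user_id": 42}
```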
@@ -189,11 +201,13 @@ class ElasticSearchIndex():
       ON DUPLICATE KEY UPDATE
       static_boosting_score = 65;
 
-      SELECT * FROM dict_dataset d
-      JOIN cfg_search_score_boost s
-      WHERE d.id = s.id
-      and urn not like "hive:///dev_foundation_tables%"
-      and urn not like "hive:///dev_foundation_views%"
+      SELECT d.*,
+      COALESCE(s.static_boosting_score,0) as static_boosting_score
+      FROM dict_dataset d
+      LEFT JOIN cfg_search_score_boost s
+      ON d.id = s.id
+      WHERE d.urn not like "hive:///dev_foundation_tables%"
+      and d.urn not like "hive:///dev_foundation_views%"
       """
     self.execute_commands(sql)
@@ -203,21 +217,27 @@ class ElasticSearchIndex():
     row_count = 1
     result = self.wh_cursor.fetchone()
-    url = self.elasticsearch_index_url + ':' + str(self.elasticsearch_port) + '/wherehows/dataset/_bulk'
+    url = self.elasticsearch_index_url + ':' + str(self.elasticsearch_port) + '/' + self.elasticsearch_index + '/dataset/_bulk'
     params = []
     while result:
       row = dict(zip(description, result))
+      dataset_detail = {
+        'name': row['name'],
+        'source': row['source'],
+        'urn': row['urn'],
+        'location_prefix': row['location_prefix'],
+        'parent_name': row['parent_name'],
+        'schema_type': row['schema_type'],
+        'properties': row['properties'],
+        'schema': row['schema'],
+        'fields': row['fields'],
+        'static_boosting_score': row['static_boosting_score']
+      }
+
       params.append('{ "index": { "_id": ' + str(row['id']) + ' }}')
-      params.append(
-        """{ "name": "%s", "source": "%s", "urn": "%s", "location_prefix": "%s", "parent_name": "%s","schema_type": "%s", "properties": %s, "schema": %s, "fields": %s, "static_boosting_score": %s }"""
-        % (row['name'] if row['name'] else '', row['source'] if row['source'] else '',
-           row['urn'] if row['urn'] else '', row['location_prefix'] if row['location_prefix'] else '',
-           row['parent_name'] if row['parent_name'] else '', row['schema_type'] if row['schema_type'] else '',
-           json.dumps(row['properties']) if row['properties'] else '',
-           json.dumps(row['schema']) if row['schema'] else '', json.dumps(row['fields']) if row['fields'] else '',
-           json.dumps(row['static_boosting_score']) if row['static_boosting_score'] else ''))
+      params.append(json.dumps(dataset_detail))
 
       if row_count % 1000 == 0:
         self.bulk_insert(params, url)
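With the rewritten query, a dataset that has no row in `cfg_search_score_boost` is no longer dropped by the join; it comes back with `static_boosting_score` 0 via `COALESCE`, so the bulk source line always carries a numeric score. A sketch with made-up values:

```python
import json

# What the loop above emits for a dataset without a boost row (illustrative).
row = {'id': 99, 'name': 'page_views', 'source': 'Hive',
       'urn': 'hive:///tracking/page_views', 'static_boosting_score': 0}
action = '{ "index": { "_id": ' + str(row['id']) + ' }}'
source = json.dumps({'name': row['name'], 'source': row['source'], 'urn': row['urn'],
                     'static_boosting_score': row['static_boosting_score']},
                    sort_keys=True)
print(action + '\n' + source)
```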
"metric_source_type": %s, "metric_source": %s, "metric_source_dataset_id": %d, "metric_ref_id_type": %s, "metric_ref_id": %s, "metric_type": %s, "metric_additive_type": %s, "metric_grain": %s, "metric_display_factor": %f, "metric_display_factor_sym": %s, "metric_good_direction": %s, "metric_formula": %s, "dimensions": %s, "owners": %s, "tags": %s, "urn": %s, "metric_url": %s, "wiki_url": %s, "scm_url": %s}""" - % (row['metric_id'], json.dumps(row['metric_name']) if row['metric_name'] else json.dumps(''), - json.dumps(row['metric_description']) if row['metric_description'] else json.dumps(''), - json.dumps(row['dashboard_name']) if row['dashboard_name'] else json.dumps(''), - json.dumps(row['metric_group']) if row['metric_group'] else json.dumps(''), - json.dumps(row['metric_category']) if row['metric_category'] else json.dumps(''), - json.dumps(row['metric_sub_category']) if row['metric_sub_category'] else json.dumps(''), - json.dumps(row['metric_level']) if row['metric_level'] else json.dumps(''), - json.dumps(row['metric_source_type']) if row['metric_source_type'] else json.dumps(''), - json.dumps(row['metric_source']) if row['metric_source'] else json.dumps(''), - row['metric_source_dataset_id'] if row['metric_source_dataset_id'] else 0, - json.dumps(row['metric_ref_id_type']) if row['metric_ref_id_type'] else json.dumps(''), - json.dumps(row['metric_ref_id']) if row['metric_ref_id'] else json.dumps(''), - json.dumps(row['metric_type']) if row['metric_type'] else json.dumps(''), - json.dumps(row['metric_additive_type']) if row['metric_additive_type'] else json.dumps(''), - json.dumps(row['metric_grain']) if row['metric_grain'] else json.dumps(''), - row['metric_display_factor'] if row['metric_display_factor'] else 0.0, - json.dumps(row['metric_display_factor_sym']) if row['metric_display_factor_sym'] else json.dumps(''), - json.dumps(row['metric_good_direction']) if row['metric_good_direction'] else json.dumps(''), - json.dumps(row['metric_formula']) if row['metric_formula'] else json.dumps(''), - json.dumps(row['dimensions']) if row['dimensions'] else json.dumps(''), - json.dumps(row['owners']) if row['owners'] else json.dumps(''), - json.dumps(row['tags']) if row['tags'] else json.dumps(''), - json.dumps(row['urn']) if row['urn'] else json.dumps(''), - json.dumps(row['metric_url']) if row['metric_url'] else json.dumps(''), - json.dumps(row['wiki_url']) if row['wiki_url'] else json.dumps(''), - json.dumps(row['scm_url']) if row['scm_url'] else json.dumps(''))) + params.append(json.dumps(metric_detail)) + if row_count % 1000 == 0: self.bulk_insert(params, url) self.logger.info('metric ' + str(row_count)) @@ -294,7 +318,9 @@ class ElasticSearchIndex(): job_sql = """ SELECT * FROM flow_job WHERE app_id = %d and flow_id = %d """ - url = self.elasticsearch_index_url + ':' + str(self.elasticsearch_port) + '/wherehows/flow_jobs/_bulk' + + url = self.elasticsearch_index_url + ':' + str(self.elasticsearch_port) + '/' + self.elasticsearch_index + '/flow_jobs/_bulk' + params = [] self.wh_cursor.execute(flow_sql) job_cursor = self.wh_con.cursor(1) @@ -304,40 +330,44 @@ class ElasticSearchIndex(): while result: row = dict(zip(description, result)) job_cursor.execute(job_sql %(long(row['app_id']), long(row['flow_id']))) - jobs = [] + jobs_info = [] job_description = [x[0] for x in job_cursor.description] job_result = job_cursor.fetchone() while job_result: job_row = dict(zip(job_description, job_result)) - jobs.append({"app_id": job_row['app_id'], "flow_id": job_row['flow_id'], "job_id": 
@@ -304,40 +330,44 @@
     while result:
       row = dict(zip(description, result))
       job_cursor.execute(job_sql %(long(row['app_id']), long(row['flow_id'])))
-      jobs = []
+      jobs_info = []
       job_description = [x[0] for x in job_cursor.description]
       job_result = job_cursor.fetchone()
       while job_result:
         job_row = dict(zip(job_description, job_result))
-        jobs.append({"app_id": job_row['app_id'], "flow_id": job_row['flow_id'], "job_id": job_row['job_id'],
-                     "job_name": job_row['job_name'] if job_row['job_name'] else '',
-                     "job_path": job_row['job_path'] if job_row['job_path'] else '',
-                     "job_type_id": job_row['job_type_id'],
-                     "job_type": job_row['job_type'] if job_row['job_type'] else '',
-                     "pre_jobs": job_row['pre_jobs'] if job_row['pre_jobs'] else '',
-                     "post_jobs": job_row['post_jobs'] if job_row['post_jobs'] else '',
-                     "is_current": job_row['is_current'] if job_row['is_current'] else '',
-                     "is_first": job_row['is_first'] if job_row['is_first'] else '',
-                     "is_last": job_row['is_last'] if job_row['is_last'] else ''})
+        jobs_row_detail = {
+          'app_id': job_row['app_id'],
+          'flow_id': job_row['flow_id'],
+          'job_id': job_row['job_id'],
+          'job_name': job_row['job_name'],
+          'job_path': job_row['job_path'],
+          'job_type_id': job_row['job_type_id'],
+          'job_type': job_row['job_type'],
+          'pre_jobs': job_row['pre_jobs'],
+          'post_jobs': job_row['post_jobs'],
+          'is_current': job_row['is_current'],
+          'is_first': job_row['is_first'],
+          'is_last': job_row['is_last']
+        }
+        jobs_info.append(jobs_row_detail)
         job_result = job_cursor.fetchone()
 
       params.append('{ "index": { "_id": ' + str(long(row['flow_id'])*10000 + long(row['app_id'])) + ' }}')
-      if len(jobs) > 0:
-        params.append(
-          """{"app_id": %d, "flow_id": %d, "app_code": "%s", "flow_name": "%s", "flow_group": "%s", "flow_path": "%s", "flow_level": %d, "is_active": "%s", "is_scheduled": "%s", "pre_flows": "%s", "jobs": %s}"""
-          % (row['app_id'], row['flow_id'], row['app_code'] if row['app_code'] else '',
-             row['flow_name'] if row['flow_name'] else '', row['flow_group'] if row['flow_group'] else '',
-             row['flow_path'] if row['flow_path'] else '', row['flow_level'],
-             row['is_active'] if row['is_active'] else '', row['is_scheduled'] if row['is_scheduled'] else '',
-             row['pre_flows'] if row['pre_flows'] else '', json.dumps(jobs)))
-      else:
-        params.append(
-          """{"app_id": %d, "flow_id": %d, "app_code": "%s", "flow_name": "%s", "flow_group": "%s", "flow_path": "%s", "flow_level": %d, "is_active": "%s", "is_scheduled": "%s", "pre_flows": "%s", "jobs": ""}"""
-          % (row['app_id'], row['flow_id'], row['app_code'] if row['app_code'] else '',
-             row['flow_name'] if row['flow_name'] else '', row['flow_group'] if row['flow_group'] else '',
-             row['flow_path'] if row['flow_path'] else '', row['flow_level'],
-             row['is_active'] if row['is_active'] else '', row['is_scheduled'] if row['is_scheduled'] else '',
-             row['pre_flows'] if row['pre_flows'] else ''))
+      jobs_detail = {
+        'app_id': row['app_id'],
+        'flow_id': row['flow_id'],
+        'app_code': row['app_code'],
+        'flow_name': row['flow_name'],
+        'flow_group': row['flow_group'],
+        'flow_path': row['flow_path'],
+        'flow_level': row['flow_level'],
+        'is_active': row['is_active'],
+        'is_scheduled': row['is_scheduled'],
+        'pre_flows': row['pre_flows'],
+        'jobs': jobs_info
+      }
+      params.append(json.dumps(jobs_detail))
+
       if row_count % 1000 == 0:
         self.bulk_insert(params, url)
         self.logger.info('flow jobs ' + str(row_count))
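The flow documents use a composite `_id` of `flow_id * 10000 + app_id`, which is unique only while `app_id` has at most four decimal digits; the nested `jobs` list matches the `nested` mapping declared in index_mapping.json. A small check of the id scheme:

```python
def flow_doc_id(flow_id, app_id):
    # Mirrors the _id computed above; assumes app_id < 10000.
    assert 0 <= app_id < 10000
    return flow_id * 10000 + app_id

print(flow_doc_id(123, 31))  # 1230031
```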
diff --git a/wherehows-common/src/main/java/wherehows/common/Constant.java b/wherehows-common/src/main/java/wherehows/common/Constant.java
index 7f8201e003..db0563116b 100644
--- a/wherehows-common/src/main/java/wherehows/common/Constant.java
+++ b/wherehows-common/src/main/java/wherehows/common/Constant.java
@@ -196,6 +196,7 @@ public class Constant {
 
   public static final String WH_ELASTICSEARCH_URL_KEY = "wh.elasticsearch.url";
   public static final String WH_ELASTICSEARCH_PORT_KEY = "wh.elasticsearch.port";
+  public static final String WH_ELASTICSEARCH_INDEX_KEY = "wh.elasticsearch.index";
 
   // Oracle
   public static final String ORA_DB_USERNAME_KEY = "oracle.db.username";
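The new `wh.elasticsearch.index` property is optional: when absent, the script falls back to the `wherehows` alias, and during a rebuild it can point at a concrete versioned index instead. A sketch of the resulting URL construction (plain Python; the property values are made up):

```python
args = {
    'wh.elasticsearch.url': 'http://localhost',
    'wh.elasticsearch.port': '9200',
    'wh.elasticsearch.index': 'wherehows_v1',  # optional; defaults to 'wherehows'
}
index = args.get('wh.elasticsearch.index', 'wherehows')
url = args['wh.elasticsearch.url'] + ':' + args['wh.elasticsearch.port'] + '/' + index + '/dataset/_bulk'
print(url)  # http://localhost:9200/wherehows_v1/dataset/_bulk
```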