From 217b7d9d0953afd7afa651ecf3ed29cf8ea94121 Mon Sep 17 00:00:00 2001
From: Na Zhang
Date: Mon, 7 Nov 2016 14:02:46 -0800
Subject: [PATCH] search ranking improvement with static boosting

---
 .../DDL/ETL_DDL/etl_configure_tables.sql    |  5 ++
 .../resources/jython/ElasticSearchIndex.py  | 74 +++++++++++++------
 web/app/dao/SearchDAO.java                  | 22 +++++-
 3 files changed, 77 insertions(+), 24 deletions(-)

diff --git a/data-model/DDL/ETL_DDL/etl_configure_tables.sql b/data-model/DDL/ETL_DDL/etl_configure_tables.sql
index b9a9493895..bfe26e1bdc 100644
--- a/data-model/DDL/ETL_DDL/etl_configure_tables.sql
+++ b/data-model/DDL/ETL_DDL/etl_configure_tables.sql
@@ -257,3 +257,8 @@ COMMENT = 'https://en.wikipedia.org/wiki/Computer_cluster' ;
 
 CREATE UNIQUE INDEX uix_cfg_cluster__clustercode ON cfg_cluster(cluster_code);
 
+CREATE TABLE IF NOT EXISTS cfg_search_score_boost (
+  `id`                    INT COMMENT 'dataset id',
+  `static_boosting_score` INT COMMENT 'static boosting score for elastic search',
+  PRIMARY KEY (`id`)
+) ENGINE = InnoDB DEFAULT CHARSET = latin1;
\ No newline at end of file
diff --git a/metadata-etl/src/main/resources/jython/ElasticSearchIndex.py b/metadata-etl/src/main/resources/jython/ElasticSearchIndex.py
index b605f36826..ca82d4b5b2 100644
--- a/metadata-etl/src/main/resources/jython/ElasticSearchIndex.py
+++ b/metadata-etl/src/main/resources/jython/ElasticSearchIndex.py
@@ -143,34 +143,62 @@ class ElasticSearchIndex():
       """ % last_unixtime
     else:
       sql = """
-      SELECT * FROM dict_dataset
-      """
+      INSERT IGNORE INTO cfg_search_score_boost
+      (id, static_boosting_score)
+      SELECT id, 8 FROM dict_dataset
+      WHERE urn like "kafka:///%" or urn like "oracle:///%" or urn like "espresso:///%";
+
+      INSERT IGNORE INTO cfg_search_score_boost
+      (id, static_boosting_score)
+      SELECT id, 7 FROM dict_dataset
+      WHERE urn like "hdfs:///%" or urn like "dali:///%";
+
+      INSERT IGNORE INTO cfg_search_score_boost
+      (id, static_boosting_score)
+      SELECT id, 6 FROM dict_dataset
+      WHERE urn like "hive:///%";
+
+      INSERT IGNORE INTO cfg_search_score_boost
+      (id, static_boosting_score)
+      SELECT id, 5 FROM dict_dataset
+      WHERE urn like "%/data/derived%";
+
+      SELECT * FROM dict_dataset d
+      JOIN cfg_search_score_boost s
+      WHERE d.id = s.id
+      """
+    self.logger.debug('sql is ' + sql)
     url = self.elasticsearch_index_url + ':' + str(self.elasticsearch_port) + '/wherehows/dataset/_bulk'
     params = []
     self.wh_cursor.execute(sql)
     description = [x[0] for x in self.wh_cursor.description]
     row_count = 1
     result = self.wh_cursor.fetchone()
 
-    while result:
-      row = dict(zip(description, result))
-      params.append('{ "index": { "_id": ' + str(row['id']) + ' }}')
-      params.append(
-        """{ "name": "%s", "source": "%s", "urn": "%s", "location_prefix": "%s", "parent_name": "%s","schema_type": "%s", "properties": %s, "schema": %s , "fields": %s}"""
-        % (row['name'] if row['name'] else '', row['source'] if row['source'] else '',
-           row['urn'] if row['urn'] else '', row['location_prefix'] if row['location_prefix'] else '',
-           row['parent_name'] if row['parent_name'] else '', row['schema_type'] if row['schema_type'] else '',
-           json.dumps(row['properties']) if row['properties'] else '',
-           json.dumps(row['schema']) if row['schema'] else '', json.dumps(row['fields']) if row['fields'] else ''))
-      if row_count % 1000 == 0:
-        self.bulk_insert(params, url)
-        self.logger.info('dataset ' + str(row_count))
-        params = []
-      row_count += 1
-      result = self.wh_cursor.fetchone()
+    while result:
+      row = dict(zip(description, result))
+
+      params.append('{ "index": { "_id": ' + str(row['id']) + ' }}')
+      params.append(
+        """{ "name": "%s", "source": "%s", "urn": "%s", "location_prefix": "%s", "parent_name": "%s","schema_type": "%s", "properties": %s, "schema": %s, "fields": %s, "static_boosting_score": %s }"""
+        % (row['name'] if row['name'] else '', row['source'] if row['source'] else '',
+           row['urn'] if row['urn'] else '', row['location_prefix'] if row['location_prefix'] else '',
+           row['parent_name'] if row['parent_name'] else '', row['schema_type'] if row['schema_type'] else '',
+           json.dumps(row['properties']) if row['properties'] else '',
+           json.dumps(row['schema']) if row['schema'] else '', json.dumps(row['fields']) if row['fields'] else '',
+           json.dumps(row['static_boosting_score']) if row['static_boosting_score'] else ''))
+
+      if row_count % 1000 == 0:
+        self.bulk_insert(params, url)
+        self.logger.info('dataset ' + str(row_count))
+        params = []
+      row_count += 1
+      result = self.wh_cursor.fetchone()
+
+    self.logger.info('total dataset row count is: ' + str(row_count))
     if len(params) > 0:
-      self.bulk_insert(params, url)
-      self.logger.info('dataset ' + str(len(params)))
+      self.bulk_insert(params, url)
+      self.logger.info('dataset ' + str(len(params)))
 
   def update_metric(self):
     sql = """
@@ -305,6 +333,6 @@ class ElasticSearchIndex():
     self.wh_con.close()
 
 if __name__ == "__main__":
-  props = sys.argv[1]
-  esi = ElasticSearchIndex(props)
-  esi.run()
+  props = sys.argv[1]
+  esi = ElasticSearchIndex(props)
+  esi.run()
diff --git a/web/app/dao/SearchDAO.java b/web/app/dao/SearchDAO.java
index 61d689ee27..f6199ce823 100644
--- a/web/app/dao/SearchDAO.java
+++ b/web/app/dao/SearchDAO.java
@@ -202,10 +202,30 @@ public class SearchDAO extends AbstractMySQLOpenSourceDAO
 
     if (keywordNode != null)
     {
-      queryNode.set("query", keywordNode);
+      ObjectNode funcScoreNodes = Json.newObject();
+
+      ObjectNode fieldValueFactorNode = Json.newObject();
+      fieldValueFactorNode.put("field","static_boosting_score");
+      fieldValueFactorNode.put("factor",1);
+      fieldValueFactorNode.put("modifier","square");
+      fieldValueFactorNode.put("missing",1);
+
+      funcScoreNodes.put("query", keywordNode);
+      funcScoreNodes.put("field_value_factor",fieldValueFactorNode);
+
+      ObjectNode funcScoreNodesWrapper = Json.newObject();
+      funcScoreNodesWrapper.put("function_score",funcScoreNodes);
+
+      queryNode.put("query",funcScoreNodesWrapper);
+
+      Logger.debug("The query sent to Elastic Search is: " + queryNode.toString());
+
       Promise<WSResponse> responsePromise = WS.url(Play.application().configuration().getString(
         SearchDAO.ELASTICSEARCH_DATASET_URL_KEY)).post(queryNode);
       responseNode = responsePromise.get(1000).asJson();
+
+      Logger.debug("The responseNode from Elastic Search is: " + responseNode.toString());
+
     }
 
     ObjectNode resultNode = Json.newObject();
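
A note on the indexing change, for reviewers: each dataset row returned by the new join is sent to the Elasticsearch `_bulk` endpoint as an action line followed by a document line, and the document now carries `static_boosting_score`. The sketch below is not part of the patch; the sample row, its id, and its values are hypothetical, and only a subset of the indexed fields is shown.

```python
import json

# Hypothetical row, as produced by joining dict_dataset with cfg_search_score_boost;
# the id, urn, and other values are made up for illustration.
row = {
    "id": 12345,
    "name": "member_profile",
    "source": "Kafka",
    "urn": "kafka:///member_profile",
    "schema_type": "JSON",
    "static_boosting_score": 8,  # kafka/oracle/espresso tier from the INSERT IGNORE statements
}

# The bulk API expects newline-delimited JSON: an action line, then the document itself.
action_line = json.dumps({"index": {"_id": row["id"]}})
document_line = json.dumps({
    "name": row["name"],
    "source": row["source"],
    "urn": row["urn"],
    "schema_type": row["schema_type"],
    "static_boosting_score": row["static_boosting_score"],  # new ranking signal
    # remaining fields (location_prefix, parent_name, properties, schema, fields) elided
})

bulk_payload = action_line + "\n" + document_line + "\n"
print(bulk_payload)
```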
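On the query side, wrapping `keywordNode` in a `function_score` with a `field_value_factor` over `static_boosting_score` makes Elasticsearch multiply the text-relevance score by `(1 * static_boosting_score)^2`: the default `boost_mode` is `multiply`, and `missing: 1` leaves datasets without a boost score unaffected. A minimal sketch of the resulting request body, with a hypothetical keyword clause standing in for whatever `keywordNode` holds:

```python
import json

# Hypothetical keyword clause; in SearchDAO this is the JSON held by keywordNode.
keyword_clause = {"query_string": {"query": "member_profile"}}

query_body = {
    "query": {
        "function_score": {
            "query": keyword_clause,
            "field_value_factor": {
                "field": "static_boosting_score",  # indexed from cfg_search_score_boost
                "factor": 1,
                "modifier": "square",              # boost factor becomes (1 * value)^2
                "missing": 1,                      # unscored datasets keep a neutral factor of 1
            },
        }
    }
}

# This is the body POSTed to the configured dataset search endpoint (ELASTICSEARCH_DATASET_URL_KEY).
print(json.dumps(query_body, indent=2))
```

Squaring the field value widens the gap between source tiers (8² = 64 for kafka/oracle/espresso versus 6² = 36 for hive), so the static boost dominates ties in text relevance while still letting a strong keyword match win.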