Merge pull request #270 from camelliazhang/master

search ranking improvement with static boosting
camelliazhang 2016-11-08 17:30:29 -08:00 committed by GitHub
commit d911b14d18
3 changed files with 77 additions and 24 deletions


@@ -257,3 +257,8 @@ COMMENT = 'https://en.wikipedia.org/wiki/Computer_cluster' ;
CREATE UNIQUE INDEX uix_cfg_cluster__clustercode
ON cfg_cluster(cluster_code);
CREATE TABLE IF NOT EXISTS cfg_search_score_boost (
  `id`                    INT COMMENT 'dataset id',
  `static_boosting_score` INT COMMENT 'static boosting score for elastic search',
  PRIMARY KEY (`id`)
)
  ENGINE = InnoDB
  DEFAULT CHARSET = latin1;
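The ETL change below fills this table with fixed tiers per storage platform (8 for kafka/oracle/espresso, 7 for hdfs/dali, 6 for hive, 5 for derived data). Purely as an illustration, not part of the commit (BOOST_TIERS and static_boost are hypothetical names), the same urn-to-score mapping in Python:

# Illustrative mirror of the INSERT ... SELECT tier logic in the ETL below.
BOOST_TIERS = [
    (("kafka:///", "oracle:///", "espresso:///"), 8),
    (("hdfs:///", "dali:///"), 7),
    (("hive:///",), 6),
]

def static_boost(urn):
    """Boost tier for a dataset urn; 5 for derived data, None when no tier
    applies (the search query then falls back to a neutral score of 1)."""
    for prefixes, score in BOOST_TIERS:
        if urn.startswith(prefixes):
            return score
    return 5 if "/data/derived" in urn else None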


@@ -143,34 +143,62 @@ class ElasticSearchIndex():
""" % last_unixtime
else:
sql = """
SELECT * FROM dict_dataset
"""
            INSERT IGNORE INTO cfg_search_score_boost
              (id, static_boosting_score)
            SELECT id, 8 FROM dict_dataset
            WHERE urn LIKE "kafka:///%" OR urn LIKE "oracle:///%" OR urn LIKE "espresso:///%";

            INSERT IGNORE INTO cfg_search_score_boost
              (id, static_boosting_score)
            SELECT id, 7 FROM dict_dataset
            WHERE urn LIKE "hdfs:///%" OR urn LIKE "dali:///%";

            INSERT IGNORE INTO cfg_search_score_boost
              (id, static_boosting_score)
            SELECT id, 6 FROM dict_dataset
            WHERE urn LIKE "hive:///%";

            INSERT IGNORE INTO cfg_search_score_boost
              (id, static_boosting_score)
            SELECT id, 5 FROM dict_dataset
            WHERE urn LIKE "%/data/derived%";

            -- LEFT JOIN keeps datasets without a boost row; the search query
            -- treats their missing static_boosting_score as a neutral 1
            SELECT d.*, s.static_boosting_score FROM dict_dataset d
            LEFT JOIN cfg_search_score_boost s ON d.id = s.id
            """
        self.logger.debug('sql is ' + sql)
        url = self.elasticsearch_index_url + ':' + str(self.elasticsearch_port) + '/wherehows/dataset/_bulk'
        params = []
        self.wh_cursor.execute(sql)
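        # Editor's note (assumption): the string above holds several ';'-separated
        # statements; running them in a single execute() call depends on the MySQL
        # driver's multi-statement support. If unsupported, split on ';' and
        # execute each statement separately.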
        description = [x[0] for x in self.wh_cursor.description]
        row_count = 1
        result = self.wh_cursor.fetchone()
        while result:
            row = dict(zip(description, result))
            params.append('{ "index": { "_id": ' + str(row['id']) + ' }}')
            params.append(
                """{ "name": "%s", "source": "%s", "urn": "%s", "location_prefix": "%s", "parent_name": "%s", "schema_type": "%s", "properties": %s, "schema": %s, "fields": %s, "static_boosting_score": %s }"""
                % (row['name'] if row['name'] else '', row['source'] if row['source'] else '',
                   row['urn'] if row['urn'] else '', row['location_prefix'] if row['location_prefix'] else '',
                   row['parent_name'] if row['parent_name'] else '', row['schema_type'] if row['schema_type'] else '',
                   json.dumps(row['properties']) if row['properties'] else 'null',
                   json.dumps(row['schema']) if row['schema'] else 'null',
                   json.dumps(row['fields']) if row['fields'] else 'null',
                   json.dumps(row['static_boosting_score']) if row['static_boosting_score'] else 'null'))
            if row_count % 1000 == 0:
                self.bulk_insert(params, url)
                self.logger.info('dataset ' + str(row_count))
                params = []
            row_count += 1
            result = self.wh_cursor.fetchone()
        self.logger.info('total dataset row count is: ' + str(row_count))
        if len(params) > 0:
            self.bulk_insert(params, url)
            self.logger.info('dataset ' + str(len(params)))

    def update_metric(self):
        sql = """
@@ -305,6 +333,6 @@ class ElasticSearchIndex():
        self.wh_con.close()


if __name__ == "__main__":
    props = sys.argv[1]
    esi = ElasticSearchIndex(props)
    esi.run()
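For reference, each pair appended to params in update_dataset becomes one action line plus one source line in the newline-delimited body that bulk_insert POSTs to /wherehows/dataset/_bulk. A standalone sketch of a single pair, with made-up values:

# Illustration only: one action/source pair from the bulk request body.
import json

action = json.dumps({"index": {"_id": 42}})        # hypothetical dataset id
source = json.dumps({
    "name": "PageViewEvent",                       # made-up example dataset
    "source": "Kafka",
    "urn": "kafka:///PageViewEvent",
    "static_boosting_score": 8,                    # tier from cfg_search_score_boost
})
bulk_body = action + "\n" + source + "\n"          # _bulk bodies are newline-delimited JSON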


@@ -202,10 +202,30 @@ public class SearchDAO extends AbstractMySQLOpenSourceDAO
    if (keywordNode != null)
    {
      ObjectNode funcScoreNodes = Json.newObject();

      ObjectNode fieldValueFactorNode = Json.newObject();
      fieldValueFactorNode.put("field", "static_boosting_score");
      fieldValueFactorNode.put("factor", 1);
      fieldValueFactorNode.put("modifier", "square");
      fieldValueFactorNode.put("missing", 1);

      funcScoreNodes.put("query", keywordNode);
      funcScoreNodes.put("field_value_factor", fieldValueFactorNode);

      ObjectNode funcScoreNodesWrapper = Json.newObject();
      funcScoreNodesWrapper.put("function_score", funcScoreNodes);

      queryNode.put("query", funcScoreNodesWrapper);

      Logger.debug("The query sent to Elastic Search is: " + queryNode.toString());

      Promise<WSResponse> responsePromise = WS.url(Play.application().configuration().getString(
          SearchDAO.ELASTICSEARCH_DATASET_URL_KEY)).post(queryNode);
      responseNode = responsePromise.get(1000).asJson();

      Logger.debug("The responseNode from Elastic Search is: " + responseNode.toString());
    }

    ObjectNode resultNode = Json.newObject();
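Taken together, the nodes built above wrap the keyword query in a function_score whose field_value_factor multiplies each hit's relevance by the square of its static boost. A sketch of the resulting request body in Python (keyword_query stands in for the match query SearchDAO builds further up; values are illustrative):

# Sketch of the query body assembled by the ObjectNode calls above.
keyword_query = {"match": {"name": "pageview"}}  # stand-in for keywordNode
es_query = {
    "query": {
        "function_score": {
            "query": keyword_query,
            "field_value_factor": {
                "field": "static_boosting_score",
                "factor": 1,
                "modifier": "square",  # function value = (factor * score)^2
                "missing": 1,          # docs without a score are left unboosted
            },
        }
    }
}
# With function_score's default boost_mode ("multiply"), a Kafka dataset
# (score 8) has its relevance multiplied by 64, a Hive dataset (score 6)
# by 36, and a dataset with no boost row by 1.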