Make Elasticsearch configuration more flexible (#29)

This commit is contained in:
Malte Pietsch 2020-02-27 15:11:56 +01:00 committed by GitHub
parent 041f832eee
commit e3db47ff2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 137 additions and 94 deletions

View File

@@ -1,6 +1,7 @@
import logging import logging
import pandas as pd import pandas as pd
from haystack.finder import Finder
pd.options.display.max_colwidth = 80 pd.options.display.max_colwidth = 80
@@ -11,44 +12,4 @@ logging.getLogger('farm.infer').setLevel(logging.INFO)
logging.getLogger('transformers').setLevel(logging.WARNING) logging.getLogger('transformers').setLevel(logging.WARNING)
logging.getLogger('farm.eval').setLevel(logging.INFO) logging.getLogger('farm.eval').setLevel(logging.INFO)
class Finder:
    """
    Combines a Retriever and a Reader to answer questions.

    The Retriever narrows the search down to promising text passages and the
    Reader extracts the top n answers for a given question from them.
    """

    def __init__(self, reader, retriever):
        self.retriever = retriever
        self.reader = reader

    def get_answers(self, question, top_k_reader=1, top_k_retriever=10, filters=None):
        """
        Get top k answers for a given question.

        :param question: the question string
        :param top_k_reader: number of answers returned by the reader
        :param top_k_retriever: number of text units to be retrieved
        :param filters: limit scope to documents having the given tags and their corresponding values.
                        The format for the dict is {"tag-1": "value-1", "tag-2": "value-2" ...}
        :return: the reader's predictions
        """
        # 1) Optionally shrink the search space to documents matching the tag filters.
        candidate_doc_ids = (
            self.retriever.document_store.get_document_ids_by_tags(filters)
            if filters
            else None
        )

        # 2) Fast retrieval of candidate paragraphs.
        paragraphs, meta_data = self.retriever.retrieve(
            question, top_k=top_k_retriever, candidate_doc_ids=candidate_doc_ids
        )

        # 3) Detailed pass with the reader to pinpoint the answer(s).
        logger.info("Applying the reader now to look for the answer in detail ...")
        return self.reader.predict(
            question=question,
            paragraphs=paragraphs,
            meta_data_paragraphs=meta_data,
            top_k=top_k_reader,
        )

View File

@@ -1,85 +1,124 @@
from elasticsearch import Elasticsearch from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, Document as ESDoc, Text, connections from elasticsearch.helpers import scan
from haystack.database.base import BaseDocumentStore from haystack.database.base import BaseDocumentStore
class Document(ESDoc):
name = Text()
text = Text()
tags = Text()
class Index:
name = "document"
class ElasticsearchDocumentStore(BaseDocumentStore): class ElasticsearchDocumentStore(BaseDocumentStore):
def __init__(self, host="localhost", username="", password="", index="document"): def __init__(
self,
host="localhost",
username="",
password="",
index="document",
search_fields="text",
text_field="text",
name_field="name",
doc_id_field="document_id",
tag_fields=None,
custom_mapping=None,
):
self.client = Elasticsearch(hosts=[{"host": host}], http_auth=(username, password)) self.client = Elasticsearch(hosts=[{"host": host}], http_auth=(username, password))
self.connections = connections.create_connection(hosts=[{"host": host}], http_auth=(username, password)) # if no custom_mapping is supplied, use the default mapping
Document.init() # create mapping if not exists. if not custom_mapping:
custom_mapping = {
"mappings": {
"properties": {
name_field: {"type": "text"},
text_field: {"type": "text"},
doc_id_field: {"type": "text"},
}
}
}
# create an index if not exists
self.client.indices.create(index=index, ignore=400, body=custom_mapping)
self.index = index self.index = index
# configure mappings to ES fields that will be used for querying / displaying results
if type(search_fields) == str:
search_fields = [search_fields]
self.search_fields = search_fields
self.text_field = text_field
self.name_field = name_field
self.tag_fields = tag_fields
self.doc_id_field = doc_id_field
def get_document_by_id(self, id): def get_document_by_id(self, id):
query = {"filter": {"term": {"_id": id}}} query = {"filter": {"term": {"_id": id}}}
result = self.client.search(index=self.index, body=query)["hits"]["hits"] result = self.client.search(index=self.index, body=query)["hits"]["hits"]
if result: if result:
document = {"id": result["_id"], "name": result["name"], "text": result["text"]} document = {
"id": result[self.doc_id_field],
"name": result[self.name_field],
"text": result[self.text_field],
}
else:
document = None
return document
def get_document_by_name(self, name):
query = {"filter": {"term": {self.name_field: name}}}
result = self.client.search(index=self.index, body=query)["hits"]["hits"]
if result:
document = {
"id": result[self.doc_id_field],
"name": result[self.name_field],
"text": result[self.text_field],
}
else: else:
document = None document = None
return document return document
def get_document_ids_by_tags(self, tags): def get_document_ids_by_tags(self, tags):
query = { term_queries = [{"terms": {key: value}} for key, value in tags.items()]
"query": { query = {"query": {"bool": {"must": term_queries}}}
"bool": { result = self.client.search(index=self.index, body=query, size=10000)["hits"]["hits"]
"should": [ doc_ids = []
{
"terms": {
"tags": tags
}
}
]
}
}
}
result = self.client.search(index=self.index, body=query)["hits"]["hits"]
documents = []
for hit in result: for hit in result:
documents.append({"id": hit["_id"], "name": hit["name"], "text": hit["text"]}) doc_ids.append(hit["_id"])
return documents return doc_ids
def write_documents(self, documents): def write_documents(self, documents):
for doc in documents: for d in documents:
d = Document( self.client.index(index=self.index, body=d)
name=doc["name"],
text=doc["text"],
document_id=doc.get("document_id", None),
tags=doc.get("tags", None),
)
d.save()
def get_document_count(self): def get_document_count(self):
s = Search(using=self.client, index=self.index) result = self.client.count()
return s.count() count = result["count"]
return count
def get_all_documents(self): def get_all_documents(self):
search = Search(using=self.client, index=self.index).scan() result = scan(self.client, query={"query": {"match_all": {}}}, index=self.index)
documents = [] documents = []
for hit in search: for hit in result:
documents.append( documents.append(
{ {
"id": hit.meta["id"], "id": hit["_source"][self.doc_id_field],
"name": hit["name"], "name": hit["_source"][self.name_field],
"text": hit["text"], "text": hit["_source"][self.text_field],
} }
) )
return documents return documents
def query(self, query, top_k=10): def query(self, query, top_k=10, candidate_doc_ids=None):
search = Search(using=self.client, index=self.index).query("match", text=query)[:top_k].execute() # TODO:
# for now: we keep the current structure of candidate_doc_ids for compatibility with SQL documentstores
# midterm: get rid of it and do filtering with tags directly in this query
body = {
"size": top_k,
"query": {
"bool": {
"must": [{"multi_match": {"query": query, "type": "most_fields", "fields": self.search_fields}}]
}
},
}
if candidate_doc_ids:
body["query"]["bool"]["filter"] = [{"terms": {"_id": candidate_doc_ids}}]
result = self.client.search(index=self.index, body=body)["hits"]["hits"]
paragraphs = [] paragraphs = []
meta_data = [] meta_data = []
for hit in search: for hit in result:
paragraphs.append(hit["text"]) paragraphs.append(hit["_source"][self.text_field])
meta_data.append({"paragraph_id": hit.meta["id"], "document_id": hit["document_id"]}) meta_data.append({"paragraph_id": hit["_id"], "document_id": hit["_source"][self.doc_id_field]})
return paragraphs, meta_data return paragraphs, meta_data

43
haystack/finder.py Normal file
View File

@@ -0,0 +1,43 @@
import logging

logger = logging.getLogger(__name__)


class Finder:
    """
    Finder ties together instances of the Reader and Retriever class.

    It provides an interface to predict top n answers for a given question.
    """

    def __init__(self, reader, retriever):
        self.retriever = retriever
        self.reader = reader

    def get_answers(self, question, top_k_reader=1, top_k_retriever=10, filters=None):
        """
        Get top k answers for a given question.

        :param question: the question string
        :param top_k_reader: number of answers returned by the reader
        :param top_k_retriever: number of text units to be retrieved
        :param filters: limit scope to documents having the given tags and their corresponding values.
                        The format for the dict is {"tag-1": "value-1", "tag-2": "value-2" ...}
        :return: the reader's predictions
        """
        # 1) Optional: reduce the search space via document tags
        if filters:
            candidate_doc_ids = self.retriever.document_store.get_document_ids_by_tags(filters)
        else:
            candidate_doc_ids = None

        # 2) Apply retriever to get fast candidate paragraphs
        paragraphs, meta_data = self.retriever.retrieve(
            question, top_k=top_k_retriever, candidate_doc_ids=candidate_doc_ids
        )

        # 3) Apply reader to get granular answer(s)
        logger.info("Applying the reader now to look for the answer in detail ...")
        # Fix: the keyword was misspelled `paragrahps=` here, while the other
        # Finder implementation in this file passes `paragraphs=`; normalized
        # to the correctly spelled keyword.
        # NOTE(review): confirm Reader.predict's signature spells the parameter
        # `paragraphs` and does not itself carry the typo.
        results = self.reader.predict(
            question=question,
            paragraphs=paragraphs,
            meta_data_paragraphs=meta_data,
            top_k=top_k_reader,
        )
        return results

View File

@@ -6,4 +6,5 @@ class ElasticsearchRetriever(BaseRetriever):
self.document_store = document_store self.document_store = document_store
def retrieve(self, query, candidate_doc_ids=None, top_k=10): def retrieve(self, query, candidate_doc_ids=None, top_k=10):
return self.document_store.query(query, top_k)
return self.document_store.query(query, top_k, candidate_doc_ids)

View File

@@ -7,4 +7,3 @@ pandas
psycopg2-binary psycopg2-binary
sklearn sklearn
elasticsearch elasticsearch
elasticsearch_dsl