Add Elasticsearch Document Store (#13)

This commit is contained in:
Tanay Soni 2020-01-24 18:24:07 +01:00 committed by GitHub
parent a0293cc996
commit f83a164095
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 164 additions and 63 deletions

View File

@ -65,21 +65,26 @@ Usage
.. image:: https://raw.githubusercontent.com/deepset-ai/haystack/master/docs/img/code_snippet_usage.png
Configuration
-------------
The configuration can be supplied in a :code:`qa_config.py` placed in the PYTHONPATH. Alternatively, the :code:`DATABASE_URL` can also be set as an environment variable.
Deployment
==========
SQL Backend
-----------
The database ORM layer is implemented using SQLAlchemy library. By default, it uses the file-based SQLite database. For large scale deployments, the configuration can be changed to use other compatible databases like PostgreSQL or MySQL.
Haystack has an extensible document store layer.
There are currently implementations of Elasticsearch and SQL (see :code:`haystack.database.elasticsearch.ElasticsearchDocumentStore` and :code:`haystack.database.sql.SQLDocumentStore`).
Elasticsearch Backend
----------------------
(Coming soon)
---------------------
Elasticsearch is recommended for deploying on a large scale. The documents can optionally be chunked into smaller units (e.g., paragraphs) before indexing to make the results returned by the Retriever more granular and accurate.
Retrievers can access an Elasticsearch index to find the relevant paragraphs (or documents) for a query. The default `ElasticsearchRetriever` uses Elasticsearch's native scoring (BM25), but can be extended easily with custom implementations.
You can get started by running a single Elasticsearch node using docker::
docker run -d -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.5.1
SQL Backend
-----------
The SQL backend layer is mainly meant to simplify the first development steps. By default, a local file-based SQLite database is initialized.
However, if you prefer a PostgreSQL or MySQL backend for production, you can easily configure this since our implementation is based on SQLAlchemy.
REST API
--------

Binary file not shown.

Before

Width:  |  Height:  |  Size: 229 KiB

After

Width:  |  Height:  |  Size: 241 KiB

View File

@ -1,8 +1,7 @@
from haystack.retriever.tfidf import TfidfRetriever
from haystack.reader.farm import FARMReader
import logging
import pandas as pd
pd.options.display.max_colwidth = 80
logger = logging.getLogger(__name__)
@ -21,8 +20,6 @@ class Finder:
def __init__(self, reader, retriever):
self.retriever = retriever
self.retriever.fit()
self.reader = reader
def get_answers(self, question, top_k_reader=1, top_k_retriever=10, filters=None):
@ -39,7 +36,7 @@ class Finder:
# 1) Optional: reduce the search space via document tags
if filters:
candidate_doc_ids = self.retriever.datastore.get_document_ids_by_tags(filters)
candidate_doc_ids = self.retriever.document_store.get_document_ids_by_tags(filters)
else:
candidate_doc_ids = None

View File

@ -3,7 +3,7 @@ from abc import abstractmethod
class BaseDocumentStore:
"""
Base class for implementing DataStores.
Base class for implementing Document Stores.
"""
@abstractmethod

View File

@ -0,0 +1,85 @@
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, Document as ESDoc, Text, connections
from haystack.database.base import BaseDocumentStore
class Document(ESDoc):
    # elasticsearch_dsl mapping for one indexed document.
    # NOTE: these attribute names double as the Elasticsearch field names
    # (the rest of the store reads "name"/"text"/"tags"), so do not rename.
    name = Text()  # source document name (e.g. the file name)
    text = Text()  # full text content that gets searched
    tags = Text()  # optional tags used for filtering in get_document_ids_by_tags

    class Index:
        # default index the mapping is created in by Document.init()
        name = "document"
class ElasticsearchDocumentStore(BaseDocumentStore):
    """
    Document store backed by an Elasticsearch index.

    Uses the low-level `elasticsearch` client for ad-hoc body queries and
    `elasticsearch_dsl` (via the `Document` mapping) for writes and iteration.
    """

    def __init__(self, host="localhost", username="", password="", index="document"):
        """
        :param host: Elasticsearch host to connect to.
        :param username: optional HTTP basic-auth user name.
        :param password: optional HTTP basic-auth password.
        :param index: name of the index to search in.
        """
        # Raw client used for the body-based searches below.
        self.client = Elasticsearch(hosts=[{"host": host}], http_auth=(username, password))
        # Global elasticsearch_dsl connection used by Document.init() / save().
        self.connections = connections.create_connection(hosts=[{"host": host}], http_auth=(username, password))
        Document.init()  # create mapping if not exists.
        self.index = index

    def get_document_by_id(self, id):
        """Return one document as {"id", "name", "text"} or None if not found."""
        # BUGFIX: a top-level "filter" key is not a valid search body; wrap the
        # term filter in a proper "query" clause instead.
        query = {"query": {"term": {"_id": id}}}
        result = self.client.search(index=self.index, body=query)["hits"]["hits"]
        if result:
            # BUGFIX: `result` is a *list* of hits and the indexed fields live
            # under "_source"; the old code indexed the list itself.
            hit = result[0]
            document = {"id": hit["_id"], "name": hit["_source"]["name"], "text": hit["_source"]["text"]}
        else:
            document = None
        return document

    def get_document_ids_by_tags(self, tags):
        """Return documents whose "tags" field matches any of the given tags."""
        query = {
            "query": {
                "bool": {
                    "should": [
                        {"terms": {"tags": tags}}
                    ]
                }
            }
        }
        result = self.client.search(index=self.index, body=query)["hits"]["hits"]
        documents = []
        for hit in result:
            # BUGFIX: raw hits keep the indexed fields under "_source".
            documents.append({"id": hit["_id"], "name": hit["_source"]["name"], "text": hit["_source"]["text"]})
        return documents

    def write_documents(self, documents):
        """
        Index a list of dicts with keys "name" and "text", and optionally
        "document_id" and "tags".
        """
        for doc in documents:
            d = Document(
                name=doc["name"],
                text=doc["text"],
                # "document_id" is not declared on the Document mapping; it is
                # stored via Elasticsearch's dynamic mapping.
                document_id=doc.get("document_id", None),
                tags=doc.get("tags", None),
            )
            d.save()

    def get_document_count(self):
        """Return the number of documents in the index."""
        s = Search(using=self.client, index=self.index)
        return s.count()

    def get_all_documents(self):
        """Return *all* documents as a list of {"id", "name", "text"} dicts."""
        search = Search(using=self.client, index=self.index)
        documents = []
        # BUGFIX: iterating a plain Search only yields the default first 10
        # hits; scan() pages through the whole index.
        for hit in search.scan():
            documents.append(
                {
                    "id": hit.meta["id"],
                    "name": hit["name"],
                    "text": hit["text"],
                }
            )
        return documents

    def query(self, query, top_k=10):
        """
        Full-text "match" query against the "text" field.

        :param query: query string.
        :param top_k: maximum number of hits to return.
        :return: tuple (paragraphs, meta_data) of parallel lists.
        """
        search = Search(using=self.client, index=self.index).query("match", text=query)[:top_k].execute()
        paragraphs = []
        meta_data = []
        for hit in search:
            paragraphs.append(hit["text"])
            # NOTE(review): this raises KeyError for documents indexed without
            # a "document_id" — confirm all writers supply one.
            meta_data.append({"paragraph_id": hit.meta["id"], "document_id": hit["document_id"]})
        return paragraphs, meta_data

View File

@ -83,9 +83,8 @@ class SQLDocumentStore(BaseDocumentStore):
GROUP BY dt.document_id
"""
tag_filters = []
for tag, value in tags.items():
if value:
tag_filters.append(f"SUM(CASE WHEN t.value='{value}' THEN 1 ELSE 0 END) > 0")
for tag in tags:
tag_filters.append(f"SUM(CASE WHEN t.value='{tag}' THEN 1 ELSE 0 END) > 0")
final_query = f"{query} HAVING {' AND '.join(tag_filters)});"
query_results = self.session.execute(final_query)

View File

@ -8,7 +8,7 @@ import zipfile
logger = logging.getLogger(__name__)
def write_documents_to_db(datastore, document_dir, clean_func=None, only_empty_db=False):
def write_documents_to_db(document_store, document_dir, clean_func=None, only_empty_db=False, split_paragrahs=False):
"""
Write all text files (.txt) in the sub-directories of the given path to the connected database.
@ -22,28 +22,42 @@ def write_documents_to_db(datastore, document_dir, clean_func=None, only_empty_d
# check if db has already docs
if only_empty_db:
n_docs = datastore.get_document_count()
n_docs = document_store.get_document_count()
if n_docs > 0:
logger.info(f"Skip writing documents since DB already contains {n_docs} docs ... "
"(Disable `only_empty_db`, if you want to add docs anyway.)")
return None
# read and add docs
documents_to_write = []
docs_to_index = []
doc_id = 1
for path in file_paths:
with open(path) as doc:
text = doc.read()
if clean_func:
text = clean_func(text)
documents_to_write.append(
{
"name": path.name,
"text": text,
}
)
datastore.write_documents(documents_to_write)
logger.info(f"Wrote {len(documents_to_write)} docs to DB")
if split_paragrahs:
for para in text.split("\n\n"):
if not para.strip(): # skip empty paragraphs
continue
docs_to_index.append(
{
"name": path.name,
"text": para,
"document_id": doc_id
}
)
doc_id += 1
else:
docs_to_index.append(
{
"name": path.name,
"text": text,
}
)
document_store.write_documents(docs_to_index)
logger.info(f"Wrote {len(docs_to_index)} docs to DB")
def fetch_archive_from_http(url, output_dir, proxies=None):

View File

@ -0,0 +1,7 @@
from abc import ABC, abstractmethod
class BaseRetriever(ABC):
    """Interface that every retriever implementation must satisfy."""

    @abstractmethod
    def retrieve(self, query, candidate_doc_ids=None, top_k=1):
        """
        Return the most relevant text units for `query`.

        :param query: the question / query string.
        :param candidate_doc_ids: optional list of document ids to restrict the search to.
        :param top_k: number of results to return.
        """
        ...

View File

@ -0,0 +1,9 @@
from haystack.retriever.base import BaseRetriever
class ElasticsearchRetriever(BaseRetriever):
    """
    Retriever that delegates ranking to the Elasticsearch document store,
    i.e. Elasticsearch's native full-text scoring (BM25 by default).
    """

    def __init__(self, document_store):
        # Store handle exposing a `query(query, top_k)` method.
        self.document_store = document_store

    def retrieve(self, query, candidate_doc_ids=None, top_k=10):
        """Return the top_k results of the store's full-text query.

        NOTE(review): `candidate_doc_ids` is currently ignored — the filter is
        not forwarded to the Elasticsearch query.
        """
        return self.document_store.query(query, top_k)

View File

@ -1,7 +1,8 @@
from abc import ABC, abstractmethod
from collections import OrderedDict, namedtuple
import logging
from collections import OrderedDict, namedtuple
import pandas as pd
from haystack.retriever.base import BaseRetriever
from sklearn.feature_extraction.text import TfidfVectorizer
@ -11,20 +12,6 @@ logger = logging.getLogger(__name__)
Paragraph = namedtuple("Paragraph", ["paragraph_id", "document_id", "text"])
class BaseRetriever(ABC):
@abstractmethod
def _get_all_paragraphs(self):
pass
@abstractmethod
def retrieve(self, query, candidate_doc_ids=None, top_k=1):
pass
@abstractmethod
def fit(self):
pass
class TfidfRetriever(BaseRetriever):
"""
Read all documents from a SQL backend.
@ -35,7 +22,7 @@ class TfidfRetriever(BaseRetriever):
It uses sklearn's TfidfVectorizer to compute a tf-idf matrix.
"""
def __init__(self, datastore):
def __init__(self, document_store):
self.vectorizer = TfidfVectorizer(
lowercase=True,
stop_words=None,
@ -43,7 +30,7 @@ class TfidfRetriever(BaseRetriever):
ngram_range=(1, 1),
)
self.datastore = datastore
self.document_store = document_store
self.paragraphs = self._get_all_paragraphs()
self.df = None
self.fit()
@ -52,12 +39,11 @@ class TfidfRetriever(BaseRetriever):
"""
Split the list of documents in paragraphs
"""
documents = self.datastore.get_all_documents()
documents = self.document_store.get_all_documents()
paragraphs = []
p_id = 0
for doc in documents:
_pgs = [d for d in doc["text"].splitlines() if d.strip()]
for p in doc["text"].split("\n\n"):
if not p.strip(): # skip empty paragraphs
continue

View File

@ -1 +0,0 @@
DATABASE_URL = "sqlite:///qa.db"

View File

@ -3,9 +3,9 @@ from haystack.indexing.io import write_documents_to_db
def test_db_write_read():
sql_datastore = SQLDocumentStore()
write_documents_to_db(datastore=sql_datastore, document_dir="samples/docs")
documents = sql_datastore.get_all_documents()
sql_document_store = SQLDocumentStore()
write_documents_to_db(document_store=sql_document_store, document_dir="samples/docs")
documents = sql_document_store.get_all_documents()
assert len(documents) == 2
doc = sql_datastore.get_document_by_id("1")
doc = sql_document_store.get_document_by_id("1")
assert doc.keys() == {"id", "name", "text", "tags"}

View File

@ -86,12 +86,12 @@
"# The documents can be stored in different types of \"DocumentStores\".\n",
"# For dev we suggest a light-weight SQL DB\n",
"# For production we suggest elasticsearch\n",
"datastore = SQLDocumentStore(url=\"sqlite:///qa.db\")\n",
"document_store = SQLDocumentStore(url=\"sqlite:///qa.db\")\n",
"\n",
"# Now, let's write the docs to our DB.\n",
"# You can optionally supply a cleaning function that is applied to each doc (e.g. to remove footers)\n",
"# It must take a str as input, and return a str.\n",
"write_documents_to_db(datastore=datastore, document_dir=doc_dir, clean_func=clean_wiki_text, only_empty_db=True)"
"write_documents_to_db(document_store=document_store, document_dir=doc_dir, clean_func=clean_wiki_text, only_empty_db=True)"
]
},
{
@ -122,7 +122,7 @@
"source": [
"# A retriever identifies the k most promising chunks of text that might contain the answer for our question\n",
"# Retrievers use some simple but fast algorithm, here: TF-IDF\n",
"retriever = TfidfRetriever(datastore=datastore)"
"retriever = TfidfRetriever(document_store=document_store)"
]
},
{

View File

@ -20,18 +20,18 @@ fetch_archive_from_http(url=s3_url, output_dir=doc_dir)
# The documents can be stored in different types of "DocumentStores".
# For dev we suggest a light-weight SQL DB
# For production we suggest elasticsearch
datastore = SQLDocumentStore(url="sqlite:///qa.db")
document_store = SQLDocumentStore(url="sqlite:///qa.db")
# Now, let's write the docs to our DB.
# You can optionally supply a cleaning function that is applied to each doc (e.g. to remove footers)
# It must take a str as input, and return a str.
write_documents_to_db(datastore=datastore, document_dir=doc_dir, clean_func=clean_wiki_text, only_empty_db=True)
write_documents_to_db(document_store=document_store, document_dir=doc_dir, clean_func=clean_wiki_text, only_empty_db=True)
## Initialize Reader, Retriever & Finder
# A retriever identifies the k most promising chunks of text that might contain the answer for our question
# Retrievers use some simple but fast algorithm, here: TF-IDF
retriever = TfidfRetriever(datastore=datastore)
retriever = TfidfRetriever(document_store=document_store)
# A reader scans the text chunks in detail and extracts the k best answers
# Readers use more powerful but slower deep learning models

View File

@ -27,14 +27,14 @@ fetch_archive_from_http(url=s3_url, output_dir=doc_dir)
# Init Document store & write docs to it
datastore = SQLDocumentStore(url="sqlite:///qa.db")
write_documents_to_db(datastore=datastore, document_dir=doc_dir, clean_func=clean_wiki_text, only_empty_db=True)
document_store = SQLDocumentStore(url="sqlite:///qa.db")
write_documents_to_db(document_store=document_store, document_dir=doc_dir, clean_func=clean_wiki_text, only_empty_db=True)
## Initialize Reader, Retriever & Finder
# A retriever identifies the k most promising chunks of text that might contain the answer for our question
# Retrievers use some simple but fast algorithm, here: TF-IDF
retriever = TfidfRetriever(datastore=datastore)
retriever = TfidfRetriever(document_store=document_store)
# The Finder sticks together reader and retriever in a pipeline to answer our actual questions
finder = Finder(reader, retriever)