# haystack/test/benchmarks/retriever.py
import datetime
import json
import logging
import os
import pickle
import random
import time
import traceback
from pathlib import Path
from time import perf_counter

import pandas as pd
import requests
from tqdm import tqdm

from haystack import Document
from haystack.document_store.faiss import FAISSDocumentStore
from haystack.preprocessor.utils import eval_data_from_json
from haystack.utils import stop_service

from results_to_json import retriever as retriever_json
from templates import RETRIEVER_TEMPLATE, RETRIEVER_MAP_TEMPLATE, RETRIEVER_SPEED_TEMPLATE
from utils import get_document_store, get_retriever, index_to_doc_store, load_config, download_from_url

logger = logging.getLogger(__name__)
logging.getLogger("haystack.retriever.base").setLevel(logging.WARN)
logging.getLogger("elasticsearch").setLevel(logging.WARN)

doc_index = "eval_document"
label_index = "label"

index_results_file = "retriever_index_results.csv"
query_results_file = "retriever_query_results.csv"

overview_json = "../../docs/_src/benchmarks/retriever_performance.json"
map_json = "../../docs/_src/benchmarks/retriever_map.json"
speed_json = "../../docs/_src/benchmarks/retriever_speed.json"

DEVICES = None

seed = 42
random.seed(seed)

def benchmark_indexing(n_docs_options, retriever_doc_stores, data_dir, filename_gold, filename_negative, data_s3_url,
                       embeddings_filenames, embeddings_dir, update_json, save_markdown, **kwargs):
    retriever_results = []
    for n_docs in n_docs_options:
        for retriever_name, doc_store_name in retriever_doc_stores:
            logger.info(f"##### Start indexing run: {retriever_name}, {doc_store_name}, {n_docs} docs ##### ")
            try:
                doc_store = get_document_store(doc_store_name)
                retriever = get_retriever(retriever_name, doc_store, DEVICES)
                docs, _ = prepare_data(data_dir=data_dir,
                                       filename_gold=filename_gold,
                                       filename_negative=filename_negative,
                                       remote_url=data_s3_url,
                                       embeddings_filenames=embeddings_filenames,
                                       embeddings_dir=embeddings_dir,
                                       n_docs=n_docs)

                tic = perf_counter()
                index_to_doc_store(doc_store, docs, retriever)
                toc = perf_counter()
                indexing_time = toc - tic

                print(indexing_time)
                retriever_results.append({
                    "retriever": retriever_name,
                    "doc_store": doc_store_name,
                    "n_docs": n_docs,
                    "indexing_time": indexing_time,
                    "docs_per_second": n_docs / indexing_time,
                    "date_time": datetime.datetime.now(),
                    "error": None})
                retriever_df = pd.DataFrame.from_records(retriever_results)
                retriever_df = retriever_df.sort_values(by="retriever").sort_values(by="doc_store")
                retriever_df.to_csv(index_results_file)

                logger.info("Deleting all docs from this run ...")
                if isinstance(doc_store, FAISSDocumentStore):
                    doc_store.session.close()
                else:
                    doc_store.delete_all_documents(index=doc_index)
                    doc_store.delete_all_documents(index=label_index)

                if save_markdown:
                    md_file = index_results_file.replace(".csv", ".md")
                    with open(md_file, "w") as f:
                        f.write(str(retriever_df.to_markdown()))

                time.sleep(10)
                stop_service(doc_store)
                del doc_store
                del retriever

            except Exception:
                tb = traceback.format_exc()
                logging.error(f"##### The following Error was raised while running indexing run: {retriever_name}, {doc_store_name}, {n_docs} docs #####")
                logging.error(tb)
                retriever_results.append({
                    "retriever": retriever_name,
                    "doc_store": doc_store_name,
                    "n_docs": n_docs,
                    "indexing_time": 0,
                    "docs_per_second": 0,
                    "date_time": datetime.datetime.now(),
                    "error": str(tb)})
                logger.info("Deleting all docs from this run ...")
                if isinstance(doc_store, FAISSDocumentStore):
                    doc_store.session.close()
                else:
                    doc_store.delete_all_documents(index=doc_index)
                    doc_store.delete_all_documents(index=label_index)
                time.sleep(10)
                stop_service(doc_store)
                del doc_store
                del retriever

    if update_json:
        populate_retriever_json()

def benchmark_querying(n_docs_options,
                       retriever_doc_stores,
                       data_dir,
                       data_s3_url,
                       filename_gold,
                       filename_negative,
                       n_queries,
                       embeddings_filenames,
                       embeddings_dir,
                       update_json,
                       save_markdown,
                       wait_write_limit=100,
                       **kwargs):
    """Benchmark the time it takes to perform querying. Doc embeddings are loaded from file."""
    retriever_results = []
    for n_docs in n_docs_options:
        for retriever_name, doc_store_name in retriever_doc_stores:
            try:
                logger.info(f"##### Start querying run: {retriever_name}, {doc_store_name}, {n_docs} docs ##### ")
                if retriever_name in ["elastic", "sentence_transformers"]:
                    similarity = "cosine"
                else:
                    similarity = "dot_product"
                doc_store = get_document_store(doc_store_name, similarity=similarity)
                retriever = get_retriever(retriever_name, doc_store, DEVICES)
                # For DPR, precomputed embeddings are loaded from file
                add_precomputed = retriever_name in ["dpr"]
                docs, labels = prepare_data(data_dir=data_dir,
                                            filename_gold=filename_gold,
                                            filename_negative=filename_negative,
                                            remote_url=data_s3_url,
                                            embeddings_filenames=embeddings_filenames,
                                            embeddings_dir=embeddings_dir,
                                            n_docs=n_docs,
                                            n_queries=n_queries,
                                            add_precomputed=add_precomputed)
                logger.info("Start indexing...")
                index_to_doc_store(doc_store, docs, retriever, labels)
                logger.info("Start queries...")
                raw_results = retriever.eval()
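                # The keys read from raw_results below (n_questions, retrieve_time, recall,
                # map, top_k) are the ones retriever.eval() returns here; an illustrative
                # shape (numbers made up) would be:
                #   {"n_questions": 100, "retrieve_time": 12.3, "recall": 0.95, "map": 0.81, "top_k": 10}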
                results = {
                    "retriever": retriever_name,
                    "doc_store": doc_store_name,
                    "n_docs": n_docs,
                    "n_queries": raw_results["n_questions"],
                    "retrieve_time": raw_results["retrieve_time"],
                    "queries_per_second": raw_results["n_questions"] / raw_results["retrieve_time"],
                    "seconds_per_query": raw_results["retrieve_time"] / raw_results["n_questions"],
                    "recall": raw_results["recall"] * 100,
                    "map": raw_results["map"] * 100,
                    "top_k": raw_results["top_k"],
                    "date_time": datetime.datetime.now(),
                    "error": None
                }

                logger.info("Deleting all docs from this run ...")
                if isinstance(doc_store, FAISSDocumentStore):
                    doc_store.session.close()
                else:
                    doc_store.delete_all_documents(index=doc_index)
                    doc_store.delete_all_documents(index=label_index)
                time.sleep(5)
                stop_service(doc_store)
                del doc_store
                del retriever

            except Exception:
                tb = traceback.format_exc()
                logging.error(f"##### The following Error was raised while running querying run: {retriever_name}, {doc_store_name}, {n_docs} docs #####")
                logging.error(tb)
                results = {
                    "retriever": retriever_name,
                    "doc_store": doc_store_name,
                    "n_docs": n_docs,
                    "n_queries": 0,
                    "retrieve_time": 0.,
                    "queries_per_second": 0.,
                    "seconds_per_query": 0.,
                    "recall": 0.,
                    "map": 0.,
                    "top_k": 0,
                    "date_time": datetime.datetime.now(),
                    "error": str(tb)
                }
                logger.info("Deleting all docs from this run ...")
                if isinstance(doc_store, FAISSDocumentStore):
                    doc_store.session.close()
                else:
                    doc_store.delete_all_documents(index=doc_index)
                    doc_store.delete_all_documents(index=label_index)
                time.sleep(5)
                del doc_store
                del retriever

            logger.info(results)
            retriever_results.append(results)

            retriever_df = pd.DataFrame.from_records(retriever_results)
            retriever_df = retriever_df.sort_values(by="retriever").sort_values(by="doc_store")
            retriever_df.to_csv(query_results_file)
            if save_markdown:
                md_file = query_results_file.replace(".csv", ".md")
                with open(md_file, "w") as f:
                    f.write(str(retriever_df.to_markdown()))

    if update_json:
        populate_retriever_json()

def populate_retriever_json():
    retriever_overview_data, retriever_map_data, retriever_speed_data = retriever_json(index_csv=index_results_file,
                                                                                       query_csv=query_results_file)
    overview = RETRIEVER_TEMPLATE
    overview["data"] = retriever_overview_data
    map_chart = RETRIEVER_MAP_TEMPLATE
    map_chart["data"] = retriever_map_data
    speed = RETRIEVER_SPEED_TEMPLATE
    speed["data"] = retriever_speed_data
    with open(overview_json, "w") as f:
        json.dump(overview, f, indent=4)
    with open(speed_json, "w") as f:
        json.dump(speed, f, indent=4)
    with open(map_json, "w") as f:
        json.dump(map_chart, f, indent=4)

def add_precomputed_embeddings(embeddings_dir, embeddings_filenames, docs):
    ret = []
    id_to_doc = {x.meta["passage_id"]: x for x in docs}
    for ef in embeddings_filenames:
        logger.info(f"Adding precomputed embeddings from {embeddings_dir + ef}")
        filename = embeddings_dir + ef
        with open(filename, "rb") as f:
            embeds = pickle.load(f)
        for i, vec in embeds:
            if int(i) in id_to_doc:
                curr = id_to_doc[int(i)]
                curr.embedding = vec
                ret.append(curr)

    # In the official DPR repo, there are only 20594995 precomputed embeddings for 21015324 wikipedia passages
    # If there isn't an embedding for a given doc, we remove it here
    ret = [x for x in ret if x.embedding is not None]
    logger.info(f"Embeddings loaded for {len(ret)}/{len(docs)} docs")
    return ret
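
# Each embeddings file above is assumed to unpickle to an iterable of
# (passage_id, vector) pairs, matching the layout of the precomputed embeddings
# published in the official DPR repo. A minimal sketch of producing such a file
# (ids, dimensions and the numpy usage are hypothetical):
#   pairs = [("0", np.random.rand(768).astype(np.float32)), ("1", ...)]
#   with open("embeddings_file.pkl", "wb") as f:
#       pickle.dump(pairs, f)
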
def prepare_data(data_dir, filename_gold, filename_negative, remote_url, embeddings_filenames, embeddings_dir,
                 n_docs=None, n_queries=None, add_precomputed=False):
    """
    filename_gold points to a SQuAD-format file.
    filename_negative points to a tab-separated file whose columns are passage id, passage text and title.
    If add_precomputed is True, this fn will look in the embeddings files for precomputed embeddings to add to each Document.
    """
    logging.getLogger("farm").setLevel(logging.INFO)
    download_from_url(remote_url + filename_gold, filepath=data_dir + filename_gold)
    download_from_url(remote_url + filename_negative, filepath=data_dir + filename_negative)
    if add_precomputed:
        for embedding_filename in embeddings_filenames:
            download_from_url(remote_url + str(embeddings_dir) + embedding_filename,
                              filepath=data_dir + str(embeddings_dir) + embedding_filename)
    logging.getLogger("farm").setLevel(logging.WARN)

    gold_docs, labels = eval_data_from_json(data_dir + filename_gold)

    # Reduce number of docs
    gold_docs = gold_docs[:n_docs]

    # Remove labels whose gold docs have been removed
    doc_ids = [x.id for x in gold_docs]
    labels = [x for x in labels if x.document_id in doc_ids]

    # Filter labels down to n_queries
    selected_queries = list(set(f"{x.document_id} | {x.query}" for x in labels))
    selected_queries = selected_queries[:n_queries]
    labels = [x for x in labels if f"{x.document_id} | {x.query}" in selected_queries]

    n_neg_docs = max(0, n_docs - len(gold_docs))
    neg_docs = prepare_negative_passages(data_dir, filename_negative, n_neg_docs)
    docs = gold_docs + neg_docs

    if add_precomputed:
        docs = add_precomputed_embeddings(data_dir + embeddings_dir, embeddings_filenames, docs)

    return docs, labels

def prepare_negative_passages(data_dir, filename_negative, n_docs):
    if n_docs == 0:
        return []
    with open(data_dir + filename_negative) as f:
        lines = []
        _ = f.readline()  # Skip column titles line
        for _ in range(n_docs):
            lines.append(f.readline()[:-1])

    docs = []
    for line in lines[:n_docs]:
        passage_id, text, title = line.split("\t")
        d = {"text": text,
             "meta": {"passage_id": int(passage_id),
                      "title": title}}
        d = Document(**d)
        docs.append(d)
    return docs
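
# For illustration, a data line of the negative-passages file parsed above is
# expected to be tab-separated (values are made up):
#   12345\tIsolated passage text drawn from Wikipedia ...\tSome article title
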
if __name__ == "__main__":
    params, filenames = load_config(config_filename="config.json", ci=True)
    benchmark_indexing(**params, **filenames, update_json=True, save_markdown=False)
    benchmark_querying(**params, **filenames, update_json=True, save_markdown=False)
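
# A minimal sketch of the expected config.json, assuming load_config() splits it into
# the "params" and "filenames" groups unpacked above (key names inferred from the
# function signatures; all values are hypothetical):
#   {
#       "params": {
#           "n_docs_options": [1000],
#           "n_queries": 100,
#           "retriever_doc_stores": [["elastic", "elasticsearch"], ["dpr", "faiss_flat"]]
#       },
#       "filenames": {
#           "data_dir": "data/",
#           "filename_gold": "gold.json",
#           "filename_negative": "negatives.tsv",
#           "data_s3_url": "https://example-bucket.s3.amazonaws.com/",
#           "embeddings_dir": "embeddings/",
#           "embeddings_filenames": ["embeddings_0.pkl"]
#       }
#   }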