haystack/test/test_pipeline.py
from pathlib import Path
import os

import pytest

from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore
from haystack.pipeline import (
    JoinDocuments,
    Pipeline,
    FAQPipeline,
    DocumentSearchPipeline,
    RootNode,
    SklearnQueryClassifier,
    TransformersQueryClassifier,
    MostSimilarDocumentsPipeline,
)
from haystack.pipelines import ExtractiveQAPipeline
from haystack.reader import FARMReader
from haystack.retriever.dense import DensePassageRetriever
from haystack.retriever.sparse import ElasticsearchRetriever
from haystack.schema import Document


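# NOTE: `document_store`, `retriever_with_docs`, and `document_store_with_docs`
# are not defined in this file; they are presumably indirect-parametrized
# fixtures provided by the suite's conftest.py. `tmp_path` is pytest's
# built-in per-test temporary directory fixture.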
@pytest.mark.elasticsearch
@pytest.mark.parametrize("document_store", ["elasticsearch"], indirect=True)
def test_load_and_save_yaml(document_store, tmp_path):
    # test correct load of indexing pipeline from yaml
    pipeline = Pipeline.load_from_yaml(
        Path(__file__).parent / "samples" / "pipeline" / "test_pipeline.yaml", pipeline_name="indexing_pipeline"
    )
    pipeline.run(
        file_paths=Path(__file__).parent / "samples" / "pdf" / "sample_pdf_1.pdf"
    )

    # test correct load of query pipeline from yaml
    pipeline = Pipeline.load_from_yaml(
        Path(__file__).parent / "samples" / "pipeline" / "test_pipeline.yaml", pipeline_name="query_pipeline"
    )
    prediction = pipeline.run(
        query="Who made the PDF specification?", params={"ESRetriever": {"top_k": 10}, "Reader": {"top_k": 3}}
    )
    assert prediction["query"] == "Who made the PDF specification?"
    assert prediction["answers"][0].answer == "Adobe Systems"
    assert "_debug" not in prediction.keys()

    # test invalid pipeline name
    with pytest.raises(Exception):
        Pipeline.load_from_yaml(
            path=Path(__file__).parent / "samples" / "pipeline" / "test_pipeline.yaml", pipeline_name="invalid"
        )

    # test config export
    pipeline.save_to_yaml(tmp_path / "test.yaml")
    with open(tmp_path / "test.yaml", "r", encoding="utf-8") as stream:
        saved_yaml = stream.read()
    expected_yaml = """
        components:
        - name: ESRetriever
          params:
            document_store: ElasticsearchDocumentStore
          type: ElasticsearchRetriever
        - name: ElasticsearchDocumentStore
          params:
            index: haystack_test
            label_index: haystack_test_label
          type: ElasticsearchDocumentStore
        - name: Reader
          params:
            model_name_or_path: deepset/roberta-base-squad2
            no_ans_boost: -10
            num_processes: 0
          type: FARMReader
        pipelines:
        - name: query
          nodes:
          - inputs:
            - Query
            name: ESRetriever
          - inputs:
            - ESRetriever
            name: Reader
          type: Pipeline
        version: '0.8'
    """
    assert saved_yaml.replace(" ", "").replace("\n", "") == expected_yaml.replace(
        " ", ""
    ).replace("\n", "")


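# Same YAML round trip as above, but the query pipeline is loaded through the
# ExtractiveQAPipeline wrapper instead of the generic Pipeline class; the
# exported config should be identical.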
@pytest.mark.elasticsearch
@pytest.mark.parametrize("document_store", ["elasticsearch"], indirect=True)
def test_load_and_save_yaml_prebuilt_pipelines(document_store, tmp_path):
    # populating index
    pipeline = Pipeline.load_from_yaml(
        Path(__file__).parent / "samples" / "pipeline" / "test_pipeline.yaml", pipeline_name="indexing_pipeline"
    )
    pipeline.run(
        file_paths=Path(__file__).parent / "samples" / "pdf" / "sample_pdf_1.pdf"
    )

    # test correct load of query pipeline from yaml
    pipeline = ExtractiveQAPipeline.load_from_yaml(
        Path(__file__).parent / "samples" / "pipeline" / "test_pipeline.yaml", pipeline_name="query_pipeline"
    )
    prediction = pipeline.run(
        query="Who made the PDF specification?", params={"ESRetriever": {"top_k": 10}, "Reader": {"top_k": 3}}
    )
    assert prediction["query"] == "Who made the PDF specification?"
    assert prediction["answers"][0].answer == "Adobe Systems"
    assert "_debug" not in prediction.keys()

    # test invalid pipeline name
    with pytest.raises(Exception):
        ExtractiveQAPipeline.load_from_yaml(
            path=Path(__file__).parent / "samples" / "pipeline" / "test_pipeline.yaml", pipeline_name="invalid"
        )

    # test config export
    pipeline.save_to_yaml(tmp_path / "test.yaml")
    with open(tmp_path / "test.yaml", "r", encoding="utf-8") as stream:
        saved_yaml = stream.read()
    expected_yaml = """
        components:
        - name: ESRetriever
          params:
            document_store: ElasticsearchDocumentStore
          type: ElasticsearchRetriever
        - name: ElasticsearchDocumentStore
          params:
            index: haystack_test
            label_index: haystack_test_label
          type: ElasticsearchDocumentStore
        - name: Reader
          params:
            model_name_or_path: deepset/roberta-base-squad2
            no_ans_boost: -10
            num_processes: 0
          type: FARMReader
        pipelines:
        - name: query
          nodes:
          - inputs:
            - Query
            name: ESRetriever
          - inputs:
            - ESRetriever
            name: Reader
          type: Pipeline
        version: '0.8'
    """
    assert saved_yaml.replace(" ", "").replace("\n", "") == expected_yaml.replace(
        " ", ""
    ).replace("\n", "")


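# Checks a TfidfRetriever pipeline loaded from YAML: retrieval should fail
# before any documents are indexed and succeed once a document has been
# written to the attached document store.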
def test_load_tfidfretriever_yaml(tmp_path):
    documents = [
        {
            "content": "A Doc specifically talking about haystack. Haystack can be used to scale QA models to large document collections."
        }
    ]
    pipeline = Pipeline.load_from_yaml(
        Path(__file__).parent / "samples" / "pipeline" / "test_pipeline_tfidfretriever.yaml", pipeline_name="query_pipeline"
    )
    with pytest.raises(Exception) as exc_info:
        pipeline.run(
            query="What can be used to scale QA models to large document collections?",
            params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3}},
        )
    exception_raised = str(exc_info.value)
    assert "Retrieval requires dataframe df and tf-idf matrix" in exception_raised

    pipeline.get_node(name="Retriever").document_store.write_documents(documents=documents)
    prediction = pipeline.run(
        query="What can be used to scale QA models to large document collections?",
        params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3}},
    )
    assert prediction["query"] == "What can be used to scale QA models to large document collections?"
    assert prediction["answers"][0].answer == "haystack"


# @pytest.mark.slow
# @pytest.mark.elasticsearch
# @pytest.mark.parametrize(
# "retriever_with_docs, document_store_with_docs",
# [("elasticsearch", "elasticsearch")],
# indirect=True,
# )
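# The Elasticsearch-only parametrization above is kept but disabled; the active
# one below runs the graph-validation checks against every retriever/document
# store combination. Invalid wiring (nonexistent edges or input nodes) should
# be rejected at add_node() time.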
@pytest.mark.parametrize(
    "retriever_with_docs,document_store_with_docs",
    [
        ("dpr", "elasticsearch"),
        ("dpr", "faiss"),
        ("dpr", "memory"),
        ("dpr", "milvus"),
        ("embedding", "elasticsearch"),
        ("embedding", "faiss"),
        ("embedding", "memory"),
        ("embedding", "milvus"),
        ("elasticsearch", "elasticsearch"),
        ("es_filter_only", "elasticsearch"),
        ("tfidf", "memory"),
    ],
    indirect=True,
)
def test_graph_creation(retriever_with_docs, document_store_with_docs):
    pipeline = Pipeline()
    pipeline.add_node(name="ES", component=retriever_with_docs, inputs=["Query"])

    with pytest.raises(AssertionError):
        pipeline.add_node(
            name="Reader", component=retriever_with_docs, inputs=["ES.output_2"]
        )

    with pytest.raises(AssertionError):
        pipeline.add_node(
            name="Reader", component=retriever_with_docs, inputs=["ES.wrong_edge_label"]
        )

    with pytest.raises(Exception):
        pipeline.add_node(
            name="Reader", component=retriever_with_docs, inputs=["InvalidNode"]
        )

    with pytest.raises(Exception):
        pipeline = Pipeline()
        pipeline.add_node(
            name="ES", component=retriever_with_docs, inputs=["InvalidNode"]
        )


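# The next two tests wire tiny RootNode subclasses into branching/joining
# graphs. Each node appends its letter to the running value, so the asserted
# string encodes which branches ran and in which order.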
def test_parallel_paths_in_pipeline_graph():
    class A(RootNode):
        def run(self):
            test = "A"
            return {"test": test}, "output_1"

    class B(RootNode):
        def run(self, test):
            test += "B"
            return {"test": test}, "output_1"

    class C(RootNode):
        def run(self, test):
            test += "C"
            return {"test": test}, "output_1"

    class D(RootNode):
        def run(self, test):
            test += "D"
            return {"test": test}, "output_1"

    class E(RootNode):
        def run(self, test):
            test += "E"
            return {"test": test}, "output_1"

    class JoinNode(RootNode):
        def run(self, inputs):
            test = inputs[0]["test"] + inputs[1]["test"]
            return {"test": test}, "output_1"

    # branches B -> D and B -> C -> E merge in the join node
    pipeline = Pipeline()
    pipeline.add_node(name="A", component=A(), inputs=["Query"])
    pipeline.add_node(name="B", component=B(), inputs=["A"])
    pipeline.add_node(name="C", component=C(), inputs=["B"])
    pipeline.add_node(name="E", component=E(), inputs=["C"])
    pipeline.add_node(name="D", component=D(), inputs=["B"])
    pipeline.add_node(name="F", component=JoinNode(), inputs=["D", "E"])
    output = pipeline.run(query="test")
    assert output["test"] == "ABDABCE"

    # same idea without the extra hop through E
    pipeline = Pipeline()
    pipeline.add_node(name="A", component=A(), inputs=["Query"])
    pipeline.add_node(name="B", component=B(), inputs=["A"])
    pipeline.add_node(name="C", component=C(), inputs=["B"])
    pipeline.add_node(name="D", component=D(), inputs=["B"])
    pipeline.add_node(name="E", component=JoinNode(), inputs=["C", "D"])
    output = pipeline.run(query="test")
    assert output["test"] == "ABCABD"


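# As above, but the root node declares two outgoing edges; which branches run
# depends on whether it returns "output_1", "output_2", or "output_all".
# Note that the node named "D" runs component E() and vice versa, hence
# "ABE"/"ABD" rather than "ABD"/"ABE" in the joined output.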
def test_parallel_paths_in_pipeline_graph_with_branching():
    class AWithOutput1(RootNode):
        outgoing_edges = 2

        def run(self):
            output = "A"
            return {"output": output}, "output_1"

    class AWithOutput2(RootNode):
        outgoing_edges = 2

        def run(self):
            output = "A"
            return {"output": output}, "output_2"

    class AWithOutputAll(RootNode):
        outgoing_edges = 2

        def run(self):
            output = "A"
            return {"output": output}, "output_all"

    class B(RootNode):
        def run(self, output):
            output += "B"
            return {"output": output}, "output_1"

    class C(RootNode):
        def run(self, output):
            output += "C"
            return {"output": output}, "output_1"

    class D(RootNode):
        def run(self, output):
            output += "D"
            return {"output": output}, "output_1"

    class E(RootNode):
        def run(self, output):
            output += "E"
            return {"output": output}, "output_1"

    class JoinNode(RootNode):
        def run(self, output=None, inputs=None):
            if inputs:
                output = ""
                for input_dict in inputs:
                    output += input_dict["output"]
            return {"output": output}, "output_1"

    # only output_1 fires: the branch through B runs, the branch through C does not
    pipeline = Pipeline()
    pipeline.add_node(name="A", component=AWithOutput1(), inputs=["Query"])
    pipeline.add_node(name="B", component=B(), inputs=["A.output_1"])
    pipeline.add_node(name="C", component=C(), inputs=["A.output_2"])
    pipeline.add_node(name="D", component=E(), inputs=["B"])
    pipeline.add_node(name="E", component=D(), inputs=["B"])
    pipeline.add_node(name="F", component=JoinNode(), inputs=["D", "E", "C"])
    output = pipeline.run(query="test")
    assert output["output"] == "ABEABD"

    # only output_2 fires: just the C branch reaches the join
    pipeline = Pipeline()
    pipeline.add_node(name="A", component=AWithOutput2(), inputs=["Query"])
    pipeline.add_node(name="B", component=B(), inputs=["A.output_1"])
    pipeline.add_node(name="C", component=C(), inputs=["A.output_2"])
    pipeline.add_node(name="D", component=E(), inputs=["B"])
    pipeline.add_node(name="E", component=D(), inputs=["B"])
    pipeline.add_node(name="F", component=JoinNode(), inputs=["D", "E", "C"])
    output = pipeline.run(query="test")
    assert output["output"] == "AC"

    # output_all fires: both branches reach the join
    pipeline = Pipeline()
    pipeline.add_node(name="A", component=AWithOutputAll(), inputs=["Query"])
    pipeline.add_node(name="B", component=B(), inputs=["A.output_1"])
    pipeline.add_node(name="C", component=C(), inputs=["A.output_2"])
    pipeline.add_node(name="D", component=E(), inputs=["B"])
    pipeline.add_node(name="E", component=D(), inputs=["B"])
    pipeline.add_node(name="F", component=JoinNode(), inputs=["D", "E", "C"])
    output = pipeline.run(query="test")
    assert output["output"] == "ACABEABD"


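# End-to-end FAISS persistence: index documents via a YAML indexing pipeline,
# save the index to disk, then reload it through a separate query pipeline
# YAML and retrieve against the existing index.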
def test_existing_faiss_document_store():
    clean_faiss_document_store()

    pipeline = Pipeline.load_from_yaml(
        Path(__file__).parent / "samples" / "pipeline" / "test_pipeline_faiss_indexing.yaml", pipeline_name="indexing_pipeline"
    )
    pipeline.run(
        file_paths=Path(__file__).parent / "samples" / "pdf" / "sample_pdf_1.pdf"
    )

    new_document_store = pipeline.get_document_store()
    new_document_store.save("existing_faiss_document_store")

    # test correct load of query pipeline from yaml
    pipeline = Pipeline.load_from_yaml(
        Path(__file__).parent / "samples" / "pipeline" / "test_pipeline_faiss_retrieval.yaml", pipeline_name="query_pipeline"
    )
    retriever = pipeline.get_node("DPRRetriever")
    existing_document_store = retriever.document_store
    faiss_index = existing_document_store.faiss_indexes["document"]
    assert faiss_index.ntotal == 2

    prediction = pipeline.run(
        query="Who made the PDF specification?", params={"DPRRetriever": {"top_k": 10}}
    )
    assert prediction["query"] == "Who made the PDF specification?"
    assert len(prediction["documents"]) == 2

    clean_faiss_document_store()


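# Helper for the FAISS test above: removes the index, config, and SQLite files
# a previous run may have left behind (the file names presumably match those
# configured in the sample YAML / FAISSDocumentStore defaults).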
def clean_faiss_document_store():
    if Path("existing_faiss_document_store").exists():
        os.remove("existing_faiss_document_store")
    if Path("existing_faiss_document_store.json").exists():
        os.remove("existing_faiss_document_store.json")
    if Path("faiss_document_store.db").exists():
        os.remove("faiss_document_store.db")