# haystack/e2e/pipelines/test_evaluation_pipeline.py

import os
from typing import List
import pytest
from haystack import Document, Pipeline
from haystack.components.builders import AnswerBuilder, PromptBuilder
from haystack.components.embedders import SentenceTransformersDocumentEmbedder, SentenceTransformersTextEmbedder
from haystack.components.evaluators import (
    ContextRelevanceEvaluator,
    DocumentMAPEvaluator,
    DocumentMRREvaluator,
    DocumentRecallEvaluator,
    FaithfulnessEvaluator,
    SASEvaluator,
)
from haystack.components.evaluators.document_recall import RecallMode
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers import InMemoryEmbeddingRetriever
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.document_stores.types import DuplicatePolicy
from haystack.evaluation import EvaluationRunResult
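
# The same sentence-transformers model is reused for embedding documents at indexing time,
# for embedding the query at retrieval time, and by the SASEvaluator below.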
EMBEDDINGS_MODEL = "sentence-transformers/all-MiniLM-L6-v2"


def indexing_pipeline(documents: List[Document]):
    """Index the given documents into a fresh InMemoryDocumentStore, embedding them first."""
    document_store = InMemoryDocumentStore()
    doc_writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP)
    doc_embedder = SentenceTransformersDocumentEmbedder(model=EMBEDDINGS_MODEL, progress_bar=False)

    ingestion_pipe = Pipeline()
    ingestion_pipe.add_component(instance=doc_embedder, name="doc_embedder")  # type: ignore
    ingestion_pipe.add_component(instance=doc_writer, name="doc_writer")  # type: ignore
    ingestion_pipe.connect("doc_embedder.documents", "doc_writer.documents")
    ingestion_pipe.run({"doc_embedder": {"documents": documents}})

    return document_store


def rag_pipeline(document_store: InMemoryDocumentStore, top_k: int):  # type: ignore
    """Build a RAG pipeline: embed the query, retrieve top_k documents, prompt the generator, build answers."""
    template = """
        You have to answer the following question based on the given context information only.
        Context:
        {% for document in documents %}
        {{ document.content }}
        {% endfor %}
        Question: {{question}}
        Answer:
        """

    rag = Pipeline()
    rag.add_component("embedder", SentenceTransformersTextEmbedder(model=EMBEDDINGS_MODEL, progress_bar=False))  # type: ignore
    rag.add_component("retriever", InMemoryEmbeddingRetriever(document_store, top_k=top_k))  # type: ignore
    rag.add_component("prompt_builder", PromptBuilder(template=template))  # type: ignore
    rag.add_component("generator", OpenAIGenerator(model="gpt-3.5-turbo"))  # type: ignore
    rag.add_component("answer_builder", AnswerBuilder())  # type: ignore

    rag.connect("embedder", "retriever.query_embedding")
    rag.connect("retriever", "prompt_builder.documents")
    rag.connect("prompt_builder", "generator")
    rag.connect("generator.replies", "answer_builder.replies")
    rag.connect("generator.meta", "answer_builder.meta")
    rag.connect("retriever", "answer_builder.documents")

    return rag


def evaluation_pipeline():
    """
    Create an evaluation pipeline with the following evaluators:
    - DocumentMRREvaluator
    - FaithfulnessEvaluator
    - SASEvaluator
    - DocumentMAPEvaluator
    - DocumentRecallEvaluator
    - ContextRelevanceEvaluator
    """
    eval_pipeline = Pipeline()
    eval_pipeline.add_component("doc_mrr", DocumentMRREvaluator())
    eval_pipeline.add_component("groundedness", FaithfulnessEvaluator())
    eval_pipeline.add_component("sas", SASEvaluator(model=EMBEDDINGS_MODEL))
    eval_pipeline.add_component("doc_map", DocumentMAPEvaluator())
    eval_pipeline.add_component("doc_recall_single_hit", DocumentRecallEvaluator(mode=RecallMode.SINGLE_HIT))
    eval_pipeline.add_component("doc_recall_multi_hit", DocumentRecallEvaluator(mode=RecallMode.MULTI_HIT))
    eval_pipeline.add_component("relevance", ContextRelevanceEvaluator())
    return eval_pipeline


def built_eval_input(questions, truth_docs, truth_answers, retrieved_docs, contexts, pred_answers):
    """
    Helper function to build the input for the evaluation pipeline.

    The outer keys match the component names registered in evaluation_pipeline();
    the inner keys are the inputs each evaluator expects in its run() method.
    """
    return {
        "doc_mrr": {"ground_truth_documents": truth_docs, "retrieved_documents": retrieved_docs},
        "groundedness": {"questions": questions, "contexts": contexts, "predicted_answers": pred_answers},
        "sas": {"predicted_answers": pred_answers, "ground_truth_answers": truth_answers},
        "doc_map": {"ground_truth_documents": truth_docs, "retrieved_documents": retrieved_docs},
        "doc_recall_single_hit": {"ground_truth_documents": truth_docs, "retrieved_documents": retrieved_docs},
        "doc_recall_multi_hit": {"ground_truth_documents": truth_docs, "retrieved_documents": retrieved_docs},
        "relevance": {"questions": questions, "contexts": contexts},
    }


def run_rag_pipeline(documents, evaluation_questions, rag_pipeline_a):
    """
    Run the RAG pipeline for each evaluation question and return the contexts, predicted answers,
    retrieved documents, and ground truth documents.
    """
    truth_docs = []
    retrieved_docs = []
    contexts = []
    predicted_answers = []

    for q in evaluation_questions:
        response = rag_pipeline_a.run(
            {
                "embedder": {"text": q["question"]},
                "prompt_builder": {"question": q["question"]},
                "answer_builder": {"query": q["question"]},
            }
        )
        # Ground truth documents are the indexed chunks that came from the question's reference file.
        truth_docs.append([doc for doc in documents if doc.meta["name"] in q["ground_truth_doc"] and doc.content])
        retrieved_docs.append(response["answer_builder"]["answers"][0].documents)
        contexts.append([doc.content for doc in response["answer_builder"]["answers"][0].documents])
        predicted_answers.append(response["answer_builder"]["answers"][0].data)

    return contexts, predicted_answers, retrieved_docs, truth_docs


def built_input_for_results_eval(rag_results):
    """Helper function to map the evaluation pipeline outputs to named metrics for EvaluationRunResult."""
    return {
        "Mean Reciprocal Rank": {
            "individual_scores": rag_results["doc_mrr"]["individual_scores"],
            "score": rag_results["doc_mrr"]["score"],
        },
        "Semantic Answer Similarity": {
            "individual_scores": rag_results["sas"]["individual_scores"],
            "score": rag_results["sas"]["score"],
        },
        "Faithfulness": {
            "individual_scores": rag_results["groundedness"]["individual_scores"],
            "score": rag_results["groundedness"]["score"],
        },
        "Document MAP": {
            "individual_scores": rag_results["doc_map"]["individual_scores"],
            "score": rag_results["doc_map"]["score"],
        },
        "Document Recall Single Hit": {
            "individual_scores": rag_results["doc_recall_single_hit"]["individual_scores"],
            "score": rag_results["doc_recall_single_hit"]["score"],
        },
        "Document Recall Multi Hit": {
            "individual_scores": rag_results["doc_recall_multi_hit"]["individual_scores"],
            "score": rag_results["doc_recall_multi_hit"]["score"],
        },
        "Contextual Relevance": {
            "individual_scores": rag_results["relevance"]["individual_scores"],
            "score": rag_results["relevance"]["score"],
        },
    }
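

# End-to-end test: index the sample documents, run two RAG pipelines (top_k=2 and top_k=4),
# score both with the same evaluation pipeline, and compare them through EvaluationRunResult.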
@pytest.mark.skipif(
    not os.environ.get("OPENAI_API_KEY", None),
    reason="Export an env var called OPENAI_API_KEY containing the OpenAI API key to run this test.",
)
def test_evaluation_pipeline(samples_path):
    """Test a RAG evaluation pipeline end-to-end."""
    eval_questions = [
        {
            "question": 'What falls within the term "cultural anthropology"?',
            "answer": "the ideology and analytical stance of cultural relativism",
            "ground_truth_doc": ["Culture.txt"],
        },
        {
            "question": "Who was the spiritual guide during the Protestant Reformation?",
            "answer": "Martin Bucer",
            "ground_truth_doc": ["Strasbourg.txt"],
        },
        {
            "question": "What is materialism?",
            "answer": "a form of philosophical monism",
            "ground_truth_doc": ["Materialism.txt"],
        },
    ]
    questions = [q["question"] for q in eval_questions]
    truth_answers = [q["answer"] for q in eval_questions]

    # indexing documents: each non-empty line of a sample article becomes one Document
    docs = []
    full_path = os.path.join(str(samples_path), "test_documents")
    for article in os.listdir(full_path):
        with open(os.path.join(full_path, article), "r") as f:
            for text in f.read().split("\n"):
                if text:
                    docs.append(Document(content=text, meta={"name": article}))
    doc_store = indexing_pipeline(docs)

    # running the RAG pipeline A + evaluation pipeline
    rag_pipeline_a = rag_pipeline(doc_store, top_k=2)
    contexts_a, pred_answers_a, retrieved_docs_a, truth_docs = run_rag_pipeline(docs, eval_questions, rag_pipeline_a)
    eval_pipeline = evaluation_pipeline()
    eval_input = built_eval_input(questions, truth_docs, truth_answers, retrieved_docs_a, contexts_a, pred_answers_a)
    results_rag_a = eval_pipeline.run(eval_input)

    # building the EvaluationRunResult for pipeline A
    inputs_a = {
        "question": questions,
        "contexts": contexts_a,
        "answer": truth_answers,
        "predicted_answer": pred_answers_a,
    }
    results_a = built_input_for_results_eval(results_rag_a)
    evaluation_result_a = EvaluationRunResult(run_name="rag_pipeline_a", results=results_a, inputs=inputs_a)
    df_score_report = evaluation_result_a.score_report()

    # assert the score report has all the metrics
    assert len(df_score_report) == 7
    assert list(df_score_report.columns) == ["score"]
    assert list(df_score_report.index) == [
        "Mean Reciprocal Rank",
        "Semantic Answer Similarity",
        "Faithfulness",
        "Document MAP",
        "Document Recall Single Hit",
        "Document Recall Multi Hit",
        "Contextual Relevance",
    ]

    # assert the evaluation result has all the metrics, inputs and questions
    df = evaluation_result_a.to_pandas()
    assert list(df.columns) == [
        "question",
        "contexts",
        "answer",
        "predicted_answer",
        "Mean Reciprocal Rank",
        "Semantic Answer Similarity",
        "Faithfulness",
        "Document MAP",
        "Document Recall Single Hit",
        "Document Recall Multi Hit",
        "Contextual Relevance",
    ]
    assert len(df) == 3

    # running the RAG pipeline B
    rag_pipeline_b = rag_pipeline(doc_store, top_k=4)
    contexts_b, pred_answers_b, retrieved_docs_b, truth_docs = run_rag_pipeline(docs, eval_questions, rag_pipeline_b)
    eval_input = built_eval_input(questions, truth_docs, truth_answers, retrieved_docs_b, contexts_b, pred_answers_b)
    results_rag_b = eval_pipeline.run(eval_input)

    inputs_b = {
        "question": questions,
        "contexts": contexts_b,
        "answer": truth_answers,
        "predicted_answer": pred_answers_b,
    }
    results_b = built_input_for_results_eval(results_rag_b)
    evaluation_result_b = EvaluationRunResult(run_name="rag_pipeline_b", results=results_b, inputs=inputs_b)
    df_comparative = evaluation_result_a.comparative_individual_scores_report(evaluation_result_b)

    # assert the comparative score report has all the metrics, inputs and questions
    assert len(df_comparative) == 3
    assert list(df_comparative.columns) == [
        "question",
        "contexts",
        "answer",
        "predicted_answer",
        "rag_pipeline_a_Mean Reciprocal Rank",
        "rag_pipeline_a_Semantic Answer Similarity",
        "rag_pipeline_a_Faithfulness",
        "rag_pipeline_a_Document MAP",
        "rag_pipeline_a_Document Recall Single Hit",
        "rag_pipeline_a_Document Recall Multi Hit",
        "rag_pipeline_a_Contextual Relevance",
        "rag_pipeline_b_Mean Reciprocal Rank",
        "rag_pipeline_b_Semantic Answer Similarity",
        "rag_pipeline_b_Faithfulness",
        "rag_pipeline_b_Document MAP",
        "rag_pipeline_b_Document Recall Single Hit",
        "rag_pipeline_b_Document Recall Multi Hit",
        "rag_pipeline_b_Contextual Relevance",
    ]
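
# Note: samples_path is expected to be provided by a pytest fixture elsewhere in the e2e test suite,
# pointing at the directory that contains the test_documents/ samples. The test is skipped unless
# OPENAI_API_KEY is set, since OpenAIGenerator, FaithfulnessEvaluator, and ContextRelevanceEvaluator
# all call the OpenAI API.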