feat: raise component inputs/outputs during execution if an Exception occurs (#9742)

* initial PoC idea running

* removing test code

* cleaning up

* wip

* cleaning up demos

* adding more pipelines to test persistence saving

* wip

* wip

* working example for logging component inputs at run time

* reverting to a simpler solution for intermediate results

* cleaning up

* testing that on a crash, component outputs/inputs up to the crash point are returned

* adding tests for state persistence in a RAG pipeline

* updating tests for state persistence in a RAG pipeline

* removing use cases of agent tests

* adding LICENSE header

* adding LICENSE header

* adding release notes

* updating tests for mocked components only

* updating release notes

* adapting PipelineRuntimeError

* cleaning up tests

* fixing test pipeline crash components inputs/outputs are saved

* fixing tests for state persistence

* isolating changes

* cleaning

* updating release notes

* adding test for a regular pipeline

* small improvements and updating release notes

* cleaning imports

* removing code

* improvements/fixes based on PR comments

* raising pipeline_outputs on async version of Pipeline

* fixing async versions + updating tests

* simplifying tests

* Suggested changes pipeline crash (#9744)

* Suggested changes

* Some cleanup

* Small changes

---------

Co-authored-by: Sebastian Husch Lee <10526848+sjrl@users.noreply.github.com>
David S. Batista 2025-08-27 13:28:34 +02:00, committed by GitHub
parent 4275fed8c4
commit ac6a43f5d3
6 changed files with 296 additions and 26 deletions

View File

@@ -10,9 +10,16 @@ class PipelineError(Exception):
 class PipelineRuntimeError(Exception):
-    def __init__(self, component_name: Optional[str], component_type: Optional[type], message: str) -> None:
+    def __init__(
+        self,
+        component_name: Optional[str],
+        component_type: Optional[type],
+        message: str,
+        pipeline_outputs: Optional[Any] = None,
+    ) -> None:
         self.component_name = component_name
         self.component_type = component_type
+        self.pipeline_outputs = pipeline_outputs
         super().__init__(message)

     @classmethod
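The trailing @classmethod above is the alternate constructor that the pipeline code below invokes as PipelineRuntimeError.from_exception(component_name, instance.__class__, error). A minimal sketch of what such a classmethod could look like, assuming it only formats a message from the failing component and delegates to __init__ (the repository's actual implementation may differ):

    @classmethod
    def from_exception(cls, component_name: str, component_type: type, error: Exception) -> "PipelineRuntimeError":
        # Assumed message format; only the call signature is confirmed by this diff.
        message = f"Component '{component_name}' ({component_type.__name__}) failed with: {error}"
        return cls(component_name=component_name, component_type=component_type, message=message)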

View File

@@ -71,7 +71,12 @@ class AsyncPipeline(PipelineBase):
             # Important: contextvars (e.g. active tracing Span) dont propagate to running loop's ThreadPoolExecutor
             # We use ctx.run(...) to preserve context like the active tracing span
             ctx = contextvars.copy_context()
-            outputs = await loop.run_in_executor(None, lambda: ctx.run(lambda: instance.run(**component_inputs)))
+            try:
+                outputs = await loop.run_in_executor(
+                    None, lambda: ctx.run(lambda: instance.run(**component_inputs))
+                )
+            except Exception as error:
+                raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error
             component_visits[component_name] += 1
@@ -256,13 +261,19 @@
             )
             component_inputs = self._consume_component_inputs(component_name, comp_dict, inputs_state)
             component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])
-            component_pipeline_outputs = await self._run_component_async(
-                component_name=component_name,
-                component=comp_dict,
-                component_inputs=component_inputs,
-                component_visits=component_visits,
-                parent_span=parent_span,
-            )
+            try:
+                component_pipeline_outputs = await self._run_component_async(
+                    component_name=component_name,
+                    component=comp_dict,
+                    component_inputs=component_inputs,
+                    component_visits=component_visits,
+                    parent_span=parent_span,
+                )
+            except PipelineRuntimeError as error:
+                # Attach partial pipeline outputs to the error before re-raising
+                error.pipeline_outputs = pipeline_outputs
+                raise error
             # Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
             pruned = self._write_component_outputs(
@@ -300,14 +311,19 @@
             component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])
             async def _runner():
-                async with ready_sem:
-                    component_pipeline_outputs = await self._run_component_async(
-                        component_name=component_name,
-                        component=comp_dict,
-                        component_inputs=component_inputs,
-                        component_visits=component_visits,
-                        parent_span=parent_span,
-                    )
+                try:
+                    async with ready_sem:
+                        component_pipeline_outputs = await self._run_component_async(
+                            component_name=component_name,
+                            component=comp_dict,
+                            component_inputs=component_inputs,
+                            component_visits=component_visits,
+                            parent_span=parent_span,
+                        )
+                except PipelineRuntimeError as error:
+                    # Attach partial pipeline outputs to the error before re-raising
+                    error.pipeline_outputs = pipeline_outputs
+                    raise error
                 # Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
                 pruned = self._write_component_outputs(

View File

@@ -163,10 +163,12 @@ def _save_pipeline_snapshot_to_file(
     if isinstance(pipeline_snapshot.break_point, AgentBreakpoint):
         agent_name = pipeline_snapshot.break_point.agent_name
         component_name = pipeline_snapshot.break_point.break_point.component_name
-        file_name = f"{agent_name}_{component_name}_{dt.strftime('%Y_%m_%d_%H_%M_%S')}.json"
+        visit_nr = pipeline_snapshot.pipeline_state.component_visits.get(component_name, 0)
+        file_name = f"{agent_name}_{component_name}_{visit_nr}_{dt.strftime('%Y_%m_%d_%H_%M_%S')}.json"
     else:
         component_name = pipeline_snapshot.break_point.component_name
-        file_name = f"{component_name}_{dt.strftime('%Y_%m_%d_%H_%M_%S')}.json"
+        visit_nr = pipeline_snapshot.pipeline_state.component_visits.get(component_name, 0)
+        file_name = f"{component_name}_{visit_nr}_{dt.strftime('%Y_%m_%d_%H_%M_%S')}.json"
     try:
         with open(snapshot_file_path / file_name, "w") as f_out:
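For illustration (hypothetical component name and timestamp): a snapshot taken for a component named llm whose recorded visit count is 2 is now written to a file such as llm_2_2025_08_27_13_28_34.json, whereas previously it would have been llm_2025_08_27_13_28_34.json; the added visit number distinguishes snapshots taken on different visits of the same component.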

View File

@@ -379,13 +379,18 @@ class Pipeline(PipelineBase):
                     pipeline_snapshot=new_pipeline_snapshot, pipeline_outputs=pipeline_outputs
                 )
-                component_outputs = self._run_component(
-                    component_name=component_name,
-                    component=component,
-                    inputs=component_inputs,  # the inputs to the current component
-                    component_visits=component_visits,
-                    parent_span=span,
-                )
+                try:
+                    component_outputs = self._run_component(
+                        component_name=component_name,
+                        component=component,
+                        inputs=component_inputs,  # the inputs to the current component
+                        component_visits=component_visits,
+                        parent_span=span,
+                    )
+                except PipelineRuntimeError as error:
+                    # Attach partial pipeline outputs to the error before re-raising
+                    error.pipeline_outputs = pipeline_outputs
+                    raise error
                 # Updates global input state with component outputs and returns outputs that should go to
                 # pipeline outputs.

View File

@@ -0,0 +1,6 @@
---
features:
  - |
    If an error occurs during the execution of a pipeline, the pipeline raises a PipelineRuntimeError exception
    containing an error message and the outputs of the components that ran before the failure. This lets you
    inspect and debug the pipeline state up to the point of failure.
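A minimal usage sketch of the new behaviour, assuming `pipeline` is an already-constructed Haystack Pipeline and the input data below is illustrative (the pipeline_outputs attribute is the one added in this commit):

    from haystack.core.errors import PipelineRuntimeError

    try:
        pipeline.run(data={"text_embedder": {"text": "Where does Mark live?"}})
    except PipelineRuntimeError as error:
        # Partial outputs of the components that finished before the crash
        print(error.pipeline_outputs)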

View File

@@ -0,0 +1,234 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0
from unittest.mock import MagicMock, patch
import numpy as np
import pytest
from haystack import AsyncPipeline, Document, Pipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import DocumentJoiner
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.writers import DocumentWriter
from haystack.core.component import component
from haystack.core.errors import PipelineRuntimeError
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.document_stores.types import DuplicatePolicy
from haystack.utils.auth import Secret
def setup_document_store():
"""Create and populate a document store with test documents."""
documents = [
Document(content="My name is Jean and I live in Paris.", embedding=[0.1, 0.3, 0.6]),
Document(content="My name is Mark and I live in Berlin.", embedding=[0.2, 0.4, 0.7]),
Document(content="My name is Giorgio and I live in Rome.", embedding=[0.3, 0.5, 0.8]),
]
document_store = InMemoryDocumentStore()
doc_writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP)
doc_writer.run(documents=documents)
return document_store
# Create a mock component that returns invalid output (int instead of documents list)
@component
class InvalidOutputEmbeddingRetriever:
@component.output_types(documents=list[Document])
def run(self, query_embedding: list[float]):
# Return an int instead of the expected documents list
# This will cause the pipeline to crash when trying to pass it to the next component
return 42
template = [
ChatMessage.from_system(
"You are a helpful AI assistant. Answer the following question based on the given context information "
"only. If the context is empty or just a '\n' answer with None, example: 'None'."
),
ChatMessage.from_user(
"""
Context:
{% for document in documents %}
{{ document.content }}
{% endfor %}
Question: {{question}}
"""
),
]
class TestPipelineOutputsRaisedInException:
@pytest.fixture
def mock_sentence_transformers_text_embedder(self):
with patch(
"haystack.components.embedders.sentence_transformers_text_embedder._SentenceTransformersEmbeddingBackendFactory"
) as mock_text_embedder:
mock_model = MagicMock()
mock_text_embedder.return_value = mock_model
def mock_encode(
texts, batch_size=None, show_progress_bar=None, normalize_embeddings=None, precision=None, **kwargs
): # noqa E501
return [np.ones(384).tolist() for _ in texts]
mock_model.encode = mock_encode
embedder = SentenceTransformersTextEmbedder(model="mock-model", progress_bar=False)
def mock_run(text):
if not isinstance(text, str):
raise TypeError(
"SentenceTransformersTextEmbedder expects a string as input."
"In case you want to embed a list of Documents, please use the "
"SentenceTransformersDocumentEmbedder."
)
embedding = np.ones(384).tolist()
return {"embedding": embedding}
embedder.run = mock_run
embedder.warm_up()
return embedder
def test_hybrid_rag_pipeline_crash_on_embedding_retriever(
self, mock_sentence_transformers_text_embedder, monkeypatch
):
monkeypatch.setenv("OPENAI_API_KEY", "test-api-key")
document_store = setup_document_store()
text_embedder = mock_sentence_transformers_text_embedder
invalid_embedding_retriever = InvalidOutputEmbeddingRetriever()
bm25_retriever = InMemoryBM25Retriever(document_store)
document_joiner = DocumentJoiner(join_mode="concatenate")
pipeline = Pipeline()
pipeline.add_component("text_embedder", text_embedder)
pipeline.add_component("embedding_retriever", invalid_embedding_retriever)
pipeline.add_component("bm25_retriever", bm25_retriever)
pipeline.add_component("document_joiner", document_joiner)
pipeline.add_component(
"prompt_builder", ChatPromptBuilder(template=template, required_variables=["question", "documents"])
)
pipeline.add_component("llm", OpenAIChatGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY")))
pipeline.add_component("answer_builder", AnswerBuilder())
pipeline.connect("text_embedder", "embedding_retriever")
pipeline.connect("bm25_retriever", "document_joiner")
pipeline.connect("embedding_retriever", "document_joiner")
pipeline.connect("document_joiner.documents", "prompt_builder.documents")
pipeline.connect("prompt_builder", "llm")
pipeline.connect("llm.replies", "answer_builder.replies")
question = "Where does Mark live?"
test_data = {
"text_embedder": {"text": question},
"bm25_retriever": {"query": question},
"prompt_builder": {"question": question},
"answer_builder": {"query": question},
}
# run pipeline and expect it to crash due to invalid output type
with pytest.raises(PipelineRuntimeError) as exc_info:
pipeline.run(
data=test_data,
include_outputs_from={
"text_embedder",
"embedding_retriever",
"bm25_retriever",
"document_joiner",
"prompt_builder",
"llm",
"answer_builder",
},
)
pipeline_outputs = exc_info.value.pipeline_outputs
assert pipeline_outputs is not None, "Pipeline outputs should be captured in the exception"
# verify that bm25_retriever and text_embedder ran successfully before the crash
assert "bm25_retriever" in pipeline_outputs, "BM25 retriever output not captured"
assert "documents" in pipeline_outputs["bm25_retriever"], "BM25 retriever should have produced documents"
assert "text_embedder" in pipeline_outputs, "Text embedder output not captured"
assert "embedding" in pipeline_outputs["text_embedder"], "Text embedder should have produced embeddings"
# components after the crash point are not in the outputs
assert "document_joiner" not in pipeline_outputs, "Document joiner should not have run due to crash"
assert "prompt_builder" not in pipeline_outputs, "Prompt builder should not have run due to crash"
assert "llm" not in pipeline_outputs, "LLM should not have run due to crash"
assert "answer_builder" not in pipeline_outputs, "Answer builder should not have run due to crash"
@pytest.mark.asyncio
async def test_async_hybrid_rag_pipeline_crash_on_embedding_retriever(
self, mock_sentence_transformers_text_embedder, monkeypatch
):
monkeypatch.setenv("OPENAI_API_KEY", "test-api-key")
document_store = setup_document_store()
text_embedder = mock_sentence_transformers_text_embedder
invalid_embedding_retriever = InvalidOutputEmbeddingRetriever()
bm25_retriever = InMemoryBM25Retriever(document_store)
document_joiner = DocumentJoiner(join_mode="concatenate")
pipeline = AsyncPipeline()
pipeline.add_component("text_embedder", text_embedder)
pipeline.add_component("embedding_retriever", invalid_embedding_retriever)
pipeline.add_component("bm25_retriever", bm25_retriever)
pipeline.add_component("document_joiner", document_joiner)
pipeline.add_component(
"prompt_builder", ChatPromptBuilder(template=template, required_variables=["question", "documents"])
)
pipeline.add_component("llm", OpenAIChatGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY")))
pipeline.add_component("answer_builder", AnswerBuilder())
pipeline.connect("text_embedder", "embedding_retriever")
pipeline.connect("bm25_retriever", "document_joiner")
pipeline.connect("embedding_retriever", "document_joiner")
pipeline.connect("document_joiner.documents", "prompt_builder.documents")
pipeline.connect("prompt_builder", "llm")
pipeline.connect("llm.replies", "answer_builder.replies")
question = "Where does Mark live?"
test_data = {
"text_embedder": {"text": question},
"bm25_retriever": {"query": question},
"prompt_builder": {"question": question},
"answer_builder": {"query": question},
}
with pytest.raises(PipelineRuntimeError) as exc_info:
await pipeline.run_async(
data=test_data,
include_outputs_from={
"text_embedder",
"embedding_retriever",
"bm25_retriever",
"document_joiner",
"prompt_builder",
"llm",
"answer_builder",
},
)
pipeline_outputs = exc_info.value.pipeline_outputs
assert pipeline_outputs is not None, "Pipeline outputs should be captured in the exception"
# verify that bm25_retriever and text_embedder ran successfully before the crash
assert "bm25_retriever" in pipeline_outputs, "BM25 retriever output not captured"
assert "documents" in pipeline_outputs["bm25_retriever"], "BM25 retriever should have produced documents"
assert "text_embedder" in pipeline_outputs, "Text embedder output not captured"
assert "embedding" in pipeline_outputs["text_embedder"], "Text embedder should have produced embeddings"
# components after the crash point are not in the outputs
assert "document_joiner" not in pipeline_outputs, "Document joiner should not have run due to crash"
assert "prompt_builder" not in pipeline_outputs, "Prompt builder should not have run due to crash"
assert "llm" not in pipeline_outputs, "LLM should not have run due to crash"
assert "answer_builder" not in pipeline_outputs, "Answer builder should not have run due to crash"