diff --git a/haystack/core/errors.py b/haystack/core/errors.py
index 38cb2ce1a..3d750067d 100644
--- a/haystack/core/errors.py
+++ b/haystack/core/errors.py
@@ -10,9 +10,16 @@ class PipelineError(Exception):
 
 
 class PipelineRuntimeError(Exception):
-    def __init__(self, component_name: Optional[str], component_type: Optional[type], message: str) -> None:
+    def __init__(
+        self,
+        component_name: Optional[str],
+        component_type: Optional[type],
+        message: str,
+        pipeline_outputs: Optional[Any] = None,
+    ) -> None:
         self.component_name = component_name
         self.component_type = component_type
+        self.pipeline_outputs = pipeline_outputs
         super().__init__(message)
 
     @classmethod
diff --git a/haystack/core/pipeline/async_pipeline.py b/haystack/core/pipeline/async_pipeline.py
index dc5553b6d..70487e6a8 100644
--- a/haystack/core/pipeline/async_pipeline.py
+++ b/haystack/core/pipeline/async_pipeline.py
@@ -71,7 +71,12 @@ class AsyncPipeline(PipelineBase):
             # Important: contextvars (e.g. active tracing Span) don’t propagate to running loop's ThreadPoolExecutor
             # We use ctx.run(...) to preserve context like the active tracing span
             ctx = contextvars.copy_context()
-            outputs = await loop.run_in_executor(None, lambda: ctx.run(lambda: instance.run(**component_inputs)))
+            try:
+                outputs = await loop.run_in_executor(
+                    None, lambda: ctx.run(lambda: instance.run(**component_inputs))
+                )
+            except Exception as error:
+                raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error
 
             component_visits[component_name] += 1
 
@@ -256,13 +261,19 @@ class AsyncPipeline(PipelineBase):
                 )
                 component_inputs = self._consume_component_inputs(component_name, comp_dict, inputs_state)
                 component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])
-                component_pipeline_outputs = await self._run_component_async(
-                    component_name=component_name,
-                    component=comp_dict,
-                    component_inputs=component_inputs,
-                    component_visits=component_visits,
-                    parent_span=parent_span,
-                )
+
+                try:
+                    component_pipeline_outputs = await self._run_component_async(
+                        component_name=component_name,
+                        component=comp_dict,
+                        component_inputs=component_inputs,
+                        component_visits=component_visits,
+                        parent_span=parent_span,
+                    )
+                except PipelineRuntimeError as error:
+                    # Attach partial pipeline outputs to the error before re-raising
+                    error.pipeline_outputs = pipeline_outputs
+                    raise error
 
                 # Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
                 pruned = self._write_component_outputs(
@@ -300,14 +311,19 @@ class AsyncPipeline(PipelineBase):
             component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])
 
             async def _runner():
-                async with ready_sem:
-                    component_pipeline_outputs = await self._run_component_async(
-                        component_name=component_name,
-                        component=comp_dict,
-                        component_inputs=component_inputs,
-                        component_visits=component_visits,
-                        parent_span=parent_span,
-                    )
+                try:
+                    async with ready_sem:
+                        component_pipeline_outputs = await self._run_component_async(
+                            component_name=component_name,
+                            component=comp_dict,
+                            component_inputs=component_inputs,
+                            component_visits=component_visits,
+                            parent_span=parent_span,
+                        )
+                except PipelineRuntimeError as error:
+                    # Attach partial pipeline outputs to the error before re-raising
+                    error.pipeline_outputs = pipeline_outputs
+                    raise error
 
                 # Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
                 pruned = self._write_component_outputs(
diff --git a/haystack/core/pipeline/breakpoint.py b/haystack/core/pipeline/breakpoint.py
index 22a604dae..410ccfcc5 100644
--- a/haystack/core/pipeline/breakpoint.py
+++ b/haystack/core/pipeline/breakpoint.py
@@ -163,10 +163,12 @@ def _save_pipeline_snapshot_to_file(
     if isinstance(pipeline_snapshot.break_point, AgentBreakpoint):
         agent_name = pipeline_snapshot.break_point.agent_name
         component_name = pipeline_snapshot.break_point.break_point.component_name
-        file_name = f"{agent_name}_{component_name}_{dt.strftime('%Y_%m_%d_%H_%M_%S')}.json"
+        visit_nr = pipeline_snapshot.pipeline_state.component_visits.get(component_name, 0)
+        file_name = f"{agent_name}_{component_name}_{visit_nr}_{dt.strftime('%Y_%m_%d_%H_%M_%S')}.json"
     else:
         component_name = pipeline_snapshot.break_point.component_name
-        file_name = f"{component_name}_{dt.strftime('%Y_%m_%d_%H_%M_%S')}.json"
+        visit_nr = pipeline_snapshot.pipeline_state.component_visits.get(component_name, 0)
+        file_name = f"{component_name}_{visit_nr}_{dt.strftime('%Y_%m_%d_%H_%M_%S')}.json"
 
     try:
         with open(snapshot_file_path / file_name, "w") as f_out:
diff --git a/haystack/core/pipeline/pipeline.py b/haystack/core/pipeline/pipeline.py
index d5301d2dd..0c6d97248 100644
--- a/haystack/core/pipeline/pipeline.py
+++ b/haystack/core/pipeline/pipeline.py
@@ -379,13 +379,18 @@ class Pipeline(PipelineBase):
                         pipeline_snapshot=new_pipeline_snapshot, pipeline_outputs=pipeline_outputs
                     )
 
-                component_outputs = self._run_component(
-                    component_name=component_name,
-                    component=component,
-                    inputs=component_inputs,  # the inputs to the current component
-                    component_visits=component_visits,
-                    parent_span=span,
-                )
+                try:
+                    component_outputs = self._run_component(
+                        component_name=component_name,
+                        component=component,
+                        inputs=component_inputs,  # the inputs to the current component
+                        component_visits=component_visits,
+                        parent_span=span,
+                    )
+                except PipelineRuntimeError as error:
+                    # Attach partial pipeline outputs to the error before re-raising
+                    error.pipeline_outputs = pipeline_outputs
+                    raise error
 
                 # Updates global input state with component outputs and returns outputs that should go to
                 # pipeline outputs.
diff --git a/releasenotes/notes/handle-pipeline-crashes-and-resume-state-21c0e46a8935cbe2.yaml b/releasenotes/notes/handle-pipeline-crashes-and-resume-state-21c0e46a8935cbe2.yaml
new file mode 100644
index 000000000..6dd9ba84d
--- /dev/null
+++ b/releasenotes/notes/handle-pipeline-crashes-and-resume-state-21c0e46a8935cbe2.yaml
@@ -0,0 +1,6 @@
+---
+features:
+  - |
+    If an error occurs during the execution of a pipeline, the pipeline will raise a PipelineRuntimeError exception
+    containing an error message and the components' outputs up to the point of failure. This allows you to inspect
+    and debug the pipeline up to the point of failure.
diff --git a/test/core/pipeline/test_pipeline_crash_regular_pipeline_outputs_raised.py b/test/core/pipeline/test_pipeline_crash_regular_pipeline_outputs_raised.py
new file mode 100644
index 000000000..688b55046
--- /dev/null
+++ b/test/core/pipeline/test_pipeline_crash_regular_pipeline_outputs_raised.py
@@ -0,0 +1,234 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from unittest.mock import MagicMock, patch
+
+import numpy as np
+import pytest
+
+from haystack import AsyncPipeline, Document, Pipeline
+from haystack.components.builders import ChatPromptBuilder
+from haystack.components.builders.answer_builder import AnswerBuilder
+from haystack.components.embedders import SentenceTransformersTextEmbedder
+from haystack.components.generators.chat import OpenAIChatGenerator
+from haystack.components.joiners import DocumentJoiner
+from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
+from haystack.components.writers import DocumentWriter
+from haystack.core.component import component
+from haystack.core.errors import PipelineRuntimeError
+from haystack.dataclasses import ChatMessage
+from haystack.document_stores.in_memory import InMemoryDocumentStore
+from haystack.document_stores.types import DuplicatePolicy
+from haystack.utils.auth import Secret
+
+
+def setup_document_store():
+    """Create and populate a document store with test documents."""
+    documents = [
+        Document(content="My name is Jean and I live in Paris.", embedding=[0.1, 0.3, 0.6]),
+        Document(content="My name is Mark and I live in Berlin.", embedding=[0.2, 0.4, 0.7]),
+        Document(content="My name is Giorgio and I live in Rome.", embedding=[0.3, 0.5, 0.8]),
+    ]
+
+    document_store = InMemoryDocumentStore()
+    doc_writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP)
+    doc_writer.run(documents=documents)
+
+    return document_store
+
+
+# Create a mock component that returns invalid output (int instead of documents list)
+@component
+class InvalidOutputEmbeddingRetriever:
+    @component.output_types(documents=list[Document])
+    def run(self, query_embedding: list[float]):
+        # Return an int instead of the expected documents list
+        # This will cause the pipeline to crash when trying to pass it to the next component
+        return 42
+
+
+template = [
+    ChatMessage.from_system(
+        "You are a helpful AI assistant. Answer the following question based on the given context information "
+        "only. If the context is empty or just a '\n' answer with None, example: 'None'."
+    ),
+    ChatMessage.from_user(
+        """
+        Context:
+        {% for document in documents %}
+            {{ document.content }}
+        {% endfor %}
+
+        Question: {{question}}
+        """
+    ),
+]
+
+
+class TestPipelineOutputsRaisedInException:
+    @pytest.fixture
+    def mock_sentence_transformers_text_embedder(self):
+        with patch(
+            "haystack.components.embedders.sentence_transformers_text_embedder._SentenceTransformersEmbeddingBackendFactory"
+        ) as mock_text_embedder:
+            mock_model = MagicMock()
+            mock_text_embedder.return_value = mock_model
+
+            def mock_encode(
+                texts, batch_size=None, show_progress_bar=None, normalize_embeddings=None, precision=None, **kwargs
+            ):  # noqa E501
+                return [np.ones(384).tolist() for _ in texts]
+
+            mock_model.encode = mock_encode
+            embedder = SentenceTransformersTextEmbedder(model="mock-model", progress_bar=False)
+
+            def mock_run(text):
+                if not isinstance(text, str):
+                    raise TypeError(
+                        "SentenceTransformersTextEmbedder expects a string as input."
+ "In case you want to embed a list of Documents, please use the " + "SentenceTransformersDocumentEmbedder." + ) + + embedding = np.ones(384).tolist() + return {"embedding": embedding} + + embedder.run = mock_run + embedder.warm_up() + return embedder + + def test_hybrid_rag_pipeline_crash_on_embedding_retriever( + self, mock_sentence_transformers_text_embedder, monkeypatch + ): + monkeypatch.setenv("OPENAI_API_KEY", "test-api-key") + + document_store = setup_document_store() + text_embedder = mock_sentence_transformers_text_embedder + invalid_embedding_retriever = InvalidOutputEmbeddingRetriever() + bm25_retriever = InMemoryBM25Retriever(document_store) + document_joiner = DocumentJoiner(join_mode="concatenate") + + pipeline = Pipeline() + pipeline.add_component("text_embedder", text_embedder) + pipeline.add_component("embedding_retriever", invalid_embedding_retriever) + pipeline.add_component("bm25_retriever", bm25_retriever) + pipeline.add_component("document_joiner", document_joiner) + pipeline.add_component( + "prompt_builder", ChatPromptBuilder(template=template, required_variables=["question", "documents"]) + ) + pipeline.add_component("llm", OpenAIChatGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY"))) + pipeline.add_component("answer_builder", AnswerBuilder()) + + pipeline.connect("text_embedder", "embedding_retriever") + pipeline.connect("bm25_retriever", "document_joiner") + pipeline.connect("embedding_retriever", "document_joiner") + pipeline.connect("document_joiner.documents", "prompt_builder.documents") + pipeline.connect("prompt_builder", "llm") + pipeline.connect("llm.replies", "answer_builder.replies") + + question = "Where does Mark live?" + test_data = { + "text_embedder": {"text": question}, + "bm25_retriever": {"query": question}, + "prompt_builder": {"question": question}, + "answer_builder": {"query": question}, + } + + # run pipeline and expect it to crash due to invalid output type + with pytest.raises(PipelineRuntimeError) as exc_info: + pipeline.run( + data=test_data, + include_outputs_from={ + "text_embedder", + "embedding_retriever", + "bm25_retriever", + "document_joiner", + "prompt_builder", + "llm", + "answer_builder", + }, + ) + + pipeline_outputs = exc_info.value.pipeline_outputs + + assert pipeline_outputs is not None, "Pipeline outputs should be captured in the exception" + + # verify that bm25_retriever and text_embedder ran successfully before the crash + assert "bm25_retriever" in pipeline_outputs, "BM25 retriever output not captured" + assert "documents" in pipeline_outputs["bm25_retriever"], "BM25 retriever should have produced documents" + assert "text_embedder" in pipeline_outputs, "Text embedder output not captured" + assert "embedding" in pipeline_outputs["text_embedder"], "Text embedder should have produced embeddings" + + # components after the crash point are not in the outputs + assert "document_joiner" not in pipeline_outputs, "Document joiner should not have run due to crash" + assert "prompt_builder" not in pipeline_outputs, "Prompt builder should not have run due to crash" + assert "llm" not in pipeline_outputs, "LLM should not have run due to crash" + assert "answer_builder" not in pipeline_outputs, "Answer builder should not have run due to crash" + + @pytest.mark.asyncio + async def test_async_hybrid_rag_pipeline_crash_on_embedding_retriever( + self, mock_sentence_transformers_text_embedder, monkeypatch + ): + monkeypatch.setenv("OPENAI_API_KEY", "test-api-key") + + document_store = setup_document_store() + text_embedder = 
+        invalid_embedding_retriever = InvalidOutputEmbeddingRetriever()
+        bm25_retriever = InMemoryBM25Retriever(document_store)
+        document_joiner = DocumentJoiner(join_mode="concatenate")
+
+        pipeline = AsyncPipeline()
+        pipeline.add_component("text_embedder", text_embedder)
+        pipeline.add_component("embedding_retriever", invalid_embedding_retriever)
+        pipeline.add_component("bm25_retriever", bm25_retriever)
+        pipeline.add_component("document_joiner", document_joiner)
+        pipeline.add_component(
+            "prompt_builder", ChatPromptBuilder(template=template, required_variables=["question", "documents"])
+        )
+        pipeline.add_component("llm", OpenAIChatGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY")))
+        pipeline.add_component("answer_builder", AnswerBuilder())
+
+        pipeline.connect("text_embedder", "embedding_retriever")
+        pipeline.connect("bm25_retriever", "document_joiner")
+        pipeline.connect("embedding_retriever", "document_joiner")
+        pipeline.connect("document_joiner.documents", "prompt_builder.documents")
+        pipeline.connect("prompt_builder", "llm")
+        pipeline.connect("llm.replies", "answer_builder.replies")
+
+        question = "Where does Mark live?"
+        test_data = {
+            "text_embedder": {"text": question},
+            "bm25_retriever": {"query": question},
+            "prompt_builder": {"question": question},
+            "answer_builder": {"query": question},
+        }
+
+        with pytest.raises(PipelineRuntimeError) as exc_info:
+            await pipeline.run_async(
+                data=test_data,
+                include_outputs_from={
+                    "text_embedder",
+                    "embedding_retriever",
+                    "bm25_retriever",
+                    "document_joiner",
+                    "prompt_builder",
+                    "llm",
+                    "answer_builder",
+                },
+            )
+
+        pipeline_outputs = exc_info.value.pipeline_outputs
+        assert pipeline_outputs is not None, "Pipeline outputs should be captured in the exception"
+
+        # verify that bm25_retriever and text_embedder ran successfully before the crash
+        assert "bm25_retriever" in pipeline_outputs, "BM25 retriever output not captured"
+        assert "documents" in pipeline_outputs["bm25_retriever"], "BM25 retriever should have produced documents"
+        assert "text_embedder" in pipeline_outputs, "Text embedder output not captured"
+        assert "embedding" in pipeline_outputs["text_embedder"], "Text embedder should have produced embeddings"
+
+        # components after the crash point are not in the outputs
+        assert "document_joiner" not in pipeline_outputs, "Document joiner should not have run due to crash"
+        assert "prompt_builder" not in pipeline_outputs, "Prompt builder should not have run due to crash"
+        assert "llm" not in pipeline_outputs, "LLM should not have run due to crash"
+        assert "answer_builder" not in pipeline_outputs, "Answer builder should not have run due to crash"