--- title: "MCP" id: integrations-mcp description: "MCP integration for Haystack" slug: "/integrations-mcp" --- ## Module haystack\_integrations.tools.mcp.mcp\_tool ### AsyncExecutor Thread-safe event loop executor for running async code from sync contexts. #### AsyncExecutor.get\_instance ```python @classmethod def get_instance(cls) -> "AsyncExecutor" ``` Get or create the global singleton executor instance. #### AsyncExecutor.\_\_init\_\_ ```python def __init__() ``` Initialize a dedicated event loop #### AsyncExecutor.run ```python def run(coro: Coroutine[Any, Any, Any], timeout: float | None = None) -> Any ``` Run a coroutine in the event loop. **Arguments**: - `coro`: Coroutine to execute - `timeout`: Optional timeout in seconds **Raises**: - `TimeoutError`: If execution exceeds timeout **Returns**: Result of the coroutine #### AsyncExecutor.get\_loop ```python def get_loop() ``` Get the event loop. **Returns**: The event loop #### AsyncExecutor.run\_background ```python def run_background( coro_factory: Callable[[asyncio.Event], Coroutine[Any, Any, Any]], timeout: float | None = None ) -> tuple[concurrent.futures.Future[Any], asyncio.Event] ``` Schedule `coro_factory` to run in the executor's event loop **without** blocking the caller thread. The factory receives an :class:`asyncio.Event` that can be used to cooperatively shut the coroutine down. The method returns **both** the concurrent future (to observe completion or failure) and the created *stop_event* so that callers can signal termination. **Arguments**: - `coro_factory`: A callable receiving the stop_event and returning the coroutine to execute. - `timeout`: Optional timeout while waiting for the stop_event to be created. **Returns**: Tuple ``(future, stop_event)``. #### AsyncExecutor.shutdown ```python def shutdown(timeout: float = 2) -> None ``` Shut down the background event loop and thread. **Arguments**: - `timeout`: Timeout in seconds for shutting down the event loop ### MCPError Base class for MCP-related errors. #### MCPError.\_\_init\_\_ ```python def __init__(message: str) -> None ``` Initialize the MCPError. **Arguments**: - `message`: Descriptive error message ### MCPConnectionError Error connecting to MCP server. #### MCPConnectionError.\_\_init\_\_ ```python def __init__(message: str, server_info: "MCPServerInfo | None" = None, operation: str | None = None) -> None ``` Initialize the MCPConnectionError. **Arguments**: - `message`: Descriptive error message - `server_info`: Server connection information that was used - `operation`: Name of the operation that was being attempted ### MCPToolNotFoundError Error when a tool is not found on the server. #### MCPToolNotFoundError.\_\_init\_\_ ```python def __init__(message: str, tool_name: str, available_tools: list[str] | None = None) -> None ``` Initialize the MCPToolNotFoundError. **Arguments**: - `message`: Descriptive error message - `tool_name`: Name of the tool that was requested but not found - `available_tools`: List of available tool names, if known ### MCPInvocationError Error during tool invocation. #### MCPInvocationError.\_\_init\_\_ ```python def __init__(message: str, tool_name: str, tool_args: dict[str, Any] | None = None) -> None ``` Initialize the MCPInvocationError. **Arguments**: - `message`: Descriptive error message - `tool_name`: Name of the tool that was being invoked - `tool_args`: Arguments that were passed to the tool ### MCPClient Abstract base class for MCP clients. This class defines the common interface and shared functionality for all MCP clients, regardless of the transport mechanism used. #### MCPClient.connect ```python @abstractmethod async def connect() -> list[types.Tool] ``` Connect to an MCP server. **Raises**: - `MCPConnectionError`: If connection to the server fails **Returns**: List of available tools on the server #### MCPClient.call\_tool ```python async def call_tool(tool_name: str, tool_args: dict[str, Any]) -> str ``` Call a tool on the connected MCP server. **Arguments**: - `tool_name`: Name of the tool to call - `tool_args`: Arguments to pass to the tool **Raises**: - `MCPConnectionError`: If not connected to an MCP server - `MCPInvocationError`: If the tool invocation fails **Returns**: JSON string representation of the tool invocation result #### MCPClient.aclose ```python async def aclose() -> None ``` Close the connection and clean up resources. This method ensures all resources are properly released, even if errors occur. ### StdioClient MCP client that connects to servers using stdio transport. #### StdioClient.\_\_init\_\_ ```python def __init__(command: str, args: list[str] | None = None, env: dict[str, str | Secret] | None = None, max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 30.0) -> None ``` Initialize a stdio MCP client. **Arguments**: - `command`: Command to run (e.g., "python", "node") - `args`: Arguments to pass to the command - `env`: Environment variables for the command - `max_retries`: Maximum number of reconnection attempts - `base_delay`: Base delay for exponential backoff in seconds #### StdioClient.connect ```python async def connect() -> list[types.Tool] ``` Connect to an MCP server using stdio transport. **Raises**: - `MCPConnectionError`: If connection to the server fails **Returns**: List of available tools on the server ### SSEClient MCP client that connects to servers using SSE transport. #### SSEClient.\_\_init\_\_ ```python def __init__(server_info: "SSEServerInfo", max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 30.0) -> None ``` Initialize an SSE MCP client using server configuration. **Arguments**: - `server_info`: Configuration object containing URL, token, timeout, etc. - `max_retries`: Maximum number of reconnection attempts - `base_delay`: Base delay for exponential backoff in seconds #### SSEClient.connect ```python async def connect() -> list[types.Tool] ``` Connect to an MCP server using SSE transport. **Raises**: - `MCPConnectionError`: If connection to the server fails **Returns**: List of available tools on the server ### StreamableHttpClient MCP client that connects to servers using streamable HTTP transport. #### StreamableHttpClient.\_\_init\_\_ ```python def __init__(server_info: "StreamableHttpServerInfo", max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 30.0) -> None ``` Initialize a streamable HTTP MCP client using server configuration. **Arguments**: - `server_info`: Configuration object containing URL, token, timeout, etc. - `max_retries`: Maximum number of reconnection attempts - `base_delay`: Base delay for exponential backoff in seconds #### StreamableHttpClient.connect ```python async def connect() -> list[types.Tool] ``` Connect to an MCP server using streamable HTTP transport. **Raises**: - `MCPConnectionError`: If connection to the server fails **Returns**: List of available tools on the server ### MCPServerInfo Abstract base class for MCP server connection parameters. This class defines the common interface for all MCP server connection types. #### MCPServerInfo.create\_client ```python @abstractmethod def create_client() -> MCPClient ``` Create an appropriate MCP client for this server info. **Returns**: An instance of MCPClient configured with this server info #### MCPServerInfo.to\_dict ```python def to_dict() -> dict[str, Any] ``` Serialize this server info to a dictionary. **Returns**: Dictionary representation of this server info #### MCPServerInfo.from\_dict ```python @classmethod def from_dict(cls, data: dict[str, Any]) -> "MCPServerInfo" ``` Deserialize server info from a dictionary. **Arguments**: - `data`: Dictionary containing serialized server info **Returns**: Instance of the appropriate server info class ### SSEServerInfo Data class that encapsulates SSE MCP server connection parameters. For authentication tokens containing sensitive data, you can use Secret objects for secure handling and serialization: ```python server_info = SSEServerInfo( url="https://my-mcp-server.com", token=Secret.from_env_var("API_KEY"), ) ``` **Arguments**: - `url`: Full URL of the MCP server (including /sse endpoint) - `base_url`: Base URL of the MCP server (deprecated, use url instead) - `token`: Authentication token for the server (optional) - `timeout`: Connection timeout in seconds #### base\_url deprecated #### SSEServerInfo.\_\_post\_init\_\_ ```python def __post_init__() ``` Validate that either url or base_url is provided. #### SSEServerInfo.create\_client ```python def create_client() -> MCPClient ``` Create an SSE MCP client. **Returns**: Configured MCPClient instance ### StreamableHttpServerInfo Data class that encapsulates streamable HTTP MCP server connection parameters. For authentication tokens containing sensitive data, you can use Secret objects for secure handling and serialization: ```python server_info = StreamableHttpServerInfo( url="https://my-mcp-server.com", token=Secret.from_env_var("API_KEY"), ) ``` **Arguments**: - `url`: Full URL of the MCP server (streamable HTTP endpoint) - `token`: Authentication token for the server (optional) - `timeout`: Connection timeout in seconds #### StreamableHttpServerInfo.\_\_post\_init\_\_ ```python def __post_init__() ``` Validate the URL. #### StreamableHttpServerInfo.create\_client ```python def create_client() -> MCPClient ``` Create a streamable HTTP MCP client. **Returns**: Configured StreamableHttpClient instance ### StdioServerInfo Data class that encapsulates stdio MCP server connection parameters. **Arguments**: - `command`: Command to run (e.g., "python", "node") - `args`: Arguments to pass to the command - `env`: Environment variables for the command For environment variables containing sensitive data, you can use Secret objects for secure handling and serialization: ```python server_info = StdioServerInfo( command="uv", args=["run", "my-mcp-server"], env={ "WORKSPACE_PATH": "/path/to/workspace", # Plain string "API_KEY": Secret.from_env_var("API_KEY"), # Secret object } ) ``` Secret objects will be properly serialized and deserialized without exposing the secret value, while plain strings will be preserved as-is. Use Secret objects for sensitive data that needs to be handled securely. #### StdioServerInfo.create\_client ```python def create_client() -> MCPClient ``` Create a stdio MCP client. **Returns**: Configured StdioMCPClient instance ### MCPTool A Tool that represents a single tool from an MCP server. This implementation uses the official MCP SDK for protocol handling while maintaining compatibility with the Haystack tool ecosystem. Response handling: - Text and image content are supported and returned as JSON strings - The JSON contains the structured response from the MCP server - Use json.loads() to parse the response into a dictionary Example using Streamable HTTP: ```python import json from haystack_integrations.tools.mcp import MCPTool, StreamableHttpServerInfo # Create tool instance tool = MCPTool( name="multiply", server_info=StreamableHttpServerInfo(url="http://localhost:8000/mcp") ) # Use the tool and parse result result_json = tool.invoke(a=5, b=3) result = json.loads(result_json) ``` Example using SSE (deprecated): ```python import json from haystack.tools import MCPTool, SSEServerInfo # Create tool instance tool = MCPTool( name="add", server_info=SSEServerInfo(url="http://localhost:8000/sse") ) # Use the tool and parse result result_json = tool.invoke(a=5, b=3) result = json.loads(result_json) ``` Example using stdio: ```python import json from haystack.tools import MCPTool, StdioServerInfo # Create tool instance tool = MCPTool( name="get_current_time", server_info=StdioServerInfo(command="python", args=["path/to/server.py"]) ) # Use the tool and parse result result_json = tool.invoke(timezone="America/New_York") result = json.loads(result_json) ``` #### MCPTool.\_\_init\_\_ ```python def __init__(name: str, server_info: MCPServerInfo, description: str | None = None, connection_timeout: int = 30, invocation_timeout: int = 30) ``` Initialize the MCP tool. **Arguments**: - `name`: Name of the tool to use - `server_info`: Server connection information - `description`: Custom description (if None, server description will be used) - `connection_timeout`: Timeout in seconds for server connection - `invocation_timeout`: Default timeout in seconds for tool invocations **Raises**: - `MCPConnectionError`: If connection to the server fails - `MCPToolNotFoundError`: If no tools are available or the requested tool is not found - `TimeoutError`: If connection times out #### MCPTool.ainvoke ```python async def ainvoke(**kwargs: Any) -> str ``` Asynchronous tool invocation. **Arguments**: - `kwargs`: Arguments to pass to the tool **Raises**: - `MCPInvocationError`: If the tool invocation fails - `TimeoutError`: If the operation times out **Returns**: JSON string representation of the tool invocation result #### MCPTool.to\_dict ```python def to_dict() -> dict[str, Any] ``` Serializes the MCPTool to a dictionary. The serialization preserves all information needed to recreate the tool, including server connection parameters and timeout settings. Note that the active connection is not maintained. **Returns**: Dictionary with serialized data in the format: `{"type": fully_qualified_class_name, "data": {parameters}}` #### MCPTool.from\_dict ```python @classmethod def from_dict(cls, data: dict[str, Any]) -> "Tool" ``` Deserializes the MCPTool from a dictionary. This method reconstructs an MCPTool instance from a serialized dictionary, including recreating the server_info object. A new connection will be established to the MCP server during initialization. **Arguments**: - `data`: Dictionary containing serialized tool data **Raises**: - `None`: Various exceptions if connection fails **Returns**: A fully initialized MCPTool instance #### MCPTool.close ```python def close() ``` Close the tool synchronously. #### MCPTool.\_\_del\_\_ ```python def __del__() ``` Cleanup resources when the tool is garbage collected. #### MCPTool.tool\_spec ```python @property def tool_spec() -> dict[str, Any] ``` Return the Tool specification to be used by the Language Model. #### MCPTool.warm\_up ```python def warm_up() -> None ``` Prepare the Tool for use. Override this method to establish connections to remote services, load models, or perform other resource-intensive initialization. This method should be idempotent, as it may be called multiple times. #### MCPTool.invoke ```python def invoke(**kwargs: Any) -> Any ``` Invoke the Tool with the provided keyword arguments. ### \_MCPClientSessionManager Runs an MCPClient connect/close inside the AsyncExecutor's event loop. Life-cycle: 1. Create the worker to schedule a long-running coroutine in the dedicated background loop. 2. The coroutine calls *connect* on mcp client; when it has the tool list it fulfils a concurrent future so the synchronous thread can continue. 3. It then waits on an `asyncio.Event`. 4. `stop()` sets the event from any thread. The same coroutine then calls *close()* on mcp client and finishes without the dreaded `Attempted to exit cancel scope in a different task than it was entered in` error thus properly closing the client. #### \_MCPClientSessionManager.tools ```python def tools() -> list[types.Tool] ``` Return the tool list already collected during startup. #### \_MCPClientSessionManager.stop ```python def stop() -> None ``` Request the worker to shut down and block until done. ## Module haystack\_integrations.tools.mcp.mcp\_toolset ### MCPToolset A Toolset that connects to an MCP (Model Context Protocol) server and provides access to its tools. MCPToolset dynamically discovers and loads all tools from any MCP-compliant server, supporting both network-based streaming connections (Streamable HTTP, SSE) and local process-based stdio connections. This dual connectivity allows for integrating with both remote and local MCP servers. Example using MCPToolset in a Haystack Pipeline: ```python # Prerequisites: # 1. pip install uvx mcp-server-time # Install required MCP server and tools # 2. export OPENAI_API_KEY="your-api-key" # Set up your OpenAI API key import os from haystack import Pipeline from haystack.components.converters import OutputAdapter from haystack.components.generators.chat import OpenAIChatGenerator from haystack.components.tools import ToolInvoker from haystack.dataclasses import ChatMessage from haystack_integrations.tools.mcp import MCPToolset, StdioServerInfo # Create server info for the time service (can also use SSEServerInfo for remote servers) server_info = StdioServerInfo(command="uvx", args=["mcp-server-time", "--local-timezone=Europe/Berlin"]) # Create the toolset - this will automatically discover all available tools # You can optionally specify which tools to include mcp_toolset = MCPToolset( server_info=server_info, tool_names=["get_current_time"] # Only include the get_current_time tool ) # Create a pipeline with the toolset pipeline = Pipeline() pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o-mini", tools=mcp_toolset)) pipeline.add_component("tool_invoker", ToolInvoker(tools=mcp_toolset)) pipeline.add_component( "adapter", OutputAdapter( template="{{ initial_msg + initial_tool_messages + tool_messages }}", output_type=list[ChatMessage], unsafe=True, ), ) pipeline.add_component("response_llm", OpenAIChatGenerator(model="gpt-4o-mini")) pipeline.connect("llm.replies", "tool_invoker.messages") pipeline.connect("llm.replies", "adapter.initial_tool_messages") pipeline.connect("tool_invoker.tool_messages", "adapter.tool_messages") pipeline.connect("adapter.output", "response_llm.messages") # Run the pipeline with a user question user_input = "What is the time in New York? Be brief." user_input_msg = ChatMessage.from_user(text=user_input) result = pipeline.run({"llm": {"messages": [user_input_msg]}, "adapter": {"initial_msg": [user_input_msg]}}) print(result["response_llm"]["replies"][0].text) ``` You can also use the toolset via Streamable HTTP to talk to remote servers: ```python from haystack_integrations.tools.mcp import MCPToolset, StreamableHttpServerInfo # Create the toolset with streamable HTTP connection toolset = MCPToolset( server_info=StreamableHttpServerInfo(url="http://localhost:8000/mcp"), tool_names=["multiply"] # Optional: only include specific tools ) # Use the toolset as shown in the pipeline example above ``` Example using SSE (deprecated): ```python from haystack_integrations.tools.mcp import MCPToolset, SSEServerInfo from haystack.components.tools import ToolInvoker # Create the toolset with an SSE connection sse_toolset = MCPToolset( server_info=SSEServerInfo(url="http://some-remote-server.com:8000/sse"), tool_names=["add", "subtract"] # Only include specific tools ) # Use the toolset as shown in the pipeline example above ``` #### MCPToolset.\_\_init\_\_ ```python def __init__(server_info: MCPServerInfo, tool_names: list[str] | None = None, connection_timeout: float = 30.0, invocation_timeout: float = 30.0) ``` Initialize the MCP toolset. **Arguments**: - `server_info`: Connection information for the MCP server - `tool_names`: Optional list of tool names to include. If provided, only tools with matching names will be added to the toolset. - `connection_timeout`: Timeout in seconds for server connection - `invocation_timeout`: Default timeout in seconds for tool invocations **Raises**: - `MCPToolNotFoundError`: If any of the specified tool names are not found on the server #### MCPToolset.to\_dict ```python def to_dict() -> dict[str, Any] ``` Serialize the MCPToolset to a dictionary. **Returns**: A dictionary representation of the MCPToolset #### MCPToolset.from\_dict ```python @classmethod def from_dict(cls, data: dict[str, Any]) -> "MCPToolset" ``` Deserialize an MCPToolset from a dictionary. **Arguments**: - `data`: Dictionary representation of the MCPToolset **Returns**: A new MCPToolset instance #### MCPToolset.close ```python def close() ``` Close the underlying MCP client safely. #### MCPToolset.\_\_post\_init\_\_ ```python def __post_init__() ``` Validate and set up the toolset after initialization. This handles the case when tools are provided during initialization. #### MCPToolset.\_\_iter\_\_ ```python def __iter__() -> Iterator[Tool] ``` Return an iterator over the Tools in this Toolset. This allows the Toolset to be used wherever a list of Tools is expected. **Returns**: An iterator yielding Tool instances #### MCPToolset.\_\_contains\_\_ ```python def __contains__(item: Any) -> bool ``` Check if a tool is in this Toolset. Supports checking by: - Tool instance: tool in toolset - Tool name: "tool_name" in toolset **Arguments**: - `item`: Tool instance or tool name string **Returns**: True if contained, False otherwise #### MCPToolset.warm\_up ```python def warm_up() -> None ``` Prepare the Toolset for use. Override this method to set up shared resources like database connections or HTTP sessions. This method should be idempotent, as it may be called multiple times. #### MCPToolset.add ```python def add(tool: Union[Tool, "Toolset"]) -> None ``` Add a new Tool or merge another Toolset. **Arguments**: - `tool`: A Tool instance or another Toolset to add **Raises**: - `ValueError`: If adding the tool would result in duplicate tool names - `TypeError`: If the provided object is not a Tool or Toolset #### MCPToolset.\_\_add\_\_ ```python def __add__(other: Union[Tool, "Toolset", list[Tool]]) -> "Toolset" ``` Concatenate this Toolset with another Tool, Toolset, or list of Tools. **Arguments**: - `other`: Another Tool, Toolset, or list of Tools to concatenate **Raises**: - `TypeError`: If the other parameter is not a Tool, Toolset, or list of Tools - `ValueError`: If the combination would result in duplicate tool names **Returns**: A new Toolset containing all tools #### MCPToolset.\_\_len\_\_ ```python def __len__() -> int ``` Return the number of Tools in this Toolset. **Returns**: Number of Tools #### MCPToolset.\_\_getitem\_\_ ```python def __getitem__(index) ``` Get a Tool by index. **Arguments**: - `index`: Index of the Tool to get **Returns**: The Tool at the specified index