mirror of
https://github.com/microsoft/autogen.git
synced 2025-09-30 10:36:44 +00:00

* Add isort * Apply isort on py files * Fix circular import * Fix format for notebooks * Fix format --------- Co-authored-by: Chi Wang <wang.chi@microsoft.com>
280 lines
11 KiB
Python
280 lines
11 KiB
Python
import os
from datetime import datetime
from typing import Callable, Dict, List, Optional, Union

from requests import Session

import autogen

from .datamodel import AgentConfig, AgentFlowSpec, AgentWorkFlowConfig, Message, SocketMessage
from .utils import clear_folder, get_skills_from_prompt, sanitize_model
|
|
|
|
|
|
class AutoGenWorkFlowManager:
    """
    AutoGenWorkFlowManager class to load agents from a provided configuration and run a chat between them
    """

    def __init__(
        self,
        config: AgentWorkFlowConfig,
        history: Optional[List[Message]] = None,
        work_dir: Optional[str] = None,
        clear_work_dir: bool = True,
        send_message_function: Optional[Callable] = None,
        connection_id: Optional[str] = None,
    ) -> None:
        """
        Initializes the AutoGenFlow with agents specified in the config and optional
        message history.

        Args:
            config: The configuration settings for the sender and receiver agents.
            history: An optional list of previous messages to populate the agents' history.
            work_dir: Folder used for code-execution artifacts; defaults to "work_dir".
            clear_work_dir: If True, the work directory is emptied before the workflow starts.
            send_message_function: Optional callback invoked with each agent message payload
                (e.g. to stream messages over a message queue / websocket).
            connection_id: Identifier of the client connection messages are associated with.
        """
        self.send_message_function = send_message_function
        self.connection_id = connection_id
        self.work_dir = work_dir or "work_dir"
        if clear_work_dir:
            clear_folder(self.work_dir)
        self.config = config
        # given the config, return an AutoGen agent object
        self.sender = self.load(config.sender)
        # given the config, return an AutoGen agent object
        self.receiver = self.load(config.receiver)
        # chronological record of message payloads exchanged during the run
        self.agent_history = []

        if history:
            self.populate_history(history)

    def process_message(
        self,
        sender: autogen.Agent,
        receiver: autogen.Agent,
        message: Dict,
        request_reply: bool = False,
        silent: bool = False,
        sender_type: str = "agent",
    ) -> None:
        """
        Processes the message and adds it to the agent history.

        Args:
            sender: The sender of the message.
            receiver: The receiver of the message.
            message: The message content (dict, or a plain string wrapped as a user message).
            request_reply: If set to True, the message will be added to agent history.
            silent: determining verbosity.
            sender_type: The type of the sender of the message ("agent" or "groupchat").
        """

        message = message if isinstance(message, dict) else {"content": message, "role": "user"}
        message_payload = {
            "recipient": receiver.name,
            "sender": sender.name,
            "message": message,
            "timestamp": datetime.now().isoformat(),
            "sender_type": sender_type,
            "connection_id": self.connection_id,
            "message_type": "agent_message",
        }
        # if the agent will respond to the message, or the message is sent by a groupchat agent.
        # This avoids adding groupchat broadcast messages to the history (which are sent with
        # request_reply=False), or when agent populated from history
        if request_reply is not False or sender_type == "groupchat":
            self.agent_history.append(message_payload)  # add to history
            if self.send_message_function:  # send over the message queue
                socket_msg = SocketMessage(type="agent_message", data=message_payload, connection_id=self.connection_id)
                self.send_message_function(socket_msg.dict())

    def _sanitize_history_message(self, message: str) -> str:
        """
        Sanitizes the message e.g. remove references to execution completed

        Args:
            message: The message to be sanitized.

        Returns:
            The sanitized message.
        """
        to_replace = ["execution succeeded", "exitcode"]
        for replace in to_replace:
            message = message.replace(replace, "")
        return message

    def populate_history(self, history: List[Message]) -> None:
        """
        Populates the agent message history from the provided list of messages.

        Args:
            history: A list of messages to populate the agents' history.
        """
        for msg in history:
            if isinstance(msg, dict):
                msg = Message(**msg)
            # replay without requesting replies so no new turns are generated
            if msg.role == "user":
                self.sender.send(
                    msg.content,
                    self.receiver,
                    request_reply=False,
                    silent=True,
                )
            elif msg.role == "assistant":
                self.receiver.send(
                    msg.content,
                    self.sender,
                    request_reply=False,
                    silent=True,
                )

    def sanitize_agent_spec(self, agent_spec: AgentFlowSpec) -> AgentFlowSpec:
        """
        Sanitizes the agent spec by setting loading defaults

        Args:
            agent_spec: The agent flow spec to be sanitized (mutated in place).

        Returns:
            The sanitized agent configuration.
        """

        # default termination check: last 20 chars of the message contain "TERMINATE"
        agent_spec.config.is_termination_msg = agent_spec.config.is_termination_msg or (
            lambda x: "TERMINATE" in x.get("content", "").rstrip()[-20:]
        )

        def get_default_system_message(agent_type: str) -> str:
            # fall back to AutoGen's default for assistants, a generic prompt otherwise
            if agent_type == "assistant":
                return autogen.AssistantAgent.DEFAULT_SYSTEM_MESSAGE
            else:
                return "You are a helpful AI Assistant."

        # sanitize llm_config if present
        if agent_spec.config.llm_config is not False:
            config_list = []
            for llm in agent_spec.config.llm_config.config_list:
                # check if api_key is present either in llm or env variable
                if "api_key" not in llm and "OPENAI_API_KEY" not in os.environ:
                    error_message = f"api_key is not present in llm_config or OPENAI_API_KEY env variable for agent ** {agent_spec.config.name}**. Update your workflow to provide an api_key to use the LLM."
                    raise ValueError(error_message)

                # only add key if value is not None
                sanitized_llm = sanitize_model(llm)
                config_list.append(sanitized_llm)
            agent_spec.config.llm_config.config_list = config_list
        if agent_spec.config.code_execution_config is not False:
            code_execution_config = agent_spec.config.code_execution_config or {}
            code_execution_config["work_dir"] = self.work_dir
            # tbd check if docker is installed
            code_execution_config["use_docker"] = False
            agent_spec.config.code_execution_config = code_execution_config

        if agent_spec.skills:
            # get skill prompt, also write skills to a file named skills.py
            skills_prompt = get_skills_from_prompt(agent_spec.skills, self.work_dir)
            if agent_spec.config.system_message:
                agent_spec.config.system_message = agent_spec.config.system_message + "\n\n" + skills_prompt
            else:
                agent_spec.config.system_message = (
                    get_default_system_message(agent_spec.type) + "\n\n" + skills_prompt
                )

        return agent_spec

    def load(self, agent_spec: AgentFlowSpec) -> autogen.Agent:
        """
        Loads an agent based on the provided agent specification.

        Args:
            agent_spec: The specification of the agent to be loaded.

        Returns:
            An instance of the loaded agent.
        """
        agent_spec = self.sanitize_agent_spec(agent_spec)
        if agent_spec.type == "groupchat":
            # recursively load and sanitize every member agent of the group chat
            agents = [
                self.load(self.sanitize_agent_spec(agent_config)) for agent_config in agent_spec.groupchat_config.agents
            ]
            group_chat_config = agent_spec.groupchat_config.dict()
            group_chat_config["agents"] = agents
            groupchat = autogen.GroupChat(**group_chat_config)
            agent = ExtendedGroupChatManager(
                groupchat=groupchat, **agent_spec.config.dict(), message_processor=self.process_message
            )
            return agent

        else:
            agent = self.load_agent_config(agent_spec.config, agent_spec.type)
            return agent

    def load_agent_config(self, agent_config: AgentConfig, agent_type: str) -> autogen.Agent:
        """
        Loads an agent based on the provided agent configuration.

        Args:
            agent_config: The configuration of the agent to be loaded.
            agent_type: The type of the agent to be loaded.

        Returns:
            An instance of the loaded agent.

        Raises:
            ValueError: If the agent type is not "assistant" or "userproxy".
        """
        # both supported types are backed by the same extended agent class; the
        # distinction only affects the default system message chosen during sanitization
        if agent_type in ("assistant", "userproxy"):
            agent = ExtendedConversableAgent(**agent_config.dict(), message_processor=self.process_message)
        else:
            raise ValueError(f"Unknown agent type: {agent_type}")

        return agent

    def run(self, message: str, clear_history: bool = False) -> None:
        """
        Initiates a chat between the sender and receiver agents with an initial message
        and an option to clear the history.

        Args:
            message: The initial message to start the chat.
            clear_history: If set to True, clears the chat history before initiating.
        """
        self.sender.initiate_chat(
            self.receiver,
            message=message,
            clear_history=clear_history,
        )
|
|
|
|
|
|
class ExtendedConversableAgent(autogen.ConversableAgent):
    """ConversableAgent variant that reports every received message to an optional callback.

    The callback (``message_processor``) is invoked before normal AutoGen message
    handling, with ``sender_type="agent"`` so consumers can distinguish the origin.
    """

    def __init__(self, message_processor=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # optional hook: called with every incoming message before it is processed
        self.message_processor = message_processor

    def receive(
        self,
        message: Union[Dict, str],
        sender: autogen.Agent,
        request_reply: Optional[bool] = None,
        silent: Optional[bool] = False,
    ):
        processor = self.message_processor
        if processor:
            # notify the observer first, then defer to the standard receive logic
            processor(sender, self, message, request_reply, silent, sender_type="agent")
        super().receive(message, sender, request_reply, silent)
|
|
|
|
|
|
class ExtendedGroupChatManager(autogen.GroupChatManager):
    """GroupChatManager variant that reports every received message to an optional callback.

    Mirrors ``ExtendedConversableAgent`` but tags forwarded messages with
    ``sender_type="groupchat"`` so consumers can tell group-chat traffic apart.
    """

    def __init__(self, message_processor=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # optional hook: called with every incoming message before it is processed
        self.message_processor = message_processor

    def receive(
        self,
        message: Union[Dict, str],
        sender: autogen.Agent,
        request_reply: Optional[bool] = None,
        silent: Optional[bool] = False,
    ):
        processor = self.message_processor
        if processor:
            # notify the observer first, then defer to the standard receive logic
            processor(sender, self, message, request_reply, silent, sender_type="groupchat")
        super().receive(message, sender, request_reply, silent)
|