407 lines
14 KiB
Python
Raw Normal View History

Sample Web Application Built with AutoGen (#695) * Adding research assistant code * Adding research assistant code * checking in RA files * Remove used text file * Update README.md to include Saleema's name to the Contributors list. * remove extraneous files * update gitignore * improve structure on global skills * fix linting error * readme update * readme update * fix wrong function bug * readme update * update ui build * cleanup, remove unused modules * readme and docs updates * set default user * ui build update * add screenshot to improve instructions * remove logout behaviour, replace with note to developers to add their own logout logic * Create blog and edit ARA README * Added the stock prices example in the readme for ARA * Include edits from review with Saleema * fix format issues * Cosmetic changes for betting debug messages * edit authors * remove references to request_timeout to support autogen v0.0.2 * update bg color for UI * readme update * update research assistant blog post * omit samples folder from codecov * ui build update + precommit refactor * formattiing updates fromo pre-commit * readme update * remove compiled source files * update gitignore * refactor, file removals * refactor for improved structure - datamodel, chat and db helper * update gitignore * refactor, file removals * refactor for improved structure - datamodel, chat and db helper * refactor skills view * general refactor * gitignore update and general refactor * skills update * general refactor * ui folder structure refactor * improve support for skills loading * add fetch profile default skill * refactor chat to autogenchat * qol refactor * improve metadata display * early support for autogenflow in ui * docs update general refactor * general refactor * readme update * readme update * readme and cli update * pre-commit updates * precommit update * readme update * add steup.py for older python build versions * add manifest.in, update app icon * in-progress changes to agent 
specification * remove use_cache refs * update datamodel, and fix for default serverurl * request_timeout * readme update, fix autogen values * fix pyautogen version * precommit formatting and other qol items * update folder structure * req update * readme and docs update * docs update * remove duplicate in yaml file * add support for explicit skills addition * readme and documentation updates * general refactor * remove blog post, schedule for future PR * readme update, add info on llmconfig * make use_cache False by default unless set * minor ui updates * upgrade ui to use latest uatogen lib version 0.2.0b5 * Ui refactor, support for adding arbitrary model specifications * formatting/precommit checks * update readme, utils default skill --------- Co-authored-by: Piali Choudhury <pialic@microsoft.com> Co-authored-by: Chi Wang <wang.chi@microsoft.com>
2023-11-20 10:40:30 -08:00
import ast
import hashlib
from typing import List, Dict, Union
import os
import shutil
import re
import uuid
import autogen
from .datamodel import AgentConfig, AgentFlowSpec, FlowConfig, LLMConfig, Message
from .db import DBManager
def md5_hash(text: str) -> str:
    """
    Return the hexadecimal MD5 digest of a string.

    :param text: The string to hash; it is encoded with ``str.encode()``
        (UTF-8 by default) before being hashed
    :return: The digest as a 32-character lowercase hexadecimal string
    """
    digest = hashlib.md5()
    digest.update(text.encode())
    return digest.hexdigest()
def save_message(message: Message, dbmanager: DBManager) -> None:
    """
    Insert one message as a new row of the ``messages`` table.

    :param message: The Message whose attributes supply the column values
    :param dbmanager: The DBManager used to execute the INSERT statement
    """
    # Column names double as the Message attribute names, in insert order.
    columns = ("user_id", "root_msg_id", "msg_id", "role", "content", "metadata", "timestamp")
    placeholders = ", ".join("?" * len(columns))
    query = f"INSERT INTO messages ({', '.join(columns)}) VALUES ({placeholders})"
    args = tuple(getattr(message, column) for column in columns)
    dbmanager.query(query=query, args=args)
def load_messages(user_id: str, dbmanager: DBManager) -> List[dict]:
    """
    Fetch every message belonging to a user, oldest first.

    :param user_id: The ID of the user whose messages are requested
    :param dbmanager: The DBManager used to run the SELECT statement
    :return: The user's messages as dictionaries, ordered by ascending
        ``timestamp``
    """
    rows = dbmanager.query(
        query="SELECT * FROM messages WHERE user_id = ?",
        args=(user_id,),
        json=True,
    )
    # The query is unordered; ordering is applied here on the "timestamp" key.
    return sorted(rows, key=lambda row: row["timestamp"])
def delete_message(user_id: str, msg_id: str, dbmanager: DBManager, delete_all: bool = False) -> List[dict]:
    """
    Remove one message, or every message, belonging to a user.

    :param user_id: The ID of the user whose messages are affected
    :param msg_id: The ID of the single message to remove; not used when
        delete_all is True
    :param dbmanager: The DBManager used to run the DELETE statement
    :param delete_all: When True, remove all of the user's messages
    :return: An empty list when delete_all is True, otherwise the user's
        remaining messages as returned by ``load_messages``
    """
    if delete_all:
        statement = "DELETE FROM messages WHERE user_id = ?"
        params = (user_id,)
    else:
        statement = "DELETE FROM messages WHERE user_id = ? AND msg_id = ?"
        params = (user_id, msg_id)
    dbmanager.query(query=statement, args=params)

    if delete_all:
        return []
    # Re-read so the caller gets the state after the deletion.
    return load_messages(user_id=user_id, dbmanager=dbmanager)
def get_modified_files(start_timestamp: float, end_timestamp: float, source_dir: str, dest_dir: str) -> List[str]:
    """
    Copy files from source_dir that were modified strictly between two
    timestamps into dest_dir, and report where they can be found.

    Directories and files named "__pycache__" or "__init__.py", and files
    with the extensions ".pyc" or ".cache", are ignored. When a file of the
    same name already exists in dest_dir the copy is stored as
    "<name>_<n><ext>" using the first free index n >= 1. dest_dir is created
    on demand if a file needs to be copied and it does not exist yet.

    :param start_timestamp: Exclusive lower bound on the file modification time.
    :param end_timestamp: Exclusive upper bound on the file modification time.
    :param source_dir: The directory tree to search for modified files.
    :param dest_dir: The directory the modified files are copied into; its
        last path component is used as the user id in the returned paths.
    :return: One path per copied file, of the form
        "files/user/<uid>/<copied file name>" (not a filesystem path inside
        dest_dir).
    """
    ignore_extensions = {".pyc", ".cache"}
    ignore_names = {"__pycache__", "__init__.py"}

    # normpath drops any trailing separator so basename yields the real last
    # component; splitting on "/" returned "" for "a/b/" and is not portable.
    uid = os.path.basename(os.path.normpath(dest_dir)) if dest_dir else ""

    modified_files = []
    for root, dirs, files in os.walk(source_dir):
        # Prune in place so os.walk does not descend into ignored directories.
        dirs[:] = [d for d in dirs if d not in ignore_names]
        for file in files:
            base, extension = os.path.splitext(file)
            if extension in ignore_extensions or file in ignore_names:
                continue

            file_path = os.path.join(root, file)
            if not start_timestamp < os.path.getmtime(file_path) < end_timestamp:
                continue

            if dest_dir:
                os.makedirs(dest_dir, exist_ok=True)

            # Resolve name conflicts by appending the first unused index.
            dest_file_path = os.path.join(dest_dir, file)
            copy_idx = 1
            while os.path.exists(dest_file_path):
                dest_file_path = os.path.join(dest_dir, f"{base}_{copy_idx}{extension}")
                copy_idx += 1

            shutil.copy2(file_path, dest_file_path)
            modified_files.append(f"files/user/{uid}/{os.path.basename(dest_file_path)}")
    return modified_files
def init_webserver_folders(root_file_path: str) -> Dict[str, str]:
    """
    Create the directory layout the web server relies on and report it.

    The static files, UI, working, and skills (plus its "user" and "global"
    children) directories are created under root_file_path, along with a
    "user" directory inside the static files directory. Directories that
    already exist are left as they are.

    :param root_file_path: The directory under which all folders are created
    :return: A dictionary mapping each folder's role to its path
    """
    skills_dir = os.path.join(root_file_path, "skills")
    folders = {
        "files_static_root": os.path.join(root_file_path, "files/"),
        "static_folder_root": os.path.join(root_file_path, "ui"),
        "workdir_root": os.path.join(root_file_path, "workdir"),
        "skills_dir": skills_dir,
        "user_skills_dir": os.path.join(skills_dir, "user"),
        "global_skills_dir": os.path.join(skills_dir, "global"),
    }

    for role, path in folders.items():
        os.makedirs(path, exist_ok=True)
        if role == "files_static_root":
            # The per-user files directory is created but not reported.
            os.makedirs(os.path.join(path, "user"), exist_ok=True)

    return folders
def skill_from_folder(folder: str) -> List[Dict[str, str]]:
    """
    Collect every Python file found under a folder as a skill.

    The folder is walked recursively; files not ending in ".py" are skipped.

    :param folder: The folder to search for skills
    :return: A list with one dictionary per Python file, holding "name" (the
        file name up to its first "."), "content" (the file's text, read as
        UTF-8) and "file_name" (the file name without its directory)
    """
    found = []
    for current_dir, _subdirs, filenames in os.walk(folder):
        for filename in filenames:
            if not filename.endswith(".py"):
                continue
            with open(os.path.join(current_dir, filename), "r", encoding="utf-8") as handle:
                source = handle.read()
            found.append(
                {
                    "name": filename.partition(".")[0],
                    "content": source,
                    "file_name": filename,
                }
            )
    return found
def get_all_skills(
    user_skills_path: str, global_skills_path: str, dest_dir: Union[str, None] = None
) -> Dict[str, List[Dict[str, str]]]:
    """
    Get all skills from the user and global skills directories. If dest_dir
    is given, also write every skill file into dest_dir.

    The user skills directory is created if it does not exist. When copying,
    user skills are written before global skills, so a global skill whose
    file name matches a user skill overwrites it in dest_dir.

    :param user_skills_path: The path to the user skills directory
    :param global_skills_path: The path to the global skills directory
    :param dest_dir: Optional destination directory to write all skills to;
        it is created if missing
    :return: A dictionary with the keys "user" and "global", each holding the
        list of skills found in the corresponding directory
    """
    # Create the user directory up front so it exists before it is scanned.
    os.makedirs(user_skills_path, exist_ok=True)

    skills = {
        "user": skill_from_folder(user_skills_path),
        "global": skill_from_folder(global_skills_path),
    }

    if dest_dir:
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(dest_dir, exist_ok=True)
        for skill in skills["user"] + skills["global"]:
            skill_file_path = os.path.join(dest_dir, skill["file_name"])
            with open(skill_file_path, "w", encoding="utf-8") as f:
                f.write(skill["content"])

    return skills
def get_skills_prompt(skills: Dict[str, List[Dict[str, str]]]) -> str:
    """
    Get a prompt with the content of all skills.

    The prompt starts with fixed usage instructions, followed by one
    delimited section per skill: user skills first, then global skills.

    :param skills: A dictionary with the keys "user" and "global", each
        mapping to a list of skill dictionaries that provide "file_name" and
        "content". A missing or empty category contributes no sections.
    :return: A string containing the instructions and the content of all skills
    """
    all_skills = list(skills.get("user") or []) + list(skills.get("global") or [])

    # Collect the pieces and join once instead of growing a string with "+=".
    parts = [
        "\n"
        "While solving the task you may use functions in the files below.\n"
        "To use a function from a file in code, import the file and then use the function.\n"
        "If you need to install python packages, write shell code to\n"
        "install via pip and use --quiet option.\n"
    ]
    for skill in all_skills:
        file_name = skill["file_name"]
        content = skill["content"]
        parts.append(f"\n##### Begin of {file_name} #####\n{content}\n#### End of {file_name} ####\n")
    return "".join(parts)
def delete_files_in_folder(folders: Union[str, List[str]]) -> None:
    """
    Empty one or more folders, keeping the folders themselves.

    Files and symbolic links are removed, and sub-directories are removed
    together with their content. A folder that does not exist is reported
    and skipped, and a failure to delete an entry is reported without
    stopping the remaining deletions.

    :param folders: A single folder path or a list of folder paths
    """
    targets = [folders] if isinstance(folders, str) else folders

    for folder in targets:
        if not os.path.isdir(folder):
            print(f"The folder {folder} does not exist.")
            continue

        for entry in os.listdir(folder):
            path = os.path.join(folder, entry)
            try:
                # Links are checked before directories so that a link to a
                # directory is unlinked rather than followed.
                if os.path.islink(path) or os.path.isfile(path):
                    remover = os.remove
                elif os.path.isdir(path):
                    remover = shutil.rmtree
                else:
                    continue
                remover(path)
            except Exception as e:
                print(f"Failed to delete {path}. Reason: {e}")
def get_default_agent_config(work_dir: str, skills_suffix: str = "") -> FlowConfig:
    """
    Get a default agent flow config: a "userproxy" sender paired with an
    "assistant" receiver, both using the same LLM configuration.

    :param work_dir: The working directory placed in the user proxy's
        code execution config
    :param skills_suffix: Text appended to the assistant's default system
        message (for example a skills prompt)
    :return: A FlowConfig named "default" of type "default"
    """
    llm_config = LLMConfig(
        config_list=[{"model": "gpt-4"}],
        temperature=0,
    )

    user_proxy_instructions = (
        "If the request has been addressed sufficiently, summarize the answer and end with the word TERMINATE."
        " Otherwise, ask a follow-up question.\n"
    )

    userproxy_spec = AgentFlowSpec(
        type="userproxy",
        config=AgentConfig(
            name="user_proxy",
            human_input_mode="NEVER",
            system_message=user_proxy_instructions,
            code_execution_config={
                "work_dir": work_dir,
                "use_docker": False,
            },
            max_consecutive_auto_reply=10,
            llm_config=llm_config,
            # "content" may be present but None; dict.get's default only
            # covers a missing key, so fall back to "" explicitly before
            # calling str methods.
            is_termination_msg=lambda x: (x.get("content") or "").rstrip().endswith("TERMINATE"),
        ),
    )

    assistant_spec = AgentFlowSpec(
        type="assistant",
        config=AgentConfig(
            name="primary_assistant",
            system_message=autogen.AssistantAgent.DEFAULT_SYSTEM_MESSAGE + skills_suffix,
            llm_config=llm_config,
        ),
    )

    flow_config = FlowConfig(
        name="default",
        sender=userproxy_spec,
        receiver=assistant_spec,
        type="default",
    )
    return flow_config
def extract_successful_code_blocks(messages: List[Dict[str, str]]) -> List[str]:
    """
    Parse a list of messages and return the code blocks that executed
    successfully, keeping the backticks for Markdown rendering.

    A code block counts as successful when it appears in an "assistant"
    message that is immediately followed by a "user" message whose content
    contains "execution succeeded". Messages whose content is missing or is
    not a string are skipped instead of raising.

    :param messages: A list of message dictionaries with "role" and
        "content" keys
    :return: The successfully executed code blocks, in message order,
        including their enclosing triple backticks
    """
    # Non-greedy so that each fenced block is matched separately.
    code_block_regex = r"```[\s\S]*?```"

    successful_code_blocks = []
    # Pair every message with the one that follows it.
    for previous, current in zip(messages, messages[1:]):
        report = current.get("content")
        if current.get("role") != "user" or not isinstance(report, str):
            continue
        if "execution succeeded" not in report:
            continue

        proposal = previous.get("content")
        if previous.get("role") != "assistant" or not isinstance(proposal, str):
            continue

        successful_code_blocks.extend(re.findall(code_block_regex, proposal))
    return successful_code_blocks
def create_skills_from_code(dest_dir: str, skills: Union[str, List[str]]) -> None:
    """
    Create skill files from one or more code strings.

    Each skill is written to "<name>.py" in dest_dir, where <name> is the
    first top-level function defined in the code, or "new_skill" when the
    code cannot be parsed or defines no top-level function. If that file
    already exists, "<name>_<n>.py" is used with the first free index n >= 1,
    so existing files are never overwritten.

    :param dest_dir: The destination directory to write all skills to; it is
        created if missing
    :param skills: A code string or a list of code strings
    """
    # Ensure skills is a list
    if isinstance(skills, str):
        skills = [skills]

    os.makedirs(dest_dir, exist_ok=True)

    for skill in skills:
        # Split once; the base must stay fixed while probing for a free name,
        # otherwise a function name that itself ends in "_<n>" gets truncated.
        base, ext = os.path.splitext(_skill_file_name(skill))

        skill_file_path = os.path.join(dest_dir, f"{base}{ext}")
        index = 1
        while os.path.exists(skill_file_path):
            skill_file_path = os.path.join(dest_dir, f"{base}_{index}{ext}")
            index += 1

        # Write the skill to the file
        with open(skill_file_path, "w", encoding="utf-8") as f:
            f.write(skill)


def _skill_file_name(code: str) -> str:
    """Return "<first top-level function name>.py" for code, or "new_skill.py" if there is none."""
    try:
        parsed = ast.parse(code)
    except (ValueError, SyntaxError):
        return "new_skill.py"

    for node in parsed.body:
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            # Keep only characters that are safe in a file name.
            function_name = "".join(ch for ch in node.name if ch.isalnum() or ch == "_")
            return f"{function_name}.py"
    return "new_skill.py"