| """
 | |
| Credits: Hussein Mozannar
 | |
| """
 | |
| 
 | |
| import os
 | |
| import re
 | |
| import json
 | |
| import glob
 | |
| import logging
 | |
| import pandas as pd
 | |
| 
 | |
| logging.basicConfig(level=logging.INFO)
 | |
| 
 | |
| 


def process_logs(logs_path, single_benchmark=False):
    """
    logs_path: str, path to the logs directory, containing subdirectories for each benchmark subset
    single_benchmark: bool, if True, logs_path points directly at a single subset directory
    returns: pandas DataFrame with all the logs processed
    """
    # check if logs_path exists
    if not os.path.exists(logs_path):
        raise FileNotFoundError(
            f"Path {logs_path} does not exist, need to download logs, extract them into one common folder"
        )
    if single_benchmark:
        # the last path component is the single subset to process
        subsets = [os.path.basename(logs_path)]
        logs_path = os.path.dirname(logs_path)
    else:
        subsets = os.listdir(logs_path)
    results = []
    for subset in subsets:
        # skip macOS artifacts first (.DS_Store is a plain file, so calling
        # os.listdir on it would raise), then skip empty subset folders
        if subset in (".DS_Store", "__MACOSX") or not os.listdir(os.path.join(logs_path, subset)):
            continue
        benchmark_name = subset.split("_")[0]
        instances = [
            f
            for f in os.listdir(os.path.join(logs_path, subset))
            if os.path.isdir(os.path.join(logs_path, subset, f))
            and os.path.exists(os.path.join(logs_path, subset, f, "0"))
        ]
        logging.info(f"Processing {subset} with {len(instances)} instances")
        for instance in instances:
            instance_dir_path = os.path.join(logs_path, subset, instance, "0")
            try:
                correct, expected_answer, final_answer = scorer(instance_dir_path, benchmark_name)
            except Exception as e:
                logging.error(f"Error processing {instance_dir_path}: {e}")
                continue
            messages = get_message_logs(instance_dir_path)
            if messages is None:
                # log.jsonl was missing; keep the instance but record no messages
                messages = []
            results.append(
                {
                    "benchmark": benchmark_name,
                    "subset_benchmark": subset,
                    "instance": instance,
                    "task_information": get_task_information(instance_dir_path, benchmark_name),
                    "expected_answer": expected_answer,
                    "final_answer": final_answer,
                    "correct": correct,
                    "stalled": did_agent_stall(instance_dir_path),
                    "num_messages": len(messages),
                    "messages": messages,
                    "progress_not_being_made": is_progress_not_being_made(instance_dir_path),
                }
            )
    df_logs = pd.DataFrame(results)
    return df_logs
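

# For reference, the directory layout process_logs expects, reconstructed
# from the path handling above (folder names are illustrative):
#
#   logs_path/
#       gaia_validation/        # one folder per benchmark subset
#           <instance_id>/
#               0/              # first run of the instance
#                   console_log.txt
#                   log.jsonl
#                   ...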


def normalize_answer(a):
    """
    Taken from custom_tabulate.py in the WebArena benchmark, given an answer, returns the normalized answer.
    Operations: lower case, trim, standardize comma separated values, replace multiple spaces with one space, remove trailing punctuation
    a: str, answer
    returns: str, normalized answer
    """
    norm_answer = ", ".join(a.strip().lower().split(","))
    norm_answer = re.sub(r"[\.\!\?]+$", "", re.sub(r"\s+", " ", norm_answer))
    return norm_answer
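

# A worked example of the normalization (illustrative input):
#   normalize_answer("  The Answer,Foo,  Bar!  ")  ->  "the answer, foo, bar"
# i.e. lower-cased and trimmed, commas re-joined as ", ", runs of whitespace
# collapsed to one space, and trailing .!? punctuation removed.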


def scorer(instance_dir, benchmark_name):
    """
    Returns results based on the benchmark name and the instance directory.

    benchmark_name: str, the name of the benchmark: "gaia", "assistant", or "webarena"
    instance_dir: str, path to the instance directory
    returns: tuple, (bool, str, str) or None, depending on the benchmark
    """

    if benchmark_name == "gaia" or benchmark_name == "assistant":
        # Read the expected answer
        expected_answer_file = os.path.join(instance_dir, "expected_answer.txt")
        if not os.path.isfile(expected_answer_file):
            return None

        with open(expected_answer_file, "rt") as fh:
            expected_answer = fh.read().strip()

        # Read the console log
        console_log_file = os.path.join(instance_dir, "console_log.txt")
        if not os.path.isfile(console_log_file):
            return None

        with open(console_log_file, "rt") as fh:
            console_log = fh.read()
            final_answer = None
            m = re.search(r"FINAL ANSWER:(.*?)\n", console_log, re.DOTALL)
            if m:
                final_answer = m.group(1).strip()

            if final_answer is None:
                return None
            # keep the unnormalized final answer for reporting
            not_normalized_final = final_answer

            n_ex = normalize_answer(expected_answer)
            n_final = normalize_answer(final_answer)
            return (n_ex != "" and n_ex == n_final), n_ex, not_normalized_final

    elif benchmark_name == "webarena":
        # Read the console log
        console_log_file = os.path.join(instance_dir, "console_log.txt")
        if not os.path.isfile(console_log_file):
            return None

        with open(console_log_file, "rt") as fh:
            console_log = fh.read()
            final_score = None
            m = re.search(r"FINAL SCORE:(.*?)\n", console_log, re.DOTALL)
            if m:
                final_score = m.group(1).strip()

            if final_score is None:
                return None
            else:
                return float(final_score) > 0, "", ""

    else:
        raise ValueError(f"Unsupported benchmark_name: {benchmark_name}")
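

# scorer keys off two markers in console_log.txt; the lines below are
# illustrative, not real log output:
#   gaia / assistant runs:  "FINAL ANSWER: Paris"
#   webarena runs:          "FINAL SCORE: 1.0"
# A missing file or marker makes scorer return None; process_logs then logs
# an error for that instance (the tuple unpacking fails) and skips it.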


def get_number_of_chat_messages(chat_messages_dir):
    # Count the number of chat messages across all agents in chat_messages_dir
    result = 0
    for file in glob.glob(f"{chat_messages_dir}/*_messages.json"):
        with open(file, "r") as f:
            content = json.load(f)
            for agent, messages in content.items():
                result += len(messages)
    return result
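

# get_number_of_chat_messages assumes each *_messages.json file maps an
# agent name to its list of messages, e.g. (illustrative, inferred from the
# loop above):
#   {"WebSurfer": [{...}, {...}], "Orchestrator": [{...}]}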


def did_agent_stall(instance_dir):
    # Check if the agent stalled at any point during the run
    log_file_path = os.path.join(instance_dir, "log.jsonl")
    if not os.path.isfile(log_file_path):
        return None
    # the orchestrator emits "Stalled.... Replanning..." when it detects a stall
    with open(log_file_path, "r") as f:
        for line in f:
            if "Stalled.... Replanning..." in line:
                return True
    return False


def get_message_logs(instance_dir):
    # Read the log file and return the messages
    log_file_path = os.path.join(instance_dir, "log.jsonl")
    if not os.path.isfile(log_file_path):
        return None
    messages = []
    # for each line, convert to dict, check if it has a message and source key, and append to messages
    with open(log_file_path, "r") as f:
        for line in f:
            line_dict = json.loads(line)
            if "message" in line_dict and "source" in line_dict:
                messages.append(line_dict)
    return messages
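

# Each line of log.jsonl is a JSON object; only entries carrying both a
# "source" and a "message" key are kept, e.g. (illustrative):
#   {"source": "Orchestrator (thought)", "message": "Updated Ledger: ..."}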


def get_task_information(instance_dir, benchmark_name):
    # Read the task information from the prompt file for the given benchmark
    if benchmark_name == "gaia" or benchmark_name == "assistant":
        prompt_file = os.path.join(instance_dir, "prompt.txt")
        if not os.path.isfile(prompt_file):
            return None
        with open(prompt_file, "r") as f:
            return f.read().strip()
    elif benchmark_name == "webarena":
        task_prompt_file = os.path.join(instance_dir, "task_prompt.json")
        if not os.path.isfile(task_prompt_file):
            return None
        with open(task_prompt_file, "r") as f:
            return json.load(f)["intent"]
    else:
        raise ValueError(f"Unsupported benchmark_name: {benchmark_name}")


def is_progress_not_being_made(instance_dir):
    # if at any point in the log the orchestrator's ledger reports that
    # progress is not being made, return True
    pattern = r'"is_progress_being_made": \{\s+"reason": ".*?",\s+"answer": false\s+\}'
    log_file_path = os.path.join(instance_dir, "log.jsonl")
    if not os.path.isfile(log_file_path):
        return None
    with open(log_file_path, "r") as f:
        for line in f:
            line_dict = json.loads(line)
            if (
                "source" in line_dict
                and line_dict["source"] == "Orchestrator (thought)"
                and "Updated Ledger:" in line_dict.get("message", "")
                and re.search(pattern, line_dict["message"])
            ):
                return True
    return False
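

if __name__ == "__main__":
    # Minimal usage sketch (hypothetical entry point): process a logs folder
    # and print per-subset accuracy. Assumes logs were extracted under
    # ./logs, or that a path is passed as the first argument.
    import sys

    logs_dir = sys.argv[1] if len(sys.argv) > 1 else "./logs"  # hypothetical default path
    df = process_logs(logs_dir)
    # "correct" is boolean, so its mean is the accuracy per subset
    print(df.groupby("subset_benchmark")["correct"].mean())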