mirror of
				https://github.com/microsoft/autogen.git
				synced 2025-11-04 03:39:52 +00:00 
			
		
		
		
	* Move from tomllib to tomli * added example code for magentic-one + code comments * adding benchmarks temporarily * add license for datasets * revert changes to magentic-one * change license location --------- Co-authored-by: Ryan Sweet <rysweet@microsoft.com>
		
			
				
	
	
		
			218 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			218 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""
 | 
						|
Credits: Hussein Mozannar
 | 
						|
"""
 | 
						|
 | 
						|
import os
 | 
						|
import re
 | 
						|
import json
 | 
						|
import glob
 | 
						|
import logging
 | 
						|
import pandas as pd
 | 
						|
 | 
						|
# Configure the root logger at INFO level so the per-subset progress messages
# emitted via logging.info() in this module are actually shown.
logging.basicConfig(level=logging.INFO)
 | 
						|
 | 
						|
 | 
						|
def process_logs(logs_path, single_benchmark=False):
    """
    Walk a directory of benchmark logs and build one result row per instance.

    logs_path: str, path to the logs directory, containing subdirectories for each benchmark subset
        (when single_benchmark is True, the path of a single subset directory instead)
    single_benchmark: bool, if True treat the last component of logs_path as the only subset
    returns: pandas DataFrame with all the logs processed (one row per scored instance)
    raises: FileNotFoundError if logs_path does not exist
    """
    # check if logs_path exists
    if not os.path.exists(logs_path):
        raise FileNotFoundError(
            f"Path {logs_path} does not exist, need to download logs, extract them into one common folder"
        )
    if single_benchmark:
        # subset should be a list with single folder which is the last part of the path.
        # normpath drops a trailing separator so the subset name is never empty, and
        # dirname keeps the root of an absolute path intact.
        normalized_path = os.path.normpath(logs_path)
        subsets = [os.path.basename(normalized_path)]
        logs_path = os.path.dirname(normalized_path)
    else:
        subsets = os.listdir(logs_path)

    results = []
    for subset in subsets:
        subset_path = os.path.join(logs_path, subset)
        # Skip OS/archive artifacts, stray files and empty folders. The isdir test
        # must come before listdir: listing a regular file (e.g. .DS_Store) raises.
        if (
            subset in (".DS_Store", "__MACOSX")
            or not os.path.isdir(subset_path)
            or not os.listdir(subset_path)
        ):
            continue
        # The benchmark name is the part of the folder name before the first underscore.
        benchmark_name = subset.split("_")[0]
        # Only instance folders that contain a "0" entry are processed.
        instances = [
            f
            for f in os.listdir(subset_path)
            if os.path.isdir(os.path.join(subset_path, f))
            and os.path.exists(os.path.join(subset_path, f, "0"))
        ]
        logging.info(f"Processing {subset} with {len(instances)} instances")
        for instance in instances:
            instance_dir_path = os.path.join(subset_path, instance, "0")
            try:
                score = scorer(instance_dir_path, benchmark_name)
            except Exception as e:
                # Best effort: one broken instance must not abort the whole run.
                logging.error(f"Error processing {instance_dir_path}: {e}")
                continue
            if score is None:
                # scorer returns None when it has nothing to score; skip the instance
                # explicitly instead of relying on a tuple-unpacking TypeError.
                logging.error(f"Error processing {instance_dir_path}: scorer returned no result")
                continue
            correct, expected_answer, final_answer = score

            # get_message_logs returns None when log.jsonl is missing; keep that as
            # "unknown" rather than crashing on len(None).
            messages = get_message_logs(instance_dir_path)
            results.append(
                {
                    "benchmark": benchmark_name,
                    "subset_benchmark": subset,
                    "instance": instance,
                    "task_information": get_task_information(instance_dir_path, benchmark_name),
                    "expected_answer": expected_answer,
                    "final_answer": final_answer,
                    "correct": correct,
                    "stalled": did_agent_stall(instance_dir_path),
                    "num_messages": len(messages) if messages is not None else None,
                    "messages": messages,
                    "progress_not_being_made": is_progress_not_being_made(instance_dir_path),
                }
            )
    df_logs = pd.DataFrame(results)
    return df_logs
 | 
						|
 | 
						|
 | 
						|
def normalize_answer(a):
    """
    Taken from custom_tabulate.py in the WebArena benchmark, given an answer, returns the normalized answer.
    Operations: lower case, trim, standardize comma separated values, replace multiple spaces with one space, remove trailing punctuation
    a: str, answer
    returns: str, normalized answer
    """
    # Trim and lower-case, then re-join comma separated values with ", ".
    norm_answer = ", ".join(a.strip().lower().split(","))
    # Collapse every whitespace run (including the doubled space the join can
    # create) into a single space.
    norm_answer = re.sub(r"\s+", " ", norm_answer)
    # Drop any run of ".", "!" or "?" at the very end of the answer.
    return re.sub(r"[.!?]+$", "", norm_answer)
 | 
						|
 | 
						|
 | 
						|
def _read_console_field(instance_dir, label):
    """Return the stripped text following the first `label` in console_log.txt, or None if unavailable."""
    console_log_file = os.path.join(instance_dir, "console_log.txt")
    if not os.path.isfile(console_log_file):
        return None

    with open(console_log_file, "rt") as fh:
        console_log = fh.read()

    # Capture up to the end of the line. Unlike a pattern that requires a
    # trailing "\n", this also matches when the label is on the last line of a
    # log that does not end with a newline.
    m = re.search(re.escape(label) + r"([^\n]*)", console_log)
    if m is None:
        return None
    return m.group(1).strip()


def scorer(instance_dir, benchmark_name):
    """
    Returns results based on the benchmark name and the instance directory.

    benchmark_name: str, the name of the benchmark, "gaia" (or "assistant") or "webarena"
    instance_dir: str, path to the instance directory
    returns: tuple (bool, str, str) of (correct, expected_answer, final_answer), or None when
        a required file is missing or the console log has no "FINAL ANSWER:" / "FINAL SCORE:" line.
        For "webarena" the two strings are always empty.
    raises: ValueError if benchmark_name is not supported (or the webarena score is not a number)
    """

    if benchmark_name == "gaia" or benchmark_name == "assistant":
        # Read the expected answer
        expected_answer_file = os.path.join(instance_dir, "expected_answer.txt")
        if not os.path.isfile(expected_answer_file):
            return None

        with open(expected_answer_file, "rt") as fh:
            expected_answer = fh.read().strip()

        # Read the final answer from the console log
        final_answer = _read_console_field(instance_dir, "FINAL ANSWER:")
        if final_answer is None:
            return None

        n_ex = normalize_answer(expected_answer)
        n_final = normalize_answer(final_answer)
        # An empty expected answer never counts as correct. The expected answer is
        # returned normalized, the final answer as it was written in the log.
        return (n_ex != "" and n_ex == n_final), n_ex, final_answer

    elif benchmark_name == "webarena":
        # Read the final score from the console log
        final_score = _read_console_field(instance_dir, "FINAL SCORE:")
        if final_score is None:
            return None
        # Any strictly positive score counts as correct.
        return float(final_score) > 0, "", ""

    else:
        raise ValueError(f"Unsupported benchmark_name: {benchmark_name}")
 | 
						|
 | 
						|
 | 
						|
def get_number_of_chat_messages(chat_messages_dir):
    """
    Count the number of chat messages in the chat_messages_dir.

    chat_messages_dir: str, directory containing "*_messages.json" files; each file is
        read as a JSON object whose values are the message collections to count
    returns: int, total number of messages across all matching files (0 if none match)
    """
    result = 0
    for file in glob.glob(os.path.join(chat_messages_dir, "*_messages.json")):
        with open(file, "r") as f:
            content = json.load(f)
        # Only the sizes of the per-key message collections matter, not the keys.
        result += sum(len(messages) for messages in content.values())
    return result
 | 
						|
 | 
						|
 | 
						|
def did_agent_stall(instance_dir):
    """
    Check if the agent stalled.

    instance_dir: str, path to the instance directory containing log.jsonl
    returns: True if any line of log.jsonl contains "Stalled.... Replanning...",
        False if no line does, None if log.jsonl does not exist
    """
    log_file_path = os.path.join(instance_dir, "log.jsonl")
    if not os.path.isfile(log_file_path):
        return None
    # Plain substring scan of the raw lines; stops at the first hit.
    with open(log_file_path, "r") as f:
        return any("Stalled.... Replanning..." in line for line in f)
 | 
						|
 | 
						|
 | 
						|
def get_message_logs(instance_dir):
    """
    Read the log file and return the messages.

    instance_dir: str, path to the instance directory containing log.jsonl
    returns: list of dicts, every JSON record of log.jsonl that has both a "message"
        and a "source" key, in file order; None if log.jsonl does not exist
    raises: json.JSONDecodeError if a non-blank line is not valid JSON
    """
    log_file_path = os.path.join(instance_dir, "log.jsonl")
    if not os.path.isfile(log_file_path):
        return None
    messages = []
    # for each line, convert to dict, check if it has a message and source key, and append to messages
    with open(log_file_path, "r") as f:
        for line in f:
            # Blank lines (e.g. a trailing newline at the end of the file) carry
            # no record and would make json.loads raise.
            if not line.strip():
                continue
            line_dict = json.loads(line)
            # Only JSON objects can carry the two keys; skip scalars and arrays.
            if isinstance(line_dict, dict) and "message" in line_dict and "source" in line_dict:
                messages.append(line_dict)
    return messages
 | 
						|
 | 
						|
 | 
						|
def get_task_information(instance_dir, benchmark_name):
    """
    Read the task description stored alongside an instance's logs.

    instance_dir: str, path to the instance directory
    benchmark_name: str, "gaia" / "assistant" (reads prompt.txt) or "webarena"
        (reads the "intent" field of task_prompt.json)
    returns: str with the task text, or None if the expected file does not exist
    raises: ValueError if benchmark_name is not supported
    """
    if benchmark_name == "gaia" or benchmark_name == "assistant":
        prompt_file = os.path.join(instance_dir, "prompt.txt")
        if not os.path.isfile(prompt_file):
            return None
        with open(prompt_file, "r") as f:
            return f.read().strip()
    elif benchmark_name == "webarena":
        task_prompt_file = os.path.join(instance_dir, "task_prompt.json")
        if not os.path.isfile(task_prompt_file):
            return None
        with open(task_prompt_file, "r") as f:
            return json.load(f)["intent"]
    else:
        raise ValueError(f"Unsupported benchmark_name: {benchmark_name}")
 | 
						|
 | 
						|
 | 
						|
def is_progress_not_being_made(instance_dir):
    """
    Report whether the orchestrator ever recorded that progress is not being made.

    instance_dir: str, path to the instance directory containing log.jsonl
    returns: True if any "Orchestrator (thought)" record whose message contains
        "Updated Ledger:" has "is_progress_being_made" answered false; False if none
        does; None if log.jsonl does not exist
    raises: json.JSONDecodeError if a non-blank line is not valid JSON
    """
    # if at any point in the log, progress is not being made, return True
    pattern = r'"is_progress_being_made": \{\s+"reason": ".*?",\s+"answer": false\s+\}'
    log_file_path = os.path.join(instance_dir, "log.jsonl")
    if not os.path.isfile(log_file_path):
        return None
    with open(log_file_path, "r") as f:
        for line in f:
            # Blank lines carry no record and would make json.loads raise.
            if not line.strip():
                continue
            line_dict = json.loads(line)
            if not isinstance(line_dict, dict):
                continue
            if line_dict.get("source") != "Orchestrator (thought)":
                continue
            # A record may lack a message (or carry a non-text one); treat that
            # as "no ledger here" instead of raising.
            message = line_dict.get("message")
            if (
                isinstance(message, str)
                and "Updated Ledger:" in message
                and re.search(pattern, message) is not None
            ):
                return True
    return False
 |