mirror of
https://github.com/microsoft/autogen.git
synced 2025-07-10 18:41:30 +00:00

* update example script with logs dir, add screenshot timestamp * readme examples update * add flask app to view magentic_one * remove copy example * rename * changes to magentic one helper * update test web surfer to delete logs * magentic_one icons * fix colors - final log viewer * fix termination condition * update coder and log viewer * timeout time * make tests pass * logs dir * repeated thing * remove log_viewer, mm web surfer comments * coder change prompt, edit readmes * type ignore * remove logviewer * add flag for coder agent * readme * changes readme * uv lock * update readme figures * not yet * pointer images
41 lines
1.2 KiB
Python
41 lines
1.2 KiB
Python
from magentic_one_helper import MagenticOneHelper
|
|
import asyncio
|
|
import json
|
|
import argparse
|
|
import os
|
|
|
|
|
|
async def main(task: str, logs_dir: str) -> None:
    """Run ``task`` through MagenticOneHelper and echo its logs to stdout.

    The helper is created with ``logs_dir`` and initialized, the task is
    started as a background asyncio task, and every entry yielded by
    ``stream_logs()`` is printed as indented JSON. Once the stream ends and
    the task has finished, the final answer (if any) is printed.

    Args:
        task: Task description passed to ``run_task``.
        logs_dir: Directory handed to the helper for its logs.
    """
    helper = MagenticOneHelper(logs_dir=logs_dir)
    await helper.initialize()
    print("MagenticOne initialized.")

    # Run the task in the background so its logs can be consumed while it works.
    task_future = asyncio.create_task(helper.run_task(task))
    try:
        async for log_entry in helper.stream_logs():
            print(json.dumps(log_entry, indent=2))

        # Awaiting the task surfaces any exception it raised.
        await task_future
    finally:
        # If streaming (or printing) failed, do not leave the task running
        # unattended on the event loop.
        if not task_future.done():
            task_future.cancel()

    final_answer = helper.get_final_answer()
    if final_answer is not None:
        print(f"Final answer: {final_answer}")
    else:
        print("No final answer found in logs.")
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: one positional task plus an optional log directory.
    parser = argparse.ArgumentParser(description="Run a task with MagenticOneHelper.")
    parser.add_argument("task", type=str, help="The task to run")
    parser.add_argument("--logs_dir", type=str, default="./logs", help="Directory to store logs")
    args = parser.parse_args()

    # exist_ok=True replaces the check-then-create pair, which could fail if
    # the directory appeared between the existence test and makedirs. It also
    # fails fast when the path exists but is not a directory.
    os.makedirs(args.logs_dir, exist_ok=True)

    asyncio.run(main(args.task, args.logs_dir))
|