autogen/python/packages/autogen-magentic-one/interface/example_magentic_one_helper.py
Hussein Mozannar 8603317537
Magentic-One Log Viewer + preview API (#4032)
* update example script with logs dir, add screenshot timestamp

* readme examples update

* add flask app to view magentic_one

* remove copy example

* rename

* changes to magentic one helper

* update test web surfer to delete logs

* magentic_one icons

* fix colors - final log viewer

* fix termination condition

* update coder and log viewer

* timeout time

* make tests pass

* logs dir

* repeated thing

* remove log_viewer, mm web surfer comments

* coder change prompt, edit readmes

* type ignore

* remove logviewer

* add flag for coder agent

* readme

* changes readme

* uv lock

* update readme figures

* not yet

* pointer images
2024-11-04 17:18:46 -08:00

41 lines
1.2 KiB
Python

from magentic_one_helper import MagenticOneHelper
import asyncio
import json
import argparse
import os
async def main(task: str, logs_dir: str) -> None:
    """Run *task* with a MagenticOneHelper and echo its logs to stdout.

    The helper is initialized, the task is started as a background asyncio
    task, and every entry yielded by the helper's log stream is printed as
    indented JSON. Once the stream ends the background task is awaited and
    the helper's final answer (if any) is printed.

    Args:
        task: The task description handed to ``run_task``.
        logs_dir: Directory passed to ``MagenticOneHelper`` as ``logs_dir``.
    """
    helper = MagenticOneHelper(logs_dir=logs_dir)
    await helper.initialize()
    print("MagenticOne initialized.")

    # Start the task in the background so log entries can be consumed
    # while it is still running.
    task_future = asyncio.create_task(helper.run_task(task))

    # Stream and print each log entry as it is produced.
    async for log_entry in helper.stream_logs():
        print(json.dumps(log_entry, indent=2))

    # Awaiting the task surfaces any exception raised inside run_task.
    await task_future

    final_answer = helper.get_final_answer()
    if final_answer is not None:
        print(f"Final answer: {final_answer}")
    else:
        print("No final answer found in logs.")
if __name__ == "__main__":
    # Command-line entry point: parse the task and log directory, make sure
    # the log directory exists, then drive the async main() to completion.
    parser = argparse.ArgumentParser(description="Run a task with MagenticOneHelper.")
    parser.add_argument("task", type=str, help="The task to run")
    parser.add_argument("--logs_dir", type=str, default="./logs", help="Directory to store logs")
    args = parser.parse_args()

    # exist_ok=True avoids the check-then-create race of testing
    # os.path.exists() first; it still raises if the path exists but is
    # not a directory, which fails early instead of later inside the helper.
    os.makedirs(args.logs_dir, exist_ok=True)

    asyncio.run(main(args.task, args.logs_dir))