mirror of
				https://github.com/microsoft/autogen.git
				synced 2025-10-31 17:59:50 +00:00 
			
		
		
		
	 8603317537
			
		
	
	
		8603317537
		
			
		
	
	
	
	
		
			
			* update example script with logs dir, add screenshot timestamp * readme examples update * add flask app to view magentic_one * remove copy example * rename * changes to magentic one helper * update test web surfer to delete logs * magentic_one icons * fix colors - final log viewer * fix termination condition * update coder and log viewer * timeout time * make tests pass * logs dir * repeated thing * remove log_viewer, mm web surfer comments * coder change prompt, edit readmes * type ignore * remove logviewer * add flag for coder agent * readme * changes readme * uv lock * update readme figures * not yet * pointer images
		
			
				
	
	
		
			41 lines
		
	
	
		
			1.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			41 lines
		
	
	
		
			1.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from magentic_one_helper import MagenticOneHelper
 | |
| import asyncio
 | |
| import json
 | |
| import argparse
 | |
| import os
 | |
| 
 | |
| 
 | |
async def main(task, logs_dir):
    """Run ``task`` through a ``MagenticOneHelper`` and print its logs and answer.

    The helper is created with ``logs_dir`` and initialized, ``run_task(task)``
    is scheduled as an asyncio task, and every entry yielded by
    ``stream_logs()`` is printed as indented JSON. Once the stream is
    exhausted the scheduled task is awaited and the value returned by
    ``get_final_answer()`` is printed, or a fallback message if it is ``None``.

    Args:
        task: The task handed to ``MagenticOneHelper.run_task``.
        logs_dir: Forwarded to ``MagenticOneHelper`` as its ``logs_dir``.
    """
    helper = MagenticOneHelper(logs_dir=logs_dir)
    await helper.initialize()
    print("MagenticOne initialized.")

    # Schedule the run before streaming so both make progress on the same loop.
    runner = asyncio.create_task(helper.run_task(task))

    # Echo each streamed log entry as pretty-printed JSON.
    async for entry in helper.stream_logs():
        print(json.dumps(entry, indent=2))

    # Streaming is over; awaiting the task surfaces any exception it raised.
    await runner

    final_answer = helper.get_final_answer()
    if final_answer is None:
        print("No final answer found in logs.")
        return
    print(f"Final answer: {final_answer}")
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    # Command-line entry point: parse the task and log directory, make sure
    # the log directory exists, then drive ``main`` to completion.
    parser = argparse.ArgumentParser(description="Run a task with MagenticOneHelper.")
    parser.add_argument("task", type=str, help="The task to run")
    parser.add_argument("--logs_dir", type=str, default="./logs", help="Directory to store logs")
    args = parser.parse_args()
    # exist_ok=True replaces the separate os.path.exists() check: it avoids the
    # race where the directory appears between the check and the creation.
    os.makedirs(args.logs_dir, exist_ok=True)
    asyncio.run(main(args.task, args.logs_dir))
 |