autogen/test/io/test_base.py
Davor Runje 78aa0eb220
Introducing IOStream protocol and adding support for websockets (#1551)
* Introducing IOStream

* bug fixing

* polishing

* refactoring

* refactoring

* refactoring

* wip: async tests

* websockets added

* wip

* merge with main

* notebook added

* FastAPI example added

* wip

* merge

* getter/setter to iostream added

* website/blog/2024-03-03-AutoGen-Update/img/dalle_gpt4v.png: convert to Git LFS

* website/blog/2024-03-03-AutoGen-Update/img/gaia.png: convert to Git LFS

* website/blog/2024-03-03-AutoGen-Update/img/teach.png: convert to Git LFS

* add SSL support

* wip

* wip

* exception handling added to on_connect()

* refactoring: default iostream is being set in a context manager

* test fix

* polishing

* polishing

* polishing

* fixed bug with new thread

* polishing

* a bit of refactoring and docs added

* notebook added to docs

* type checking added to CI

* CI fix

* CI fix

* CI fix

* polishing

* obsolete todo comment removed

* fixed precommit error

---------

Co-authored-by: Eric Zhu <ekzhu@users.noreply.github.com>
2024-03-26 22:39:55 +00:00

29 lines
982 B
Python

from typing import Any
from autogen.io import IOConsole, IOStream, IOWebsockets
class TestIOStream:
    """Tests for reading and temporarily overriding the default ``IOStream``."""

    def test_initial_default_io_stream(self) -> None:
        """With no override active, the default stream is an ``IOConsole``."""
        current = IOStream.get_default()
        assert isinstance(current, IOConsole)

    def test_set_default_io_stream(self) -> None:
        """Nested ``set_default`` blocks swap the default and restore it on exit.

        The test enters two nested ``with IOStream.set_default(...)`` blocks and
        checks ``IOStream.get_default()`` before, inside, and after each one.
        """

        class StubIOStream(IOStream):
            """Minimal stream used only so its type can be told apart from ``IOConsole``."""

            def print(self, *objects: Any, sep: str = " ", end: str = "\n", flush: bool = False) -> None:
                # Output is deliberately discarded.
                pass

            def input(self, prompt: str = "", *, password: bool = False) -> str:
                # Fixed canned reply; nothing is read from the user.
                return "Hello, World!"

        # Baseline before any override.
        before = IOStream.get_default()
        assert isinstance(before, IOConsole)

        with IOStream.set_default(StubIOStream()):
            # Outer override is in effect.
            outer = IOStream.get_default()
            assert isinstance(outer, StubIOStream)

            with IOStream.set_default(IOConsole()):
                # Inner override shadows the outer one.
                inner = IOStream.get_default()
                assert isinstance(inner, IOConsole)

            # Leaving the inner block brings the outer override back.
            after_inner = IOStream.get_default()
            assert isinstance(after_inner, StubIOStream)

        # Leaving the outer block brings the original default back.
        after_outer = IOStream.get_default()
        assert isinstance(after_outer, IOConsole)