diff --git a/pyproject.toml b/pyproject.toml index 115fd5b7d..3cd32f17a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,7 +52,7 @@ exclude = [ # This file needs to be either upgraded or removed and therefore should be # ignore from type checking for now "math_utils\\.py$", - "samples\\apps\\cap\\py\\autogencap\\proto\\.*\\.py", + "**/cap/py/autogencap/proto/*", ] ignore-init-module-imports = true unfixable = ["F401"] diff --git a/samples/apps/cap/py/autogencap/Actor.py b/samples/apps/cap/py/autogencap/Actor.py index 64c56bfcd..f757e4803 100644 --- a/samples/apps/cap/py/autogencap/Actor.py +++ b/samples/apps/cap/py/autogencap/Actor.py @@ -2,7 +2,7 @@ import zmq import threading import traceback import time -from .DebugLog import Debug, Info +from .DebugLog import Debug, Info, Error from .Config import xpub_url @@ -11,6 +11,7 @@ class Actor: self.actor_name: str = agent_name self.agent_description: str = description self.run = False + self._start_event = threading.Event() def connect_network(self, network): Debug(self.actor_name, f"is connecting to {network}") @@ -25,14 +26,15 @@ class Actor: return True def _recv_thread(self): - Debug(self.actor_name, "recv thread started") - self._socket: zmq.Socket = self._context.socket(zmq.SUB) - self._socket.setsockopt(zmq.RCVTIMEO, 500) - self._socket.connect(xpub_url) - str_topic = f"{self.actor_name}" - Debug(self.actor_name, f"subscribe to: {str_topic}") - self._socket.setsockopt_string(zmq.SUBSCRIBE, f"{str_topic}") try: + Debug(self.actor_name, "recv thread started") + self._socket: zmq.Socket = self._context.socket(zmq.SUB) + self._socket.setsockopt(zmq.RCVTIMEO, 500) + self._socket.connect(xpub_url) + str_topic = f"{self.actor_name}" + Debug(self.actor_name, f"subscribe to: {str_topic}") + self._socket.setsockopt_string(zmq.SUBSCRIBE, f"{str_topic}") + self._start_event.set() while self.run: try: topic, msg_type, sender_topic, msg = self._socket.recv_multipart() @@ -41,7 +43,9 @@ class Actor: sender_topic = sender_topic.decode("utf-8") # Convert bytes to string except zmq.Again: continue # No message received, continue to next iteration - except Exception: + except Exception as e: + Error(self.actor_name, f"recv thread encountered an error: {e}") + traceback.print_exc() continue if msg_type == "text": msg = msg.decode("utf-8") # Convert bytes to string @@ -57,6 +61,9 @@ class Actor: traceback.print_exc() finally: self.run = False + # In case there was an exception at startup signal + # the main thread. + self._start_event.set() Debug(self.actor_name, "recv thread ended") def start(self, context: zmq.Context): @@ -64,7 +71,7 @@ class Actor: self.run: bool = True self._thread = threading.Thread(target=self._recv_thread) self._thread.start() - time.sleep(0.01) + self._start_event.wait() def disconnect_network(self, network): Debug(self.actor_name, f"is disconnecting from {network}") diff --git a/samples/apps/cap/py/autogencap/ActorConnector.py b/samples/apps/cap/py/autogencap/ActorConnector.py index cbd55a1da..44c9bc447 100644 --- a/samples/apps/cap/py/autogencap/ActorConnector.py +++ b/samples/apps/cap/py/autogencap/ActorConnector.py @@ -5,25 +5,97 @@ import zmq from zmq.utils.monitor import recv_monitor_message import time import uuid -from .DebugLog import Debug, Error +from .DebugLog import Debug, Error, Info from .Config import xsub_url, xpub_url, router_url from typing import Any, Dict +class ActorSender: + def __init__(self, context, topic): + self._context = context + self._topic = topic + self._connect_pub_socket() + + def _connect_pub_socket(self): + Debug("ActorSender", f"Connecting pub socket {self._topic}") + self._pub_socket = self._context.socket(zmq.PUB) + monitor = self._pub_socket.get_monitor_socket() + self._pub_socket.setsockopt(zmq.LINGER, 0) + self._pub_socket.connect(xsub_url) + # Monitor handshake on the pub socket + while monitor.poll(): + evt: Dict[str, Any] = {} + mon_evt = recv_monitor_message(monitor) + evt.update(mon_evt) + if evt["event"] == zmq.EVENT_MONITOR_STOPPED or evt["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED: + Debug("ActorSender", "Handshake received (Or Monitor stopped)") + break + self._pub_socket.disable_monitor() + monitor.close() + self._send_recv_router_msg() + + def _send_recv_router_msg(self): + # Send a request to the router and wait for a response + req_socket = self._context.socket(zmq.REQ) + req_socket.connect(router_url) + try: + Debug("ActorSender", "Broker Check Request Sent") + req_socket.send_string("Request") + _ = req_socket.recv_string() + Debug("ActorSender", "Broker Check Response Received") + finally: + req_socket.close() + + def send_txt_msg(self, msg): + Debug("ActorSender", f"[{self._topic}] send_txt_msg: {msg}") + self._pub_socket.send_multipart( + [self._topic.encode("utf8"), "text".encode("utf8"), "no_resp".encode("utf8"), msg.encode("utf8")] + ) + + def send_bin_msg(self, msg_type: str, msg): + Debug("ActorSender", f"[{self._topic}] send_bin_msg: {msg_type}") + self._pub_socket.send_multipart( + [self._topic.encode("utf8"), msg_type.encode("utf8"), "no_resp".encode("utf8"), msg] + ) + + def send_bin_request_msg(self, msg_type: str, msg, resp_topic: str): + Debug("ActorSender", f"[{self._topic}] send_bin_request_msg: {msg_type}") + self._pub_socket.send_multipart( + [self._topic.encode("utf8"), msg_type.encode("utf8"), resp_topic.encode("utf8"), msg] + ) + + def close(self): + self._pub_socket.close() + + class ActorConnector: def __init__(self, context, topic): self._context = context + self._topic = topic + self._connect_sub_socket() + self._sender = ActorSender(context, topic) + time.sleep(0.1) # Wait for the socket to connect + def _connect_sub_socket(self): self._resp_socket = self._context.socket(zmq.SUB) + monitor = self._resp_socket.get_monitor_socket() self._resp_socket.setsockopt(zmq.LINGER, 0) self._resp_socket.setsockopt(zmq.RCVTIMEO, 250) self._resp_socket.connect(xpub_url) self._resp_topic = str(uuid.uuid4()) - Debug("AgentConnector", f"subscribe to: {self._resp_topic}") + Debug("ActorConnector", f"subscribe to: {self._resp_topic}") self._resp_socket.setsockopt_string(zmq.SUBSCRIBE, f"{self._resp_topic}") - self._topic = topic - - self._connect_pub_socket() + while monitor.poll(): + evt: Dict[str, Any] = {} + mon_evt = recv_monitor_message(monitor) + evt.update(mon_evt) + Debug("ActorConnector", evt) + if evt["event"] == zmq.EVENT_MONITOR_STOPPED or evt["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED: + Debug("ActorConnector", "Handshake received (Or Monitor stopped)") + break + self._resp_socket.disable_monitor() + monitor.close() + self._send_recv_router_msg() def _send_recv_router_msg(self): # Send a request to the router and wait for a response @@ -37,48 +109,38 @@ class ActorConnector: finally: req_socket.close() - def _connect_pub_socket(self): - self._pub_socket = self._context.socket(zmq.PUB) - self._pub_socket.setsockopt(zmq.LINGER, 0) - monitor = self._pub_socket.get_monitor_socket() - self._pub_socket.connect(xsub_url) - # Monitor handshake on the pub socket - while monitor.poll(): - evt: Dict[str, Any] = {} - mon_evt = recv_monitor_message(monitor) - evt.update(mon_evt) - if evt["event"] == zmq.EVENT_MONITOR_STOPPED or evt["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED: - Debug("ActorConnector", "Handshake received (Or Monitor stopped)") - break - self._pub_socket.disable_monitor() - monitor.close() - self._send_recv_router_msg() - def send_txt_msg(self, msg): - self._pub_socket.send_multipart( - [self._topic.encode("utf8"), "text".encode("utf8"), self._resp_topic.encode("utf8"), msg.encode("utf8")] - ) + self._sender.send_txt_msg(msg) def send_bin_msg(self, msg_type: str, msg): - self._pub_socket.send_multipart( - [self._topic.encode("utf8"), msg_type.encode("utf8"), self._resp_topic.encode("utf8"), msg] - ) + self._sender.send_bin_msg(msg_type, msg) def binary_request(self, msg_type: str, msg, retry=5): - self._pub_socket.send_multipart( - [self._topic.encode("utf8"), msg_type.encode("utf8"), self._resp_topic.encode("utf8"), msg] - ) - for i in range(retry + 1): - try: - resp_topic, resp_msg_type, resp_sender_topic, resp = self._resp_socket.recv_multipart() - return resp_topic, resp_msg_type, resp_sender_topic, resp - except zmq.Again: - Debug("ActorConnector", f"binary_request: No response received. retry_count={i}, max_retry={retry}") - time.sleep(0.01) # Wait a bit before retrying - continue - Error("ActorConnector", "binary_request: No response received. Giving up.") - return None, None, None, None + original_timeout: int = 0 + if retry == -1: + original_timeout = self._resp_socket.getsockopt(zmq.RCVTIMEO) + self._resp_socket.setsockopt(zmq.RCVTIMEO, 1000) + + try: + self._sender.send_bin_request_msg(msg_type, msg, self._resp_topic) + while retry == -1 or retry > 0: + try: + topic, resp_msg_type, _, resp = self._resp_socket.recv_multipart() + return topic, resp_msg_type, resp + except zmq.Again: + Debug( + "ActorConnector", f"{self._topic}: No response received. retry_count={retry}, max_retry={retry}" + ) + time.sleep(0.01) + if retry != -1: + retry -= 1 + finally: + if retry == -1: + self._resp_socket.setsockopt(zmq.RCVTIMEO, original_timeout) + + Error("ActorConnector", f"{self._topic}: No response received. Giving up.") + return None, None, None def close(self): - self._pub_socket.close() + self._sender.close() self._resp_socket.close() diff --git a/samples/apps/cap/py/autogencap/Broker.py b/samples/apps/cap/py/autogencap/Broker.py index 8f2765193..03e101e02 100644 --- a/samples/apps/cap/py/autogencap/Broker.py +++ b/samples/apps/cap/py/autogencap/Broker.py @@ -12,6 +12,7 @@ class Broker: self._xpub: zmq.Socket = None self._xsub: zmq.Socket = None self._router: zmq.Socket = None + self._start_event = threading.Event() def _init_sockets(self): try: @@ -44,8 +45,9 @@ class Broker: self._run = True self._broker_thread: threading.Thread = threading.Thread(target=self.thread_fn) self._broker_thread.start() - time.sleep(0.01) - return True + self._start_event.wait() + # this will be false if the thread is not running + return self._run def stop(self): if not self._run: @@ -67,6 +69,7 @@ class Broker: if not self._init_sockets(): Debug("BROKER", "Receive thread not started since sockets were not initialized") self._run = False + self._start_event.set() return # Poll sockets for events @@ -76,6 +79,8 @@ class Broker: self._poller.register(self._router, zmq.POLLIN) Info("BROKER", "Started. Waiting for events") + # signal to the main thread that Broker has started + self._start_event.set() # Receive msgs, forward and process while self._run: events = dict(self._poller.poll(500)) @@ -131,6 +136,8 @@ def main(): Info("BROKER", "Running.") last_time = current_time try: + # Hang out for a while and print out + # status every now and then time.sleep(0.5) except KeyboardInterrupt: Info("BROKER", "KeyboardInterrupt. Stopping the broker.") diff --git a/samples/apps/cap/py/autogencap/DirectorySvc.py b/samples/apps/cap/py/autogencap/DirectorySvc.py index 87833c91a..bde156062 100644 --- a/samples/apps/cap/py/autogencap/DirectorySvc.py +++ b/samples/apps/cap/py/autogencap/DirectorySvc.py @@ -1,7 +1,7 @@ from autogencap.Constants import Directory_Svc_Topic from autogencap.Config import xpub_url, xsub_url, router_url from autogencap.DebugLog import Debug, Info, Error -from autogencap.ActorConnector import ActorConnector +from autogencap.ActorConnector import ActorConnector, ActorSender from autogencap.Actor import Actor from autogencap.Broker import Broker from autogencap.proto.CAP_pb2 import ( @@ -12,7 +12,11 @@ from autogencap.proto.CAP_pb2 import ( Ping, Pong, ActorInfoCollection, + Error as ErrorMsg, + ErrorCode, ) +from autogencap.utility import report_error_msg + import zmq import threading import time @@ -30,7 +34,7 @@ class DirectoryActor(Actor): def _process_bin_msg(self, msg: bytes, msg_type: str, topic: str, sender: str) -> bool: if msg_type == ActorRegistration.__name__: - self._actor_registration_msg_handler(topic, msg_type, msg) + self._actor_registration_msg_handler(topic, msg_type, msg, sender) elif msg_type == ActorLookup.__name__: self._actor_lookup_msg_handler(topic, msg_type, msg, sender) elif msg_type == Ping.__name__: @@ -43,20 +47,26 @@ class DirectoryActor(Actor): Info("DirectorySvc", f"Ping received: {sender_topic}") pong = Pong() serialized_msg = pong.SerializeToString() - sender_connection = ActorConnector(self._context, sender_topic) + sender_connection = ActorSender(self._context, sender_topic) sender_connection.send_bin_msg(Pong.__name__, serialized_msg) - def _actor_registration_msg_handler(self, topic: str, msg_type: str, msg: bytes): + def _actor_registration_msg_handler(self, topic: str, msg_type: str, msg: bytes, sender_topic: str): actor_reg = ActorRegistration() actor_reg.ParseFromString(msg) Info("DirectorySvc", f"Actor registration: {actor_reg.actor_info.name}") name = actor_reg.actor_info.name # TODO (Future DirectorySv PR) network_id should be namespace prefixed to support multiple networks actor_reg.actor_info.name + self._network_prefix + err = ErrorMsg() if name in self._registered_actors: Error("DirectorySvc", f"Actor already registered: {name}") - return - self._registered_actors[name] = actor_reg.actor_info + err.code = ErrorCode.EC_ALREADY_EXISTS + else: + self._registered_actors[name] = actor_reg.actor_info + + sender_connection = ActorSender(self._context, sender_topic) + serialized_msg = err.SerializeToString() + sender_connection.send_bin_msg(ErrorMsg.__name__, serialized_msg) def _actor_lookup_msg_handler(self, topic: str, msg_type: str, msg: bytes, sender_topic: str): actor_lookup = ActorLookup() @@ -83,7 +93,7 @@ class DirectoryActor(Actor): else: Error("DirectorySvc", f"Actor not found: {actor_lookup.actor_info.name}") - sender_connection = ActorConnector(self._context, sender_topic) + sender_connection = ActorSender(self._context, sender_topic) serialized_msg = actor_lookup_resp.SerializeToString() sender_connection.send_bin_msg(ActorLookupResponse.__name__, serialized_msg) @@ -98,12 +108,13 @@ class DirectorySvc: Debug("DirectorySvc", "Pinging existing DirectorySvc") ping = Ping() serialized_msg = ping.SerializeToString() - _, _, _, resp = self._directory_connector.binary_request(Ping.__name__, serialized_msg, retry=0) + _, _, resp = self._directory_connector.binary_request(Ping.__name__, serialized_msg, retry=0) if resp is None: return True return False def start(self): + Debug("DirectorySvc", "Starting.") self._directory_connector = ActorConnector(self._context, Directory_Svc_Topic) if self._no_other_directory(): self._directory_actor = DirectoryActor(Directory_Svc_Topic, "Directory Service") @@ -124,7 +135,8 @@ class DirectorySvc: actor_reg = ActorRegistration() actor_reg.actor_info.CopyFrom(actor_info) serialized_msg = actor_reg.SerializeToString() - self._directory_connector.send_bin_msg(ActorRegistration.__name__, serialized_msg) + _, _, resp = self._directory_connector.binary_request(ActorRegistration.__name__, serialized_msg) + report_error_msg(resp, "DirectorySvc") def register_actor_by_name(self, actor_name: str): actor_info = ActorInfo(name=actor_name) @@ -134,7 +146,7 @@ class DirectorySvc: actor_info = ActorInfo(name=name_regex) actor_lookup = ActorLookup(actor_info=actor_info) serialized_msg = actor_lookup.SerializeToString() - _, _, _, resp = self._directory_connector.binary_request(ActorLookup.__name__, serialized_msg) + _, _, resp = self._directory_connector.binary_request(ActorLookup.__name__, serialized_msg) actor_lookup_resp = ActorLookupResponse() actor_lookup_resp.ParseFromString(resp) return actor_lookup_resp @@ -185,6 +197,8 @@ def main(): Info("DirectorySvc", "Running.") last_time = current_time try: + # Hang out for a while and print out + # status every now and then time.sleep(0.5) except KeyboardInterrupt: Info("DirectorySvc", "KeyboardInterrupt. Stopping the DirectorySvc.") diff --git a/samples/apps/cap/py/autogencap/LocalActorNetwork.py b/samples/apps/cap/py/autogencap/LocalActorNetwork.py index 685972119..907e852d3 100644 --- a/samples/apps/cap/py/autogencap/LocalActorNetwork.py +++ b/samples/apps/cap/py/autogencap/LocalActorNetwork.py @@ -33,6 +33,7 @@ class LocalActorNetwork: if self._directory_svc is None: self._directory_svc = DirectorySvc(self._context) self._directory_svc.start() + time.sleep(0.25) # Process queued thread events in Broker and Directory def register(self, actor: Actor): self._init_runtime() diff --git a/samples/apps/cap/py/autogencap/ag_adapter/AutoGenConnector.py b/samples/apps/cap/py/autogencap/ag_adapter/AutoGenConnector.py index d126b067c..ee94cdabb 100644 --- a/samples/apps/cap/py/autogencap/ag_adapter/AutoGenConnector.py +++ b/samples/apps/cap/py/autogencap/ag_adapter/AutoGenConnector.py @@ -32,7 +32,10 @@ class AutoGenConnector: """ msg = GenReplyReq() serialized_msg = msg.SerializeToString() - _, _, _, resp = self._can_channel.binary_request(type(msg).__name__, serialized_msg) + # Setting retry to -1 to keep trying until a response is received + # This normal AutoGen behavior but does not handle the case when an AutoGen agent + # is not running. In that case, the connector will keep trying indefinitely. + _, _, resp = self._can_channel.binary_request(type(msg).__name__, serialized_msg, retry=-1) gen_reply_resp = GenReplyResp() gen_reply_resp.ParseFromString(resp) return gen_reply_resp.data diff --git a/samples/apps/cap/py/autogencap/ag_adapter/CAPGroupChatManager.py b/samples/apps/cap/py/autogencap/ag_adapter/CAPGroupChatManager.py index fd858cd3f..e7fa2804b 100644 --- a/samples/apps/cap/py/autogencap/ag_adapter/CAPGroupChatManager.py +++ b/samples/apps/cap/py/autogencap/ag_adapter/CAPGroupChatManager.py @@ -33,6 +33,8 @@ class CAPGroupChatManager: def _wait_for_user_exit(self) -> None: try: while self.is_running(): + # Hang out for a while and print out + # status every now and then time.sleep(0.5) except KeyboardInterrupt: print("Interrupted by user, shutting down.") diff --git a/samples/apps/cap/py/autogencap/proto/Autogen_pb2.py b/samples/apps/cap/py/autogencap/proto/Autogen_pb2.py index 400886a08..5172d2432 100644 --- a/samples/apps/cap/py/autogencap/proto/Autogen_pb2.py +++ b/samples/apps/cap/py/autogencap/proto/Autogen_pb2.py @@ -20,7 +20,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "Autogen_pb2", _globals) -if _descriptor._USE_C_DESCRIPTORS is False: +if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None _globals["_DATAMAP_DATAENTRY"]._options = None _globals["_DATAMAP_DATAENTRY"]._serialized_options = b"8\001" diff --git a/samples/apps/cap/py/autogencap/proto/CAP.proto b/samples/apps/cap/py/autogencap/proto/CAP.proto index ee857a2f2..5d9496e92 100644 --- a/samples/apps/cap/py/autogencap/proto/CAP.proto +++ b/samples/apps/cap/py/autogencap/proto/CAP.proto @@ -3,6 +3,19 @@ syntax = "proto3"; // Get protoc here https://github.com/protocolbuffers/protobuf/releases // .\protoc --python_out=. --pyi_out=. CAP.proto +enum ErrorCode { + option allow_alias = true; + EC_OK = 0; + EC_NOT_FOUND = 1; + EC_ALREADY_EXISTS = 2; + EC_MAX = 2; // IMPORTANT: Update this if you add more error codes +} + +message Error { + ErrorCode code = 1; + optional string message = 2; +} + message ActorInfo { string name = 1; optional string namespace = 2; diff --git a/samples/apps/cap/py/autogencap/proto/CAP_pb2.py b/samples/apps/cap/py/autogencap/proto/CAP_pb2.py index 35fd1f280..59662102d 100644 --- a/samples/apps/cap/py/autogencap/proto/CAP_pb2.py +++ b/samples/apps/cap/py/autogencap/proto/CAP_pb2.py @@ -14,26 +14,32 @@ _sym_db = _symbol_database.Default() DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\tCAP.proto"i\n\tActorInfo\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x16\n\tnamespace\x18\x02 \x01(\tH\x00\x88\x01\x01\x12\x18\n\x0b\x64\x65scription\x18\x03 \x01(\tH\x01\x88\x01\x01\x42\x0c\n\n_namespaceB\x0e\n\x0c_description"3\n\x11\x41\x63torRegistration\x12\x1e\n\nactor_info\x18\x01 \x01(\x0b\x32\n.ActorInfo"A\n\x0b\x41\x63torLookup\x12#\n\nactor_info\x18\x01 \x01(\x0b\x32\n.ActorInfoH\x00\x88\x01\x01\x42\r\n\x0b_actor_info"4\n\x13\x41\x63torInfoCollection\x12\x1d\n\tinfo_coll\x18\x01 \x03(\x0b\x32\n.ActorInfo"X\n\x13\x41\x63torLookupResponse\x12\r\n\x05\x66ound\x18\x01 \x01(\x08\x12(\n\x05\x61\x63tor\x18\x02 \x01(\x0b\x32\x14.ActorInfoCollectionH\x00\x88\x01\x01\x42\x08\n\x06_actor"\x06\n\x04Ping"\x06\n\x04Pongb\x06proto3' + b'\n\tCAP.proto"C\n\x05\x45rror\x12\x18\n\x04\x63ode\x18\x01 \x01(\x0e\x32\n.ErrorCode\x12\x14\n\x07message\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\n\n\x08_message"i\n\tActorInfo\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x16\n\tnamespace\x18\x02 \x01(\tH\x00\x88\x01\x01\x12\x18\n\x0b\x64\x65scription\x18\x03 \x01(\tH\x01\x88\x01\x01\x42\x0c\n\n_namespaceB\x0e\n\x0c_description"3\n\x11\x41\x63torRegistration\x12\x1e\n\nactor_info\x18\x01 \x01(\x0b\x32\n.ActorInfo"A\n\x0b\x41\x63torLookup\x12#\n\nactor_info\x18\x01 \x01(\x0b\x32\n.ActorInfoH\x00\x88\x01\x01\x42\r\n\x0b_actor_info"4\n\x13\x41\x63torInfoCollection\x12\x1d\n\tinfo_coll\x18\x01 \x03(\x0b\x32\n.ActorInfo"X\n\x13\x41\x63torLookupResponse\x12\r\n\x05\x66ound\x18\x01 \x01(\x08\x12(\n\x05\x61\x63tor\x18\x02 \x01(\x0b\x32\x14.ActorInfoCollectionH\x00\x88\x01\x01\x42\x08\n\x06_actor"\x06\n\x04Ping"\x06\n\x04Pong*O\n\tErrorCode\x12\t\n\x05\x45\x43_OK\x10\x00\x12\x10\n\x0c\x45\x43_NOT_FOUND\x10\x01\x12\x15\n\x11\x45\x43_ALREADY_EXISTS\x10\x02\x12\n\n\x06\x45\x43_MAX\x10\x02\x1a\x02\x10\x01\x62\x06proto3' ) _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "CAP_pb2", _globals) -if _descriptor._USE_C_DESCRIPTORS is False: +if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None - _globals["_ACTORINFO"]._serialized_start = 13 - _globals["_ACTORINFO"]._serialized_end = 118 - _globals["_ACTORREGISTRATION"]._serialized_start = 120 - _globals["_ACTORREGISTRATION"]._serialized_end = 171 - _globals["_ACTORLOOKUP"]._serialized_start = 173 - _globals["_ACTORLOOKUP"]._serialized_end = 238 - _globals["_ACTORINFOCOLLECTION"]._serialized_start = 240 - _globals["_ACTORINFOCOLLECTION"]._serialized_end = 292 - _globals["_ACTORLOOKUPRESPONSE"]._serialized_start = 294 - _globals["_ACTORLOOKUPRESPONSE"]._serialized_end = 382 - _globals["_PING"]._serialized_start = 384 - _globals["_PING"]._serialized_end = 390 - _globals["_PONG"]._serialized_start = 392 - _globals["_PONG"]._serialized_end = 398 + _globals["_ERRORCODE"]._options = None + _globals["_ERRORCODE"]._serialized_options = b"\020\001" + _globals["_ERRORCODE"]._serialized_start = 469 + _globals["_ERRORCODE"]._serialized_end = 548 + _globals["_ERROR"]._serialized_start = 13 + _globals["_ERROR"]._serialized_end = 80 + _globals["_ACTORINFO"]._serialized_start = 82 + _globals["_ACTORINFO"]._serialized_end = 187 + _globals["_ACTORREGISTRATION"]._serialized_start = 189 + _globals["_ACTORREGISTRATION"]._serialized_end = 240 + _globals["_ACTORLOOKUP"]._serialized_start = 242 + _globals["_ACTORLOOKUP"]._serialized_end = 307 + _globals["_ACTORINFOCOLLECTION"]._serialized_start = 309 + _globals["_ACTORINFOCOLLECTION"]._serialized_end = 361 + _globals["_ACTORLOOKUPRESPONSE"]._serialized_start = 363 + _globals["_ACTORLOOKUPRESPONSE"]._serialized_end = 451 + _globals["_PING"]._serialized_start = 453 + _globals["_PING"]._serialized_end = 459 + _globals["_PONG"]._serialized_start = 461 + _globals["_PONG"]._serialized_end = 467 # @@protoc_insertion_point(module_scope) diff --git a/samples/apps/cap/py/autogencap/proto/CAP_pb2.pyi b/samples/apps/cap/py/autogencap/proto/CAP_pb2.pyi index 77f5e7974..e4eabc83e 100644 --- a/samples/apps/cap/py/autogencap/proto/CAP_pb2.pyi +++ b/samples/apps/cap/py/autogencap/proto/CAP_pb2.pyi @@ -1,4 +1,5 @@ from google.protobuf.internal import containers as _containers +from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from typing import ( @@ -11,6 +12,26 @@ from typing import ( DESCRIPTOR: _descriptor.FileDescriptor +class ErrorCode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = () + EC_OK: _ClassVar[ErrorCode] + EC_NOT_FOUND: _ClassVar[ErrorCode] + EC_ALREADY_EXISTS: _ClassVar[ErrorCode] + EC_MAX: _ClassVar[ErrorCode] + +EC_OK: ErrorCode +EC_NOT_FOUND: ErrorCode +EC_ALREADY_EXISTS: ErrorCode +EC_MAX: ErrorCode + +class Error(_message.Message): + __slots__ = ("code", "message") + CODE_FIELD_NUMBER: _ClassVar[int] + MESSAGE_FIELD_NUMBER: _ClassVar[int] + code: ErrorCode + message: str + def __init__(self, code: _Optional[_Union[ErrorCode, str]] = ..., message: _Optional[str] = ...) -> None: ... + class ActorInfo(_message.Message): __slots__ = ("name", "namespace", "description") NAME_FIELD_NUMBER: _ClassVar[int] diff --git a/samples/apps/cap/py/autogencap/utility.py b/samples/apps/cap/py/autogencap/utility.py new file mode 100644 index 000000000..ca45690f2 --- /dev/null +++ b/samples/apps/cap/py/autogencap/utility.py @@ -0,0 +1,10 @@ +from autogencap.DebugLog import Error +from autogencap.proto.CAP_pb2 import Error as ErrorMsg, ErrorCode + + +def report_error_msg(msg: ErrorMsg, src: str): + if msg is not None: + err = ErrorMsg() + err.ParseFromString(msg) + if err.code != ErrorCode.EC_OK: + Error(src, f"Error response: code[{err.code}] msg[{err.message}]") diff --git a/samples/apps/cap/py/demo/CAPAutoGenPairDemo.py b/samples/apps/cap/py/demo/CAPAutoGenPairDemo.py index b20e94038..76a7d67bc 100644 --- a/samples/apps/cap/py/demo/CAPAutoGenPairDemo.py +++ b/samples/apps/cap/py/demo/CAPAutoGenPairDemo.py @@ -23,6 +23,8 @@ def cap_ag_pair_demo(): # Wait for the pair to finish try: while pair.running(): + # Hang out for a while and print out + # status every now and then time.sleep(0.5) except KeyboardInterrupt: print("Interrupted by user, shutting down.") diff --git a/samples/apps/cap/py/demo/SimpleActorDemo.py b/samples/apps/cap/py/demo/SimpleActorDemo.py index a7031a4bb..a7662074d 100644 --- a/samples/apps/cap/py/demo/SimpleActorDemo.py +++ b/samples/apps/cap/py/demo/SimpleActorDemo.py @@ -1,6 +1,7 @@ import time from AppAgents import GreeterAgent from autogencap.LocalActorNetwork import LocalActorNetwork +from autogencap.DebugLog import Error def simple_actor_demo(): @@ -17,8 +18,11 @@ def simple_actor_demo(): network.connect() # Get a channel to the actor greeter_link = network.lookup_actor("Greeter") - # Send a message to the actor - greeter_link.send_txt_msg("Hello World!") - # Cleanup - greeter_link.close() + if greeter_link: + # Send a message to the actor + greeter_link.send_txt_msg("Hello World!") + # Cleanup + greeter_link.close() + else: + Error("simple_actor_demo", "Could not find Greeter") network.disconnect() diff --git a/samples/apps/cap/py/demo/list_agents.py b/samples/apps/cap/py/demo/list_agents.py index 2535678e0..f96045896 100644 --- a/samples/apps/cap/py/demo/list_agents.py +++ b/samples/apps/cap/py/demo/list_agents.py @@ -3,6 +3,7 @@ from typing import List from AppAgents import GreeterAgent, FidelityAgent from autogencap.LocalActorNetwork import LocalActorNetwork from autogencap.proto.CAP_pb2 import ActorInfo +from autogencap.DebugLog import Info def list_agents(): @@ -22,8 +23,11 @@ def list_agents(): # Get a list of actors actor_infos: List[ActorInfo] = network.lookup_actor_info(name_regex=".*") # Print out all actors found + Info("list_agents", f"{len(actor_infos)} actors found:") for actor_info in actor_infos: - print(f"Name: {actor_info.name}, Namespace: {actor_info.namespace}, Description: {actor_info.description}") - time.sleep(1) + Info( + "list_agents", + f"Name: {actor_info.name}, Namespace: {actor_info.namespace}, Description: {actor_info.description}", + ) # Cleanup network.disconnect()