From 07fdc4e2da72fa5b00f111bb9d2cf522b7adca8f Mon Sep 17 00:00:00 2001 From: Jacob Alber Date: Wed, 12 Feb 2025 12:12:07 -0500 Subject: [PATCH] fix: gRPC Agent Runtime Serialization Registration (#5513) We were registering the serializers when we already had a concrete type on the way into Publish or Send on the .NET side. However, in a xLang scenario, messages could originate from e.g. Python before being sent/published from .NET, resulting in no serializer being found. This change adds a second-change registration and lookup when the agent is instantiated based on the IHandle implementations. --- .github/workflows/dotnet-build.yml | 2 +- .../Core.Grpc/AgentExtensions.cs | 38 +++++++++++++++++++ .../Core.Grpc/GrpcAgentRuntime.cs | 20 ++++++++-- 3 files changed, 55 insertions(+), 5 deletions(-) create mode 100644 dotnet/src/Microsoft.AutoGen/Core.Grpc/AgentExtensions.cs diff --git a/.github/workflows/dotnet-build.yml b/.github/workflows/dotnet-build.yml index ca55a6f14..a485f9ddd 100644 --- a/.github/workflows/dotnet-build.yml +++ b/.github/workflows/dotnet-build.yml @@ -169,7 +169,7 @@ jobs: dotnet-version: '9.0.x' - name: Install Temp Global.JSON run: | - echo "{\"sdk\": {\"version\": \"9.0.101\"}}" > global.json + echo "{\"sdk\": {\"version\": \"9.0\"}}" > global.json - name: Install .NET Aspire workload run: dotnet workload install aspire - name: Install dev certs diff --git a/dotnet/src/Microsoft.AutoGen/Core.Grpc/AgentExtensions.cs b/dotnet/src/Microsoft.AutoGen/Core.Grpc/AgentExtensions.cs new file mode 100644 index 000000000..816f2625c --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Core.Grpc/AgentExtensions.cs @@ -0,0 +1,38 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// AgentExtensions.cs + +using System.Reflection; +using Google.Protobuf; +using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Core.Grpc; + +namespace Microsoft.AutoGen.Core; + +internal static partial class AgentExtensions +{ + private static readonly Type ProtobufIMessage = typeof(IMessage<>); + private static bool IsProtobufType(this Type type) + { + // TODO: Support the non-generic IMessage as well + Type specializedIMessageType = ProtobufIMessage.MakeGenericType(type); + + // type T needs to derive from IMessage + return specializedIMessageType.IsAssignableFrom(type); + } + + public static void RegisterHandledMessageTypes(this IHostableAgent agent, IProtoSerializationRegistry registry) + { + Type agentRuntimeType = agent.GetType(); + + MethodInfo[] messageHandlers = agentRuntimeType.GetHandlers(); + + foreach (MethodInfo handler in messageHandlers) + { + Type messageType = handler.GetParameters().First().ParameterType; + if (messageType.IsProtobufType() && registry.GetSerializer(messageType) == null) + { + registry.RegisterSerializer(messageType); + } + } + } +} diff --git a/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs b/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs index 83c77c5ef..c3fce75b7 100644 --- a/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs +++ b/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs @@ -11,9 +11,10 @@ using Microsoft.Extensions.Logging; namespace Microsoft.AutoGen.Core.Grpc; -internal sealed class AgentsContainer(IAgentRuntime hostingRuntime) +internal sealed class AgentsContainer(IAgentRuntime hostingRuntime, IProtoSerializationRegistry serializationRegistry) { private readonly IAgentRuntime hostingRuntime = hostingRuntime; + private readonly IProtoSerializationRegistry serializationRegistry = serializationRegistry; private Dictionary agentInstances = new(); public Dictionary Subscriptions = new(); @@ -29,6 +30,10 @@ internal sealed class AgentsContainer(IAgentRuntime hostingRuntime) } agent = await factoryFunc(agentId, this.hostingRuntime); + + // Just-in-Time register the message types so we can deserialize them + agent.RegisterHandledMessageTypes(this.serializationRegistry); + this.agentInstances.Add(agentId, agent); } @@ -92,7 +97,7 @@ public sealed class GrpcAgentRuntime : IHostedService, IAgentRuntime, IMessageSi this._shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(hostApplicationLifetime.ApplicationStopping); this._messageRouter = new GrpcMessageRouter(client, this, _clientId, logger, this._shutdownCts.Token); - this._agentsContainer = new AgentsContainer(this); + this._agentsContainer = new AgentsContainer(this, this.SerializationRegistry); this.ServiceProvider = serviceProvider; } @@ -231,8 +236,6 @@ public sealed class GrpcAgentRuntime : IHostedService, IAgentRuntime, IMessageSi var messageId = evt.Id; var typeName = evt.Attributes[Constants.DATA_SCHEMA_ATTR].CeString; - var serializer = SerializationRegistry.GetSerializer(typeName) ?? throw new Exception(); - var message = serializer.Deserialize(evt.ProtoData); var messageContext = new MessageContext(messageId, cancellationToken) { @@ -241,6 +244,10 @@ public sealed class GrpcAgentRuntime : IHostedService, IAgentRuntime, IMessageSi IsRpc = false }; + // We may not have a Serializer registered yet, if this is the first time we are instantiating the agent + IProtobufMessageSerializer? serializer = SerializationRegistry.GetSerializer(typeName); + object? message = serializer?.Deserialize(evt.ProtoData); + // Iterate over subscriptions values to find receiving agents foreach (var subscription in this._agentsContainer.Subscriptions.Values) { @@ -248,6 +255,11 @@ public sealed class GrpcAgentRuntime : IHostedService, IAgentRuntime, IMessageSi { var recipient = subscription.MapToAgent(topic); var agent = await this._agentsContainer.EnsureAgentAsync(recipient); + + // give the serializer a second chance to have been registered + serializer ??= SerializationRegistry.GetSerializer(typeName) ?? throw new Exception($"Could not find a serializer for message of type {typeName}"); + message ??= serializer.Deserialize(evt.ProtoData); + await agent.OnMessageAsync(message, messageContext); } }