fix: gRPC Agent Runtime Serialization Registration (#5513)

We were registering the serializers when we already had a concrete type
on the way into Publish or Send on the .NET side. However, in a xLang
scenario, messages could originate from e.g. Python before being
sent/published from .NET, resulting in no serializer being found.

This change adds a second-change registration and lookup when the agent
is instantiated based on the IHandle<T> implementations.
This commit is contained in:
Jacob Alber 2025-02-12 12:12:07 -05:00 committed by GitHub
parent 492b106b19
commit 07fdc4e2da
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 55 additions and 5 deletions

View File

@ -169,7 +169,7 @@ jobs:
dotnet-version: '9.0.x'
- name: Install Temp Global.JSON
run: |
echo "{\"sdk\": {\"version\": \"9.0.101\"}}" > global.json
echo "{\"sdk\": {\"version\": \"9.0\"}}" > global.json
- name: Install .NET Aspire workload
run: dotnet workload install aspire
- name: Install dev certs

View File

@ -0,0 +1,38 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// AgentExtensions.cs
using System.Reflection;
using Google.Protobuf;
using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.Core.Grpc;
namespace Microsoft.AutoGen.Core;
internal static partial class AgentExtensions
{
private static readonly Type ProtobufIMessage = typeof(IMessage<>);
private static bool IsProtobufType(this Type type)
{
// TODO: Support the non-generic IMessage as well
Type specializedIMessageType = ProtobufIMessage.MakeGenericType(type);
// type T needs to derive from IMessage<T>
return specializedIMessageType.IsAssignableFrom(type);
}
public static void RegisterHandledMessageTypes(this IHostableAgent agent, IProtoSerializationRegistry registry)
{
Type agentRuntimeType = agent.GetType();
MethodInfo[] messageHandlers = agentRuntimeType.GetHandlers();
foreach (MethodInfo handler in messageHandlers)
{
Type messageType = handler.GetParameters().First().ParameterType;
if (messageType.IsProtobufType() && registry.GetSerializer(messageType) == null)
{
registry.RegisterSerializer(messageType);
}
}
}
}

View File

@ -11,9 +11,10 @@ using Microsoft.Extensions.Logging;
namespace Microsoft.AutoGen.Core.Grpc;
internal sealed class AgentsContainer(IAgentRuntime hostingRuntime)
internal sealed class AgentsContainer(IAgentRuntime hostingRuntime, IProtoSerializationRegistry serializationRegistry)
{
private readonly IAgentRuntime hostingRuntime = hostingRuntime;
private readonly IProtoSerializationRegistry serializationRegistry = serializationRegistry;
private Dictionary<Contracts.AgentId, IHostableAgent> agentInstances = new();
public Dictionary<string, ISubscriptionDefinition> Subscriptions = new();
@ -29,6 +30,10 @@ internal sealed class AgentsContainer(IAgentRuntime hostingRuntime)
}
agent = await factoryFunc(agentId, this.hostingRuntime);
// Just-in-Time register the message types so we can deserialize them
agent.RegisterHandledMessageTypes(this.serializationRegistry);
this.agentInstances.Add(agentId, agent);
}
@ -92,7 +97,7 @@ public sealed class GrpcAgentRuntime : IHostedService, IAgentRuntime, IMessageSi
this._shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(hostApplicationLifetime.ApplicationStopping);
this._messageRouter = new GrpcMessageRouter(client, this, _clientId, logger, this._shutdownCts.Token);
this._agentsContainer = new AgentsContainer(this);
this._agentsContainer = new AgentsContainer(this, this.SerializationRegistry);
this.ServiceProvider = serviceProvider;
}
@ -231,8 +236,6 @@ public sealed class GrpcAgentRuntime : IHostedService, IAgentRuntime, IMessageSi
var messageId = evt.Id;
var typeName = evt.Attributes[Constants.DATA_SCHEMA_ATTR].CeString;
var serializer = SerializationRegistry.GetSerializer(typeName) ?? throw new Exception();
var message = serializer.Deserialize(evt.ProtoData);
var messageContext = new MessageContext(messageId, cancellationToken)
{
@ -241,6 +244,10 @@ public sealed class GrpcAgentRuntime : IHostedService, IAgentRuntime, IMessageSi
IsRpc = false
};
// We may not have a Serializer registered yet, if this is the first time we are instantiating the agent
IProtobufMessageSerializer? serializer = SerializationRegistry.GetSerializer(typeName);
object? message = serializer?.Deserialize(evt.ProtoData);
// Iterate over subscriptions values to find receiving agents
foreach (var subscription in this._agentsContainer.Subscriptions.Values)
{
@ -248,6 +255,11 @@ public sealed class GrpcAgentRuntime : IHostedService, IAgentRuntime, IMessageSi
{
var recipient = subscription.MapToAgent(topic);
var agent = await this._agentsContainer.EnsureAgentAsync(recipient);
// give the serializer a second chance to have been registered
serializer ??= SerializationRegistry.GetSerializer(typeName) ?? throw new Exception($"Could not find a serializer for message of type {typeName}");
message ??= serializer.Deserialize(evt.ProtoData);
await agent.OnMessageAsync(message, messageContext);
}
}