// Copyright (c) Microsoft Corporation. All rights reserved.
// AgentGrpcTests.cs

using System.Collections.Concurrent;
using System.Text.Json;
using FluentAssertions;
using Google.Protobuf.Reflection;
using Microsoft.AutoGen.Contracts;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Xunit;
using static Microsoft.AutoGen.Core.Grpc.Tests.AgentGrpcTests;

namespace Microsoft.AutoGen.Core.Grpc.Tests;

/// <summary>
/// Tests for <see cref="TestAgent"/> running against the hosts started by <see cref="GrpcRuntime"/>.
/// </summary>
public class AgentGrpcTests
{
    /// <summary>
    /// Verify that if the agent is not initialized via AgentWorker, it should throw the correct exception.
    /// </summary>
    /// <returns>A task that represents the asynchronous test.</returns>
    [Fact]
    public async Task Agent_ShouldThrowException_WhenNotInitialized()
    {
        using var runtime = new GrpcRuntime();
        var (_, agent) = runtime.Start(false); // Do not initialize

        // Expect an exception when calling SubscribeAsync because the agent is uninitialized.
        // NOTE(review): the exception type argument was missing from the reviewed text; it has been
        // restored as the uninitialized-worker exception - confirm against UninitializedAgentWorker.
        await Assert.ThrowsAsync<UninitializedAgentWorker.AgentInitalizedIncorrectlyException>(
            async () => await agent.SubscribeAsync("TestEvent")
        );
    }

    /// <summary>
    /// Validate that the agent is initialized correctly with its implicit subscriptions.
    /// </summary>
    /// <returns>A task that represents the asynchronous test.</returns>
    [Fact]
    public async Task Agent_ShouldInitializeCorrectly()
    {
        using var runtime = new GrpcRuntime();
        var (worker, agent) = runtime.Start();
        Assert.Equal("GrpcAgentWorker", worker.GetType().Name);

        // Presumably gives the runtime time to register the implicit subscriptions before they
        // are queried - TODO confirm whether a shorter wait or a readiness signal is available.
        await Task.Delay(5000);

        var subscriptions = await agent.GetSubscriptionsAsync();
        Assert.Equal(2, subscriptions.Count);
    }

    /// <summary>
    /// Test that SubscribeAsync adds a subscription that GetSubscriptionsAsync reports,
    /// and that UnsubscribeAsync removes it again.
    /// </summary>
    /// <returns>A task that represents the asynchronous test.</returns>
    [Fact]
    public async Task SubscribeAsync_UnsubscribeAsync_and_GetSubscriptionsTest()
    {
        using var runtime = new GrpcRuntime();
        var (_, agent) = runtime.Start();

        await agent.SubscribeAsync("TestEvent");
        await Task.Delay(100);

        var subscriptions = await agent.GetSubscriptionsAsync().ConfigureAwait(true);
        Assert.Contains(subscriptions, subscription => subscription.TypeSubscription.TopicType == "TestEvent");

        await agent.UnsubscribeAsync("TestEvent").ConfigureAwait(true);
        await Task.Delay(1000);

        subscriptions = await agent.GetSubscriptionsAsync().ConfigureAwait(true);
        Assert.DoesNotContain(subscriptions, subscription => subscription.TypeSubscription.TopicType == "TestEvent");
    }

    /// <summary>
    /// Test that state written with StoreAsync can be read back with ReadAsync.
    /// </summary>
    /// <returns>A task that represents the asynchronous test.</returns>
    [Fact]
    public async Task StoreAsync_and_ReadAsyncTest()
    {
        using var runtime = new GrpcRuntime();
        var (_, agent) = runtime.Start();

        Dictionary<string, string> state = new()
        {
            { "testdata", "Active" }
        };

        await agent.StoreAsync(new AgentState
        {
            AgentId = agent.AgentId,
            TextData = JsonSerializer.Serialize(state)
        }).ConfigureAwait(true);

        var readState = await agent.ReadAsync(agent.AgentId).ConfigureAwait(true);

        // Fall back to a sentinel dictionary so a null payload fails the assertion below
        // instead of throwing a NullReferenceException.
        var read = JsonSerializer.Deserialize<Dictionary<string, string>>(readState.TextData)
            ?? new Dictionary<string, string> { { "data", "No state data found" } };

        read.TryGetValue("testdata", out var value);
        Assert.Equal("Active", value);
    }

    /// <summary>
    /// Test that a message published with PublishMessageAsync reaches the subscribed agent's handler.
    /// </summary>
    /// <returns>A task that represents the asynchronous test.</returns>
    [Fact]
    public async Task PublishMessageAsync_and_ReceiveMessageTest()
    {
        using var runtime = new GrpcRuntime();
        var (_, agent) = runtime.Start();
        var topicType = "TestTopic";

        await agent.SubscribeAsync(topicType).ConfigureAwait(true);

        var subscriptions = await agent.GetSubscriptionsAsync().ConfigureAwait(true);
        Assert.Contains(subscriptions, subscription => subscription.TypeSubscription.TopicType == topicType);

        await agent.PublishMessageAsync(new TextMessage()
        {
            Source = topicType,
            TextMessage_ = "buffer"
        }, topicType).ConfigureAwait(true);

        await Task.Delay(100);

        // The handler records each message under its Source, which this test sets to the topic type.
        Assert.True(TestAgent.ReceivedMessages.ContainsKey(topicType));
        runtime.Stop();
    }

    /// <summary>
    /// Test that HandleObjectAsync passes string and int payloads to the agent, in call order.
    /// </summary>
    /// <returns>A task that represents the asynchronous test.</returns>
    [Fact]
    public async Task InvokeCorrectHandler()
    {
        // NOTE(review): the dictionary type arguments were missing from the reviewed text and have
        // been restored to match the AgentsMetadata constructor - confirm against its declaration.
        var agent = new TestAgent(
            new AgentsMetadata(
                TypeRegistry.Empty,
                new Dictionary<string, Type>(),
                new Dictionary<Type, HashSet<string>>(),
                new Dictionary<Type, HashSet<string>>()),
            new Logger<Agent>(new LoggerFactory()));

        await agent.HandleObjectAsync("hello world");
        await agent.HandleObjectAsync(42);

        agent.ReceivedItems.Should().HaveCount(2);
        agent.ReceivedItems[0].Should().Be("hello world");
        agent.ReceivedItems[1].Should().Be(42);
    }

    /// <summary>
    /// The test agent is a simple agent that is used for testing purposes.
    /// It records everything its handlers receive so tests can assert on it.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the type arguments of the logger parameter and of <see cref="ReceivedMessages"/>
    /// were missing from the reviewed text and have been restored - confirm against other usages.
    /// </remarks>
    public class TestAgent(
        [FromKeyedServices("AgentsMetadata")] AgentsMetadata eventTypes,
        Logger<Agent>? logger = null) : Agent(eventTypes, logger), IHandle<TextMessage>
    {
        /// <summary>
        /// Records the text of <paramref name="item"/> in <see cref="ReceivedMessages"/>, keyed by its source.
        /// </summary>
        /// <param name="item">The message to record.</param>
        /// <param name="cancellationToken">Not used by this handler.</param>
        /// <returns>A completed task.</returns>
        public Task Handle(TextMessage item, CancellationToken cancellationToken = default)
        {
            ReceivedMessages[item.Source] = item.TextMessage_;
            return Task.CompletedTask;
        }

        /// <summary>
        /// Appends the string payload to <see cref="ReceivedItems"/>.
        /// </summary>
        /// <param name="item">The payload to record.</param>
        /// <returns>A completed task.</returns>
        public Task Handle(string item)
        {
            ReceivedItems.Add(item);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Appends the integer payload to <see cref="ReceivedItems"/>.
        /// </summary>
        /// <param name="item">The payload to record.</param>
        /// <returns>A completed task.</returns>
        public Task Handle(int item)
        {
            ReceivedItems.Add(item);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Payloads received by the string and int handlers of this instance, in arrival order.
        /// </summary>
        public List<object> ReceivedItems { get; private set; } = [];

        /// <summary>
        /// Key: source
        /// Value: message
        /// </summary>
        /// <remarks>
        /// Static, so entries written by one test remain visible to every later test in the process.
        /// </remarks>
        public static ConcurrentDictionary<string, object> ReceivedMessages { get; private set; } = new();
    }
}

/// <summary>
/// GrpcRuntimeFixture - provides a fixture for the agent runtime.
/// </summary>
/// <remarks>
/// This fixture is used to provide a runtime for the agent tests.
/// However, it is shared between tests. So operations from one test can affect another.
/// In particular it configures the hosts through environment variables, which are process-wide.
/// </remarks>
public sealed class GrpcRuntime : IDisposable
{
    // True once Start has successfully started at least one real host.
    private bool _started;

    // Guards Dispose against running the teardown twice.
    private bool _disposed;

    /// <summary>
    /// The client host; the agent and the worker are resolved from its services.
    /// </summary>
    public IHost Client { get; private set; }

    /// <summary>
    /// The host started with <c>useGrpc: true</c> that the client is pointed at.
    /// </summary>
    public IHost? AppHost { get; private set; }

    /// <summary>
    /// Creates the fixture with placeholder hosts; <see cref="Start"/> replaces them with running ones.
    /// </summary>
    public GrpcRuntime()
    {
        Environment.SetEnvironmentVariable("ASPNETCORE_ENVIRONMENT", "Development");
        AppHost = Host.CreateDefaultBuilder().Build();
        Client = Host.CreateDefaultBuilder().Build();
    }

    /// <summary>
    /// Asks the operating system for a currently free loopback TCP port.
    /// </summary>
    /// <returns>The port number that was assigned to a temporary listener.</returns>
    private static int GetAvailablePort()
    {
        // Binding to port 0 lets the OS choose a free port. The listener is closed again before
        // returning, so another process could claim the port before the host binds to it.
        using var listener = new System.Net.Sockets.TcpListener(System.Net.IPAddress.Loopback, 0);
        listener.Start();
        int port = ((System.Net.IPEndPoint)listener.LocalEndpoint).Port;
        listener.Stop();
        return port;
    }

    /// <summary>
    /// Starts the client host through <c>AgentsApp.StartAsync</c>.
    /// </summary>
    /// <returns>The started client host.</returns>
    private static async Task<IHost> StartClientAsync()
    {
        return await AgentsApp.StartAsync().ConfigureAwait(false);
    }

    /// <summary>
    /// Starts the gRPC runtime host.
    /// </summary>
    /// <returns>The started runtime host.</returns>
    private static async Task<IHost> StartAppHostAsync()
    {
        return await Microsoft.AutoGen.Runtime.Grpc.Host.StartAsync(local: false, useGrpc: true).ConfigureAwait(false);
    }

    /// <summary>
    /// Disposes a host that is being replaced, stopping it first when it may be running.
    /// </summary>
    /// <param name="host">The host to release; ignored when null.</param>
    /// <param name="stopFirst">Whether to stop the host before disposing it.</param>
    private static void Release(IHost? host, bool stopFirst)
    {
        if (host is null)
        {
            return;
        }

        try
        {
            if (stopFirst)
            {
                host.StopAsync().GetAwaiter().GetResult();
            }
        }
        finally
        {
            host.Dispose();
        }
    }

    /// <summary>
    /// Start - gets a new port and starts fresh instances
    /// </summary>
    /// <param name="initialize">
    /// When true, <c>Agent.Initialize</c> is called with the resolved worker and the new agent.
    /// </param>
    /// <returns>The worker resolved from the client host and the newly created test agent.</returns>
    public (IAgentWorker, TestAgent) Start(bool initialize = true)
    {
        int port = GetAvailablePort(); // Get a new port per test run

        // Format once with the invariant culture so both variables carry the same plain digits.
        string portText = port.ToString(System.Globalization.CultureInfo.InvariantCulture);

        // Update environment variables so each test runs independently
        Environment.SetEnvironmentVariable("ASPNETCORE_HTTPS_PORTS", portText);
        Environment.SetEnvironmentVariable("AGENT_HOST", $"https://localhost:{portText}");

        // Hosts from an earlier Start may be running; the constructor's placeholders are not.
        bool replacingRunningHosts = _started;

        // Each old host is released only after its replacement is up, so a failed start
        // never leaves a property pointing at a disposed host.
        IHost? staleAppHost = AppHost;
        AppHost = StartAppHostAsync().GetAwaiter().GetResult();
        _started = true;
        Release(staleAppHost, replacingRunningHosts);

        IHost staleClient = Client;
        Client = StartClientAsync().GetAwaiter().GetResult();
        Release(staleClient, replacingRunningHosts);

        var agent = ActivatorUtilities.CreateInstance<TestAgent>(Client.Services);
        var worker = Client.Services.GetRequiredService<IAgentWorker>();
        if (initialize)
        {
            Agent.Initialize(worker, agent);
        }

        return (worker, agent);
    }

    /// <summary>
    /// Stop - stops the agent and ensures cleanup
    /// </summary>
    public void Stop()
    {
        Client?.StopAsync().GetAwaiter().GetResult();
        AppHost?.StopAsync().GetAwaiter().GetResult();
    }

    /// <summary>
    /// Dispose - Ensures cleanup after each test
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        try
        {
            Stop();
        }
        finally
        {
            // Stopping alone does not release the hosts' resources.
            Client.Dispose();
            AppHost?.Dispose();
        }
    }
}