diff --git a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IMessageRegistryGrain.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IMessageRegistryGrain.cs index 5f88e37a9..f7ef2fbea 100644 --- a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IMessageRegistryGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IMessageRegistryGrain.cs @@ -10,7 +10,12 @@ public interface IMessageRegistryGrain : IGrainWithIntegerKey /// /// Writes a message to the dead-letter queue for the given topic. /// - Task WriteMessageAsync(string topic, CloudEvent message); + Task AddMessageToDeadLetterQueueAsync(string topic, CloudEvent message); + + /// + /// Writes a message to the event buffer for the given topic. + /// + Task AddMessageToEventBufferAsync(string topic, CloudEvent message); /// /// Removes all messages for the given topic from the dead-letter queue. diff --git a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/MessageRegistryState.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/MessageRegistryState.cs index e753c79ee..0a93924b2 100644 --- a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/MessageRegistryState.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/MessageRegistryState.cs @@ -13,6 +13,12 @@ public class MessageRegistryState { /// /// Dictionary mapping topic types to a list of CloudEvents that failed delivery. + /// we read from this queue on new sub and registration so that agents can potentially receive messages they missed /// public ConcurrentDictionary> DeadLetterQueue { get; set; } = new(); + /// + /// A Dictionary of events that have been recently delivered to agents. + /// We keep them around for a short time to ensure that anyone subscribing to the topic within the next few seconds has a chance to receive them. + /// + public ConcurrentDictionary> EventBuffer { get; set; } = new(); } diff --git a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs index e285e5f65..bf42269aa 100644 --- a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs @@ -175,7 +175,7 @@ public sealed class GrpcGateway : BackgroundService, IGateway var removedMessages = await _messageRegistry.RemoveMessagesAsync(topic); if (removedMessages.Any()) { - _logger.LogInformation("Removed {Count} dead-letter messages for topic '{Topic}'.", removedMessages.Count, topic); + _logger.LogInformation("Removed {Count} dead-letter and buffer messages for topic '{Topic}'.", removedMessages.Count, topic); // now that someone is subscribed, dispatch the messages foreach (var message in removedMessages) { @@ -360,7 +360,11 @@ public sealed class GrpcGateway : BackgroundService, IGateway foreach (var connection in activeConnections) { _logger.LogDebug("Dispatching event {Event} to connection {Connection}, for AgentType {AgentType}.", evt, connection, agentType); - tasks.Add(this.WriteResponseAsync(connection, evt, cancellationToken)); + tasks.Add(Task.Run(async () => + { + await this.WriteResponseAsync(connection, evt, cancellationToken); + await _messageRegistry.AddMessageToEventBufferAsync(evt.Source, evt).ConfigureAwait(true); + })); } } else @@ -368,7 +372,7 @@ public sealed class GrpcGateway : BackgroundService, IGateway // we have target agent types that aren't in the supported agent types // could be a race condition or a bug _logger.LogWarning($"Agent type {agentType} is not supported, but registry returned it as subscribed to {evt.Type}/{evt.Source}. Buffering an event to the dead-letter queue."); - await _messageRegistry.WriteMessageAsync(evt.Source, evt).ConfigureAwait(true); + await _messageRegistry.AddMessageToDeadLetterQueueAsync(evt.Source, evt).ConfigureAwait(true); } } await Task.WhenAll(tasks).ConfigureAwait(false); @@ -378,7 +382,7 @@ public sealed class GrpcGateway : BackgroundService, IGateway // log that no agent types were found _logger.LogWarning("No agent types found for event type {EventType}. Adding to Dead Letter Queue", evt.Type); // buffer the event to the dead-letter queue - await _messageRegistry.WriteMessageAsync(evt.Source, evt).ConfigureAwait(true); + await _messageRegistry.AddMessageToDeadLetterQueueAsync(evt.Source, evt).ConfigureAwait(true); } } diff --git a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/MessageRegistryGrain.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/MessageRegistryGrain.cs index 83a2e527d..59dec750d 100644 --- a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/MessageRegistryGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/MessageRegistryGrain.cs @@ -12,13 +12,68 @@ internal sealed class MessageRegistryGrain( ILogger logger ) : Grain, IMessageRegistryGrain { + // + // The number of times to retry writing the state before giving up. + // + private const int _retries = 5; + /// + /// The time to wait before removing a message from the event buffer. + /// in milliseconds + /// + private const int _bufferTime = 5000; private readonly ILogger _logger = logger; - public async Task WriteMessageAsync(string topic, CloudEvent message) + // + public async Task AddMessageToDeadLetterQueueAsync(string topic, CloudEvent message) + { + await TryWriteMessageAsync("dlq", topic, message).ConfigureAwait(true); + } + + /// + public async Task AddMessageToEventBufferAsync(string topic, CloudEvent message) + { + await TryWriteMessageAsync("eb", topic, message).ConfigureAwait(true); + // Schedule the removal task to run in the background after bufferTime + _ = Task.Delay(_bufferTime) + .ContinueWith( + async _ => await RemoveMessage(topic, message), + TaskScheduler.Default + ); + } + /// + /// remove a specific message from the buffer for a given topic + /// + /// + /// + /// ValueTask + private async ValueTask RemoveMessage(string topic, CloudEvent message) + { + if (state.State.EventBuffer != null && state.State.EventBuffer.TryGetValue(topic, out List? events)) + { + if (events != null && events.Remove(message)) + { + state.State.EventBuffer.AddOrUpdate(topic, events, (_, _) => events); + await state.WriteStateAsync().ConfigureAwait(true); + return true; + } + } + return false; + } + + /// + /// Tries to write a message to the given queue in Orleans state. + /// Allows for retries using etag for optimistic concurrency. + /// + /// + /// + /// + /// + /// + private async ValueTask TryWriteMessageAsync(string whichQueue, string topic, CloudEvent message) { var retries = _retries; - while (!await WriteMessageAsync(topic, message, state.Etag).ConfigureAwait(false)) + while (!await WriteMessageAsync(whichQueue, topic, message, state.Etag).ConfigureAwait(false)) { if (retries-- <= 0) { @@ -27,30 +82,62 @@ internal sealed class MessageRegistryGrain( _logger.LogWarning("Failed to write MessageRegistryState. Retrying..."); retries--; } + if (retries == 0) { return false; } else { return true; } } - private async ValueTask WriteMessageAsync(string topic, CloudEvent message, string etag) + /// + /// Writes a message to the given queue in Orleans state. + /// + /// + /// + /// + /// + /// ValueTask + /// + private async ValueTask WriteMessageAsync(string whichQueue, string topic, CloudEvent message, string etag) { if (state.Etag != null && state.Etag != etag) { return false; } - var queue = state.State.DeadLetterQueue.GetOrAdd(topic, _ => new()); - queue.Add(message); - state.State.DeadLetterQueue.AddOrUpdate(topic, queue, (_, _) => queue); + switch (whichQueue) + { + case "dlq": + var dlqQueue = state.State.DeadLetterQueue.GetOrAdd(topic, _ => new()); + dlqQueue.Add(message); + state.State.DeadLetterQueue.AddOrUpdate(topic, dlqQueue, (_, _) => dlqQueue); + break; + case "eb": + var ebQueue = state.State.EventBuffer.GetOrAdd(topic, _ => new()); + ebQueue.Add(message); + state.State.EventBuffer.AddOrUpdate(topic, ebQueue, (_, _) => ebQueue); + break; + default: + throw new ArgumentException($"Invalid queue name: {whichQueue}"); + } await state.WriteStateAsync().ConfigureAwait(true); return true; } + // public async Task> RemoveMessagesAsync(string topic) { + var messages = new List(); if (state.State.DeadLetterQueue != null && state.State.DeadLetterQueue.Remove(topic, out List? letters)) { await state.WriteStateAsync().ConfigureAwait(true); if (letters != null) { - return letters; + messages.AddRange(letters); } } - return []; + if (state.State.EventBuffer != null && state.State.EventBuffer.Remove(topic, out List? events)) + { + await state.WriteStateAsync().ConfigureAwait(true); + if (events != null) + { + messages.AddRange(events); + } + } + return messages; } } diff --git a/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/MessageRegistryTests.cs b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/MessageRegistryTests.cs index 56876cee3..fee4cf73d 100644 --- a/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/MessageRegistryTests.cs +++ b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/MessageRegistryTests.cs @@ -25,7 +25,7 @@ public class MessageRegistryTests : IClassFixture var message = new CloudEvent { Id = Guid.NewGuid().ToString(), Source = "test-source", Type = "test-type" }; // Act - await grain.WriteMessageAsync(topic, message); + await grain.AddMessageToDeadLetterQueueAsync(topic, message); // Assert // attempt to remove the topic from the queue @@ -37,4 +37,43 @@ public class MessageRegistryTests : IClassFixture removedMessages = await grain.RemoveMessagesAsync(topic); Assert.Empty(removedMessages); } + /// + /// Test that messages are removed from the event buffer after the buffer time + /// + [Fact] + public async Task Write_and_Remove_Messages_BufferTime() + { + // Arrange + var grain = _cluster.GrainFactory.GetGrain(0); + var topic = Guid.NewGuid().ToString(); // Random topic + var message = new CloudEvent { Id = Guid.NewGuid().ToString(), Source = "test-source", Type = "test-type" }; + + // Act + await grain.AddMessageToEventBufferAsync(topic, message); + // wait 5 seconds + await Task.Delay(5000); + // attempt to remove the topic from the queue + var removedMessages = await grain.RemoveMessagesAsync(topic); + Assert.Empty(removedMessages); + } + + /// + /// Test that messages are still in the event buffer after 1 second + /// + [Fact] + public async Task Write_and_Remove_Messages_BufferTime_StillInBuffer() + { + // Arrange + var grain = _cluster.GrainFactory.GetGrain(0); + var topic = Guid.NewGuid().ToString(); // Random topic + var message = new CloudEvent { Id = Guid.NewGuid().ToString(), Source = "test-source", Type = "test-type" }; + + // Act + await grain.AddMessageToEventBufferAsync(topic, message); + // wait 1 second + await Task.Delay(1000); + // attempt to remove the topic from the queue + var removedMessages = await grain.RemoveMessagesAsync(topic); + Assert.Single(removedMessages); + } }