add a buffer to message delivery so that clients who subscribe within a window can receive (#5543)

Sometimes a client subscribes after the message it was hoping for has
already been published and delivered to one of its peers, so it misses it.
This adds a five-second (default) buffer and delivers buffered
messages to new subscribers. Messages are removed from the buffer after
5 seconds.

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Ryan Sweet 2025-02-14 07:42:18 -08:00 committed by GitHub
parent 8029572f3f
commit acd7e86430
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 155 additions and 14 deletions

View File

@ -10,7 +10,12 @@ public interface IMessageRegistryGrain : IGrainWithIntegerKey
/// <summary>
/// Writes a message to the dead-letter queue for the given topic.
/// </summary>
Task WriteMessageAsync(string topic, CloudEvent message);
Task AddMessageToDeadLetterQueueAsync(string topic, CloudEvent message);
/// <summary>
/// Writes a message to the event buffer for the given topic.
/// </summary>
Task AddMessageToEventBufferAsync(string topic, CloudEvent message);
/// <summary>
/// Removes all messages for the given topic from the dead-letter queue.

View File

@ -13,6 +13,12 @@ public class MessageRegistryState
{
/// <summary>
/// Dictionary mapping topic types to a list of CloudEvents that failed delivery.
/// This queue is read on new subscription and registration so that agents can potentially receive messages they missed.
/// </summary>
public ConcurrentDictionary<string, List<CloudEvent>> DeadLetterQueue { get; set; } = new();
/// <summary>
/// A dictionary of events that have been recently delivered to agents, keyed by topic.
/// They are kept around for a short time so that anyone subscribing to the topic within the next few seconds has a chance to receive them.
/// </summary>
public ConcurrentDictionary<string, List<CloudEvent>> EventBuffer { get; set; } = new();
}

View File

@ -175,7 +175,7 @@ public sealed class GrpcGateway : BackgroundService, IGateway
var removedMessages = await _messageRegistry.RemoveMessagesAsync(topic);
if (removedMessages.Any())
{
_logger.LogInformation("Removed {Count} dead-letter messages for topic '{Topic}'.", removedMessages.Count, topic);
_logger.LogInformation("Removed {Count} dead-letter and buffer messages for topic '{Topic}'.", removedMessages.Count, topic);
// now that someone is subscribed, dispatch the messages
foreach (var message in removedMessages)
{
@ -360,7 +360,11 @@ public sealed class GrpcGateway : BackgroundService, IGateway
foreach (var connection in activeConnections)
{
_logger.LogDebug("Dispatching event {Event} to connection {Connection}, for AgentType {AgentType}.", evt, connection, agentType);
tasks.Add(this.WriteResponseAsync(connection, evt, cancellationToken));
tasks.Add(Task.Run(async () =>
{
await this.WriteResponseAsync(connection, evt, cancellationToken);
await _messageRegistry.AddMessageToEventBufferAsync(evt.Source, evt).ConfigureAwait(true);
}));
}
}
else
@ -368,7 +372,7 @@ public sealed class GrpcGateway : BackgroundService, IGateway
// we have target agent types that aren't in the supported agent types
// could be a race condition or a bug
_logger.LogWarning($"Agent type {agentType} is not supported, but registry returned it as subscribed to {evt.Type}/{evt.Source}. Buffering an event to the dead-letter queue.");
await _messageRegistry.WriteMessageAsync(evt.Source, evt).ConfigureAwait(true);
await _messageRegistry.AddMessageToDeadLetterQueueAsync(evt.Source, evt).ConfigureAwait(true);
}
}
await Task.WhenAll(tasks).ConfigureAwait(false);
@ -378,7 +382,7 @@ public sealed class GrpcGateway : BackgroundService, IGateway
// log that no agent types were found
_logger.LogWarning("No agent types found for event type {EventType}. Adding to Dead Letter Queue", evt.Type);
// buffer the event to the dead-letter queue
await _messageRegistry.WriteMessageAsync(evt.Source, evt).ConfigureAwait(true);
await _messageRegistry.AddMessageToDeadLetterQueueAsync(evt.Source, evt).ConfigureAwait(true);
}
}

View File

@ -12,13 +12,68 @@ internal sealed class MessageRegistryGrain(
ILogger<MessageRegistryGrain> logger
) : Grain, IMessageRegistryGrain
{
/// <summary>
/// The number of times to retry writing the state before giving up.
/// </summary>
private const int _retries = 5;
/// <summary>
/// The time to wait before removing a message from the event buffer,
/// in milliseconds.
/// </summary>
private const int _bufferTime = 5000;
// Logger supplied through the primary constructor.
private readonly ILogger<MessageRegistryGrain> _logger = logger;
public async Task WriteMessageAsync(string topic, CloudEvent message)
/// <inheritdoc />
public async Task AddMessageToDeadLetterQueueAsync(string topic, CloudEvent message)
{
    // Delegate to the shared retrying writer, targeting the dead-letter queue ("dlq").
    // The boolean outcome is not surfaced to callers of this method.
    _ = await TryWriteMessageAsync("dlq", topic, message).ConfigureAwait(true);
}
/// <inheritdoc />
public async Task AddMessageToEventBufferAsync(string topic, CloudEvent message)
{
    await TryWriteMessageAsync("eb", topic, message).ConfigureAwait(true);
    // Fire-and-forget expiry of the buffered message. The delay is awaited inside a helper
    // rather than chained with ContinueWith on TaskScheduler.Default, so the removal resumes
    // on the scheduler that is current here instead of on an arbitrary thread-pool thread
    // while it mutates grain state, and failures are logged instead of being lost inside an
    // unobserved nested task.
    _ = RemoveAfterBufferTimeAsync(topic, message);
}
/// <summary>
/// Waits for the buffer time to elapse, then removes the message from the event buffer.
/// Nothing awaits this task, so exceptions are logged here rather than propagated.
/// </summary>
/// <param name="topic">The topic the message was buffered under.</param>
/// <param name="message">The buffered message to expire.</param>
private async Task RemoveAfterBufferTimeAsync(string topic, CloudEvent message)
{
    try
    {
        await Task.Delay(_bufferTime).ConfigureAwait(true);
        await RemoveMessage(topic, message).ConfigureAwait(true);
    }
    catch (System.Exception ex)
    {
        _logger.LogError(ex, "Failed to remove expired message from the event buffer for topic '{Topic}'.", topic);
    }
}
/// <summary>
/// Removes a specific message from the event buffer for a given topic and persists the change.
/// </summary>
/// <param name="topic">The topic whose buffered messages are searched.</param>
/// <param name="message">The message to remove.</param>
/// <returns><c>true</c> if the message was found, removed and the state written; otherwise <c>false</c>.</returns>
private async ValueTask<bool> RemoveMessage(string topic, CloudEvent message)
{
    var buffer = state.State.EventBuffer;
    if (buffer is null || !buffer.TryGetValue(topic, out List<CloudEvent>? events) || events is null)
    {
        return false;
    }
    if (!events.Remove(message))
    {
        return false;
    }
    // The list is mutated in place, so the dictionary entry needs no re-insertion.
    // Drop the topic once its list is empty so expired topics do not pile up as
    // empty lists in the persisted state.
    if (events.Count == 0)
    {
        buffer.TryRemove(topic, out _);
    }
    await state.WriteStateAsync().ConfigureAwait(true);
    return true;
}
/// <summary>
/// Tries to write a message to the given queue in Orleans state.
/// Allows for retries using etag for optimistic concurrency.
/// </summary>
/// <param name="whichQueue"></param>
/// <param name="topic"></param>
/// <param name="message"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
private async ValueTask<bool> TryWriteMessageAsync(string whichQueue, string topic, CloudEvent message)
{
var retries = _retries;
while (!await WriteMessageAsync(topic, message, state.Etag).ConfigureAwait(false))
while (!await WriteMessageAsync(whichQueue, topic, message, state.Etag).ConfigureAwait(false))
{
if (retries-- <= 0)
{
@ -27,30 +82,62 @@ internal sealed class MessageRegistryGrain(
_logger.LogWarning("Failed to write MessageRegistryState. Retrying...");
retries--;
}
if (retries == 0) { return false; } else { return true; }
}
private async ValueTask<bool> WriteMessageAsync(string topic, CloudEvent message, string etag)
/// <summary>
/// Writes a message to the given queue in Orleans state.
/// </summary>
/// <param name="whichQueue"></param>
/// <param name="topic"></param>
/// <param name="message"></param>
/// <param name="etag"></param>
/// <returns>ValueTask<bool></returns>
/// <exception cref="ArgumentException"></exception>
private async ValueTask<bool> WriteMessageAsync(string whichQueue, string topic, CloudEvent message, string etag)
{
if (state.Etag != null && state.Etag != etag)
{
return false;
}
var queue = state.State.DeadLetterQueue.GetOrAdd(topic, _ => new());
queue.Add(message);
state.State.DeadLetterQueue.AddOrUpdate(topic, queue, (_, _) => queue);
switch (whichQueue)
{
case "dlq":
var dlqQueue = state.State.DeadLetterQueue.GetOrAdd(topic, _ => new());
dlqQueue.Add(message);
state.State.DeadLetterQueue.AddOrUpdate(topic, dlqQueue, (_, _) => dlqQueue);
break;
case "eb":
var ebQueue = state.State.EventBuffer.GetOrAdd(topic, _ => new());
ebQueue.Add(message);
state.State.EventBuffer.AddOrUpdate(topic, ebQueue, (_, _) => ebQueue);
break;
default:
throw new ArgumentException($"Invalid queue name: {whichQueue}");
}
await state.WriteStateAsync().ConfigureAwait(true);
return true;
}
// <inheritdoc />
public async Task<List<CloudEvent>> RemoveMessagesAsync(string topic)
{
var messages = new List<CloudEvent>();
if (state.State.DeadLetterQueue != null && state.State.DeadLetterQueue.Remove(topic, out List<CloudEvent>? letters))
{
await state.WriteStateAsync().ConfigureAwait(true);
if (letters != null)
{
return letters;
messages.AddRange(letters);
}
}
return [];
if (state.State.EventBuffer != null && state.State.EventBuffer.Remove(topic, out List<CloudEvent>? events))
{
await state.WriteStateAsync().ConfigureAwait(true);
if (events != null)
{
messages.AddRange(events);
}
}
return messages;
}
}

View File

@ -25,7 +25,7 @@ public class MessageRegistryTests : IClassFixture<ClusterFixture>
var message = new CloudEvent { Id = Guid.NewGuid().ToString(), Source = "test-source", Type = "test-type" };
// Act
await grain.WriteMessageAsync(topic, message);
await grain.AddMessageToDeadLetterQueueAsync(topic, message);
// Assert
// attempt to remove the topic from the queue
@ -37,4 +37,43 @@ public class MessageRegistryTests : IClassFixture<ClusterFixture>
removedMessages = await grain.RemoveMessagesAsync(topic);
Assert.Empty(removedMessages);
}
/// <summary>
/// Test that messages are removed from the event buffer after the buffer time
/// </summary>
[Fact]
public async Task Write_and_Remove_Messages_BufferTime()
{
    // Arrange
    var grain = _cluster.GrainFactory.GetGrain<IMessageRegistryGrain>(0);
    var topic = Guid.NewGuid().ToString(); // Random topic
    var message = new CloudEvent { Id = Guid.NewGuid().ToString(), Source = "test-source", Type = "test-type" };

    // Act
    await grain.AddMessageToEventBufferAsync(topic, message);
    // Wait past the 5 second buffer time. Waiting exactly 5 seconds races the grain's own
    // delayed removal and state write, so allow a one second margin.
    await Task.Delay(6000);

    // Assert
    // attempt to remove the topic from the queue; the message should already have expired
    var removedMessages = await grain.RemoveMessagesAsync(topic);
    Assert.Empty(removedMessages);
}
/// <summary>
/// Verifies that a buffered message is still retrievable one second after it was added.
/// </summary>
[Fact]
public async Task Write_and_Remove_Messages_BufferTime_StillInBuffer()
{
    // Arrange: a registry grain, a unique topic and a single event to buffer
    var registry = _cluster.GrainFactory.GetGrain<IMessageRegistryGrain>(0);
    var bufferedTopic = Guid.NewGuid().ToString();
    var bufferedEvent = new CloudEvent
    {
        Id = Guid.NewGuid().ToString(),
        Source = "test-source",
        Type = "test-type"
    };

    // Act: buffer the event, then pause for one second
    await registry.AddMessageToEventBufferAsync(bufferedTopic, bufferedEvent);
    await Task.Delay(1000);

    // Assert: draining the topic still yields exactly the buffered event
    var drained = await registry.RemoveMessagesAsync(bufferedTopic);
    Assert.Single(drained);
}
}