diff --git a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/MessageRegistryGrain.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/MessageRegistryGrain.cs index 59dec750d..534114920 100644 --- a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/MessageRegistryGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/MessageRegistryGrain.cs @@ -12,10 +12,14 @@ internal sealed class MessageRegistryGrain( ILogger logger ) : Grain, IMessageRegistryGrain { + // + // Helper class for managing state writes. + // + private readonly StateManager _stateManager = new(state); + // // The number of times to retry writing the state before giving up. // - private const int _retries = 5; /// /// The time to wait before removing a message from the event buffer. @@ -35,12 +39,9 @@ internal sealed class MessageRegistryGrain( { await TryWriteMessageAsync("eb", topic, message).ConfigureAwait(true); // Schedule the removal task to run in the background after bufferTime - _ = Task.Delay(_bufferTime) - .ContinueWith( - async _ => await RemoveMessage(topic, message), - TaskScheduler.Default - ); + RemoveMessageAfterDelay(topic, message).Ignore(); } + /// /// remove a specific message from the buffer for a given topic /// @@ -54,13 +55,24 @@ internal sealed class MessageRegistryGrain( if (events != null && events.Remove(message)) { state.State.EventBuffer.AddOrUpdate(topic, events, (_, _) => events); - await state.WriteStateAsync().ConfigureAwait(true); + await _stateManager.WriteStateAsync().ConfigureAwait(true); return true; } } return false; } + /// + /// remove a specific message from the buffer for a given topic after a delay + /// + /// + /// + private async Task RemoveMessageAfterDelay(string topic, CloudEvent message) + { + await Task.Delay(_bufferTime); + await RemoveMessage(topic, message); + } + /// /// Tries to write a message to the given queue in Orleans state. /// Allows for retries using etag for optimistic concurrency. @@ -114,7 +126,7 @@ internal sealed class MessageRegistryGrain( default: throw new ArgumentException($"Invalid queue name: {whichQueue}"); } - await state.WriteStateAsync().ConfigureAwait(true); + await _stateManager.WriteStateAsync().ConfigureAwait(true); return true; } @@ -124,7 +136,7 @@ internal sealed class MessageRegistryGrain( var messages = new List(); if (state.State.DeadLetterQueue != null && state.State.DeadLetterQueue.Remove(topic, out List? letters)) { - await state.WriteStateAsync().ConfigureAwait(true); + await _stateManager.WriteStateAsync().ConfigureAwait(true); if (letters != null) { messages.AddRange(letters); @@ -132,7 +144,7 @@ internal sealed class MessageRegistryGrain( } if (state.State.EventBuffer != null && state.State.EventBuffer.Remove(topic, out List? events)) { - await state.WriteStateAsync().ConfigureAwait(true); + await _stateManager.WriteStateAsync().ConfigureAwait(true); if (events != null) { messages.AddRange(events); diff --git a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/StateManager.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/StateManager.cs new file mode 100644 index 000000000..6c022d1c0 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/StateManager.cs @@ -0,0 +1,74 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// StateManager.cs + +using Orleans.Core; + +namespace Microsoft.AutoGen.RuntimeGateway.Grpc; + +/// +/// A helper class which wraps a grain state instance and ensures that only a single write operation is outstanding at any moment in time. +/// +/// The grain state. +internal sealed class StateManager(IStorage state) +{ + /// + /// Allows state writing to happen in the background. + /// + private Task? _pendingOperation; + + // When reentrant grain is doing WriteStateAsync, etag violations are possible due to concurrent writes. + // The solution is to serialize and batch writes, and make sure only a single write is outstanding at any moment in time. + public async ValueTask WriteStateAsync() + { + await PerformOperationAsync(static state => state.WriteStateAsync()); + } + + public async ValueTask ClearStateAsync() + { + await PerformOperationAsync(static state => state.ClearStateAsync()); + } + + public async ValueTask PerformOperationAsync(Func performOperation) + { + if (_pendingOperation is Task currentWriteStateOperation) + { + // await the outstanding write, but ignore it since it doesn't include our changes + await currentWriteStateOperation.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing | ConfigureAwaitOptions.ContinueOnCapturedContext); + if (_pendingOperation == currentWriteStateOperation) + { + // only null out the outstanding operation if it's the same one as the one we awaited, otherwise + // another request might have already done so. + _pendingOperation = null; + } + } + + Task operation; + if (_pendingOperation is null) + { + // If after the initial write is completed, no other request initiated a new write operation, do it now. + operation = performOperation(state); + _pendingOperation = operation; + } + else + { + // If there were many requests enqueued to persist state, there is no reason to enqueue a new write + // operation for each, since any write (after the initial one that we already awaited) will have cumulative + // changes including the one requested by our caller. Just await the new outstanding write. + operation = _pendingOperation; + } + + try + { + await operation; + } + finally + { + if (_pendingOperation == operation) + { + // only null out the outstanding operation if it's the same one as the one we awaited, otherwise + // another request might have already done so. + _pendingOperation = null; + } + } + } +}