Add CancellationToken parameters to API surface (#4036)

This commit is contained in:
Reuben Bond 2024-11-01 13:17:17 -07:00 committed by GitHub
parent a4901f3ba8
commit e9c16fe22e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 33 additions and 34 deletions

View File

@ -12,9 +12,9 @@ public interface IAgentContext
IAgentBase? AgentInstance { get; set; }
DistributedContextPropagator DistributedContextPropagator { get; } // TODO: Remove this. An abstraction should not have a dependency on DistributedContextPropagator.
ILogger Logger { get; } // TODO: Remove this. An abstraction should not have a dependency on ILogger.
ValueTask Store(AgentState value);
ValueTask<AgentState> Read(AgentId agentId);
ValueTask SendResponseAsync(RpcRequest request, RpcResponse response);
ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request);
ValueTask PublishEventAsync(CloudEvent @event);
ValueTask Store(AgentState value, CancellationToken cancellationToken = default);
ValueTask<AgentState> Read(AgentId agentId, CancellationToken cancellationToken = default);
ValueTask SendResponseAsync(RpcRequest request, RpcResponse response, CancellationToken cancellationToken = default);
ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default);
ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default);
}

View File

@ -5,9 +5,9 @@ namespace Microsoft.AutoGen.Abstractions;
public interface IAgentWorkerRuntime
{
ValueTask PublishEvent(CloudEvent evt);
ValueTask SendRequest(IAgentBase agent, RpcRequest request);
ValueTask SendResponse(RpcResponse response);
ValueTask Store(AgentState value);
ValueTask<AgentState> Read(AgentId agentId);
ValueTask PublishEvent(CloudEvent evt, CancellationToken cancellationToken);
ValueTask SendRequest(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken);
ValueTask SendResponse(RpcResponse response, CancellationToken cancellationToken);
ValueTask Store(AgentState value, CancellationToken cancellationToken);
ValueTask<AgentState> Read(AgentId agentId, CancellationToken cancellationToken);
}

View File

@ -202,15 +202,14 @@ public abstract class AgentBase : IAgentBase, IHandle
var activity = s_source.StartActivity($"PublishEvent '{item.Type}'", ActivityKind.Client, Activity.Current?.Context ?? default);
activity?.SetTag("peer.service", $"{item.Type}/{item.Source}");
var completion = new TaskCompletionSource<CloudEvent>(TaskCreationOptions.RunContinuationsAsynchronously);
// TODO: fix activity
Context.DistributedContextPropagator.Inject(activity, item.Metadata, static (carrier, key, value) => ((IDictionary<string, string>)carrier!)[key] = value);
await this.InvokeWithActivityAsync(
static async ((AgentBase Agent, CloudEvent Event, TaskCompletionSource<CloudEvent>) state) =>
static async ((AgentBase Agent, CloudEvent Event) state) =>
{
await state.Agent._context.PublishEventAsync(state.Event).ConfigureAwait(false);
},
(this, item, completion),
(this, item),
activity,
item.Type).ConfigureAwait(false);
}

View File

@ -15,25 +15,25 @@ internal sealed class AgentContext(AgentId agentId, IAgentWorkerRuntime runtime,
public ILogger Logger { get; } = logger;
public IAgentBase? AgentInstance { get; set; }
public DistributedContextPropagator DistributedContextPropagator { get; } = distributedContextPropagator;
public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse response)
public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse response, CancellationToken cancellationToken)
{
response.RequestId = request.RequestId;
await _runtime.SendResponse(response);
await _runtime.SendResponse(response, cancellationToken).ConfigureAwait(false);
}
public async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request)
public async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken)
{
await _runtime.SendRequest(agent, request).ConfigureAwait(false);
await _runtime.SendRequest(agent, request, cancellationToken).ConfigureAwait(false);
}
public async ValueTask PublishEventAsync(CloudEvent @event)
public async ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken)
{
await _runtime.PublishEvent(@event).ConfigureAwait(false);
await _runtime.PublishEvent(@event, cancellationToken).ConfigureAwait(false);
}
public async ValueTask Store(AgentState value)
public async ValueTask Store(AgentState value, CancellationToken cancellationToken)
{
await _runtime.Store(value).ConfigureAwait(false);
await _runtime.Store(value, cancellationToken).ConfigureAwait(false);
}
public ValueTask<AgentState> Read(AgentId agentId)
public ValueTask<AgentState> Read(AgentId agentId, CancellationToken cancellationToken)
{
return _runtime.Read(agentId);
return _runtime.Read(agentId, cancellationToken);
}
}

View File

@ -228,13 +228,13 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent
}
}
public async ValueTask SendResponse(RpcResponse response)
public async ValueTask SendResponse(RpcResponse response, CancellationToken cancellationToken)
{
_logger.LogInformation("Sending response '{Response}'.", response);
await WriteChannelAsync(new Message { Response = response }).ConfigureAwait(false);
await WriteChannelAsync(new Message { Response = response }, cancellationToken).ConfigureAwait(false);
}
public async ValueTask SendRequest(IAgentBase agent, RpcRequest request)
public async ValueTask SendRequest(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken)
{
_logger.LogInformation("[{AgentId}] Sending request '{Request}'.", agent.AgentId, request);
var requestId = Guid.NewGuid().ToString();
@ -242,7 +242,7 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent
request.RequestId = requestId;
try
{
await WriteChannelAsync(new Message { Request = request }).ConfigureAwait(false);
await WriteChannelAsync(new Message { Request = request }, cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
@ -253,11 +253,11 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent
}
}
public async ValueTask PublishEvent(CloudEvent @event)
public async ValueTask PublishEvent(CloudEvent @event, CancellationToken cancellationToken)
{
try
{
await WriteChannelAsync(new Message { CloudEvent = @event }).ConfigureAwait(false);
await WriteChannelAsync(new Message { CloudEvent = @event }, cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
@ -265,7 +265,7 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent
}
}
private async Task WriteChannelAsync(Message message, CancellationToken cancellationToken = default)
private async Task WriteChannelAsync(Message message, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
await _outboundMessagesChannel.Writer.WriteAsync((message, tcs), cancellationToken).ConfigureAwait(false);
@ -364,19 +364,19 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent
_channel?.Dispose();
}
}
public ValueTask Store(AgentState value)
public ValueTask Store(AgentState value, CancellationToken cancellationToken)
{
var agentId = value.AgentId ?? throw new InvalidOperationException("AgentId is required when saving AgentState.");
var response = _client.SaveState(value);
var response = _client.SaveState(value, cancellationToken: cancellationToken);
if (!response.Success)
{
throw new InvalidOperationException($"Error saving AgentState for AgentId {agentId}.");
}
return ValueTask.CompletedTask;
}
public async ValueTask<AgentState> Read(AgentId agentId)
public async ValueTask<AgentState> Read(AgentId agentId, CancellationToken cancellationToken)
{
var response = await _client.GetStateAsync(agentId);
var response = await _client.GetStateAsync(agentId, cancellationToken: cancellationToken);
// if (response.Success && response.AgentState.AgentId is not null) - why is success always false?
if (response.AgentState.AgentId is not null)
{