// Copyright (c) Microsoft Corporation. All rights reserved.
// InProcessRuntime.cs

using Microsoft.AutoGen.Contracts;

namespace Microsoft.AutoGen.Core;

/// <summary>
/// An <see cref="IAgentRuntime"/> that keeps agent instances, agent factories and subscriptions in
/// in-memory dictionaries and delivers messages by directly awaiting the receiving agent's handler.
/// </summary>
/// <remarks>
/// The dictionaries are plain <see cref="Dictionary{TKey, TValue}"/> instances and nothing in this
/// class synchronizes access to them.
/// </remarks>
public sealed class InProcessRuntime : IAgentRuntime
{
    // Agents that have already been created, keyed by their full id.
    private readonly Dictionary<AgentId, IHostableAgent> agentInstances = new();

    // Registered subscriptions, keyed by subscription id.
    private readonly Dictionary<string, ISubscriptionDefinition> subscriptions = new();

    // Factories used to create an agent the first time its id is needed, keyed by agent type.
    private readonly Dictionary<AgentType, Func<AgentId, IAgentRuntime, ValueTask<IHostableAgent>>> agentFactories = new();

    /// <summary>Runs <paramref name="func"/> and returns its result. Currently a plain pass-through.</summary>
    private ValueTask<T> ExecuteTracedAsync<T>(Func<ValueTask<T>> func)
    {
        // TODO: Bind tracing
        return func();
    }

    /// <summary>Runs <paramref name="func"/>. Currently a plain pass-through.</summary>
    private ValueTask ExecuteTracedAsync(Func<ValueTask> func)
    {
        // TODO: Bind tracing
        return func();
    }

    /// <summary>Creates a runtime with no agents, factories or subscriptions registered.</summary>
    public InProcessRuntime()
    {
    }

    /// <summary>
    /// Delivers <paramref name="message"/> to every agent whose subscription matches <paramref name="topic"/>,
    /// awaiting each agent's handler before moving on to the next one.
    /// </summary>
    /// <param name="message">The message to deliver.</param>
    /// <param name="topic">The topic the message is published to.</param>
    /// <param name="sender">The publishing agent, if any. A subscription that maps to this agent is skipped.</param>
    /// <param name="messageId">The message id; a new GUID string is generated when <see langword="null"/>.</param>
    /// <param name="cancellationToken">Token placed on the message context; <see cref="CancellationToken.None"/> when <see langword="null"/>.</param>
    /// <returns>A task that completes once every matching agent has handled the message.</returns>
    public ValueTask PublishMessageAsync(object message, TopicId topic, AgentId? sender = null, string? messageId = null, CancellationToken? cancellationToken = null)
    {
        return this.ExecuteTracedAsync(async () =>
        {
            string resolvedMessageId = messageId ?? Guid.NewGuid().ToString();
            CancellationToken resolvedCancellation = cancellationToken ?? CancellationToken.None;

            // Snapshot the matches before awaiting anything: agent factories and message handlers may
            // add or remove subscriptions, and mutating the dictionary while it is being enumerated
            // throws InvalidOperationException. Subscriptions changed mid-publish therefore take effect
            // on the next publish.
            ISubscriptionDefinition[] matchingSubscriptions = this.subscriptions.Values
                .Where(subscription => subscription.Matches(topic))
                .ToArray();

            foreach (ISubscriptionDefinition subscription in matchingSubscriptions)
            {
                AgentId agentId = subscription.MapToAgent(topic);
                if (sender.HasValue && sender == agentId)
                {
                    // TODO: enable re-entrant mode
                    continue;
                }

                // Each recipient gets its own context instance.
                MessageContext messageContext = new(resolvedMessageId, resolvedCancellation)
                {
                    Sender = sender,
                    Topic = topic,
                    IsRpc = false
                };

                IHostableAgent agent = await this.EnsureAgentAsync(agentId);
                await agent.OnMessageAsync(message, messageContext);
            }
        });
    }

    /// <summary>
    /// Sends <paramref name="message"/> directly to the agent identified by <paramref name="recepient"/>,
    /// creating the agent first if it does not exist yet.
    /// </summary>
    /// <param name="message">The message to deliver.</param>
    /// <param name="recepient">The id of the receiving agent.</param>
    /// <param name="sender">The sending agent, if any.</param>
    /// <param name="messageId">The message id; a new GUID string is generated when <see langword="null"/>.</param>
    /// <param name="cancellationToken">Token placed on the message context; <see cref="CancellationToken.None"/> when <see langword="null"/>.</param>
    /// <returns>Whatever the receiving agent's handler returns.</returns>
    /// <exception cref="InvalidOperationException">No factory is registered for the recipient's agent type.</exception>
    public ValueTask<object?> SendMessageAsync(object message, AgentId recepient, AgentId? sender = null, string? messageId = null, CancellationToken? cancellationToken = null)
    {
        return this.ExecuteTracedAsync(async () =>
        {
            MessageContext messageContext = new(
                messageId ?? Guid.NewGuid().ToString(),
                cancellationToken ?? CancellationToken.None)
            {
                Sender = sender,
                IsRpc = false
            };

            IHostableAgent agent = await this.EnsureAgentAsync(recepient);
            return await agent.OnMessageAsync(message, messageContext);
        });
    }

    /// <summary>
    /// Returns the agent instance for <paramref name="agentId"/>, creating and caching it through the
    /// factory registered for its type when it does not exist yet.
    /// </summary>
    /// <exception cref="InvalidOperationException">No factory is registered for the agent's type.</exception>
    private async ValueTask<IHostableAgent> EnsureAgentAsync(AgentId agentId)
    {
        if (this.agentInstances.TryGetValue(agentId, out IHostableAgent? existing))
        {
            return existing;
        }

        if (!this.agentFactories.TryGetValue(agentId.Type, out Func<AgentId, IAgentRuntime, ValueTask<IHostableAgent>>? factoryFunc))
        {
            throw new InvalidOperationException($"Agent with name {agentId.Type} not found.");
        }

        IHostableAgent created = await factoryFunc(agentId, this);
        this.agentInstances.Add(agentId, created);
        return created;
    }

    /// <summary>
    /// Returns <paramref name="agentId"/>, first making sure the agent instance exists when
    /// <paramref name="lazy"/> is <see langword="false"/>.
    /// </summary>
    /// <param name="agentId">The id of the agent.</param>
    /// <param name="lazy">When <see langword="true"/> (the default) the agent is not instantiated by this call.</param>
    /// <returns>The same <paramref name="agentId"/> that was passed in.</returns>
    public async ValueTask<AgentId> GetAgentAsync(AgentId agentId, bool lazy = true)
    {
        if (!lazy)
        {
            await this.EnsureAgentAsync(agentId);
        }

        return agentId;
    }

    /// <summary>Builds an <see cref="AgentId"/> from <paramref name="agentType"/> and <paramref name="key"/> and forwards to <see cref="GetAgentAsync(AgentId, bool)"/>.</summary>
    public ValueTask<AgentId> GetAgentAsync(AgentType agentType, string key = "default", bool lazy = true)
        => this.GetAgentAsync(new AgentId(agentType, key), lazy);

    /// <summary>Builds an <see cref="AgentId"/> from <paramref name="agent"/> and <paramref name="key"/> and forwards to <see cref="GetAgentAsync(AgentId, bool)"/>.</summary>
    public ValueTask<AgentId> GetAgentAsync(string agent, string key = "default", bool lazy = true)
        => this.GetAgentAsync(new AgentId(agent, key), lazy);

    /// <summary>Returns the metadata of the agent identified by <paramref name="agentId"/>, creating the agent if needed.</summary>
    /// <exception cref="InvalidOperationException">No factory is registered for the agent's type.</exception>
    public async ValueTask<AgentMetadata> GetAgentMetadataAsync(AgentId agentId)
    {
        IHostableAgent agent = await this.EnsureAgentAsync(agentId);
        return agent.Metadata;
    }

    /// <summary>Passes <paramref name="state"/> to the agent identified by <paramref name="agentId"/>, creating the agent if needed.</summary>
    /// <exception cref="InvalidOperationException">No factory is registered for the agent's type.</exception>
    public async ValueTask LoadAgentStateAsync(AgentId agentId, IDictionary<string, object> state)
    {
        IHostableAgent agent = await this.EnsureAgentAsync(agentId);
        await agent.LoadStateAsync(state);
    }

    /// <summary>Returns the state saved by the agent identified by <paramref name="agentId"/>, creating the agent if needed.</summary>
    /// <exception cref="InvalidOperationException">No factory is registered for the agent's type.</exception>
    public async ValueTask<IDictionary<string, object>> SaveAgentStateAsync(AgentId agentId)
    {
        IHostableAgent agent = await this.EnsureAgentAsync(agentId);
        return await agent.SaveStateAsync();
    }

    /// <inheritdoc/>
    /// <exception cref="InvalidOperationException">A subscription with the same id is already registered.</exception>
    public ValueTask AddSubscriptionAsync(ISubscriptionDefinition subscription)
    {
        // TryAdd performs the existence check and the insert in a single lookup.
        if (!this.subscriptions.TryAdd(subscription.Id, subscription))
        {
            throw new InvalidOperationException($"Subscription with id {subscription.Id} already exists.");
        }

        return ValueTask.CompletedTask;
    }

    /// <summary>Removes the subscription registered under <paramref name="subscriptionId"/>.</summary>
    /// <exception cref="InvalidOperationException">No subscription with that id is registered.</exception>
    public ValueTask RemoveSubscriptionAsync(string subscriptionId)
    {
        // Remove reports whether the key was present, so no separate ContainsKey is needed.
        if (!this.subscriptions.Remove(subscriptionId))
        {
            throw new InvalidOperationException($"Subscription with id {subscriptionId} does not exist.");
        }

        return ValueTask.CompletedTask;
    }

    /// <summary>
    /// Restores agent state from <paramref name="state"/>, whose keys are agent ids in the format accepted by
    /// <c>AgentId.FromStr</c> and whose values are the per-agent state dictionaries.
    /// </summary>
    /// <remarks>Entries whose agent type has no registered factory are skipped.</remarks>
    /// <exception cref="InvalidOperationException">
    /// An entry's value is not an <see cref="IDictionary{TKey, TValue}"/> of string to object, or is <see langword="null"/>.
    /// </exception>
    public async ValueTask LoadStateAsync(IDictionary<string, object> state)
    {
        foreach (KeyValuePair<string, object> entry in state)
        {
            AgentId agentId = AgentId.FromStr(entry.Key);

            if (entry.Value is not IDictionary<string, object> agentState)
            {
                // A null value must not turn the diagnostic into a NullReferenceException.
                string actualType = entry.Value?.GetType().ToString() ?? "null";
                throw new InvalidOperationException($"Agent state for {agentId} is not a {typeof(IDictionary<string, object>)}: {actualType}");
            }

            if (this.agentFactories.ContainsKey(agentId.Type))
            {
                IHostableAgent agent = await this.EnsureAgentAsync(agentId);
                await agent.LoadStateAsync(agentState);
            }
        }
    }

    /// <summary>
    /// Collects the saved state of every agent that has been instantiated so far.
    /// </summary>
    /// <returns>A dictionary from <c>AgentId.ToString()</c> to that agent's saved state.</returns>
    public async ValueTask<IDictionary<string, object>> SaveStateAsync()
    {
        Dictionary<string, object> state = new();

        // Iterate over a snapshot: the dictionary must not be enumerated across awaits, because an
        // agent created while another agent is saving would invalidate the enumerator.
        foreach (KeyValuePair<AgentId, IHostableAgent> entry in this.agentInstances.ToArray())
        {
            state[entry.Key.ToString()] = await entry.Value.SaveStateAsync();
        }

        return state;
    }

    /// <summary>
    /// Registers a strongly typed factory for agents of type <paramref name="type"/> by adapting it to the
    /// <see cref="IHostableAgent"/>-returning overload.
    /// </summary>
    /// <exception cref="InvalidOperationException">A factory is already registered for <paramref name="type"/>.</exception>
    public ValueTask<AgentType> RegisterAgentFactoryAsync<TAgent>(AgentType type, Func<AgentId, IAgentRuntime, ValueTask<TAgent>> factoryFunc) where TAgent : IHostableAgent
        => this.RegisterAgentFactoryAsync(type, async ValueTask<IHostableAgent> (agentId, runtime) => await factoryFunc(agentId, runtime));

    /// <summary>Registers the factory used to create agents of type <paramref name="type"/> on first use.</summary>
    /// <param name="type">The agent type the factory creates.</param>
    /// <param name="factoryFunc">Invoked with the agent id and this runtime when an agent of this type is first needed.</param>
    /// <returns><paramref name="type"/>.</returns>
    /// <exception cref="InvalidOperationException">A factory is already registered for <paramref name="type"/>.</exception>
    public ValueTask<AgentType> RegisterAgentFactoryAsync(AgentType type, Func<AgentId, IAgentRuntime, ValueTask<IHostableAgent>> factoryFunc)
    {
        if (!this.agentFactories.TryAdd(type, factoryFunc))
        {
            throw new InvalidOperationException($"Agent with type {type} already exists.");
        }

        return ValueTask.FromResult(type);
    }

    /// <summary>Returns a new <see cref="AgentProxy"/> for <paramref name="agentId"/> bound to this runtime.</summary>
    public ValueTask<AgentProxy> TryGetAgentProxyAsync(AgentId agentId)
    {
        return ValueTask.FromResult(new AgentProxy(agentId, this));
    }
}