streams WIP

This commit is contained in:
Kosta Petan 2024-02-18 20:21:44 +00:00
parent 8fcd2f43ff
commit 4c98edc176
6 changed files with 55 additions and 25 deletions

View File

@ -67,7 +67,8 @@ builder.Host.UseOrleans(siloBuilder =>
if (builder.Environment.IsDevelopment()) if (builder.Environment.IsDevelopment())
{ {
var connectionString = builder.Configuration.GetValue<string>("AzureOptions:CosmosConnectionString"); var connectionString = builder.Configuration.GetValue<string>("AzureOptions:CosmosConnectionString");
siloBuilder.AddMemoryStreams("StreamProvider"); siloBuilder.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore");
siloBuilder.UseCosmosReminderService( o => siloBuilder.UseCosmosReminderService( o =>
{ {
o.ConfigureCosmosClient(connectionString); o.ConfigureCosmosClient(connectionString);

View File

@ -10,16 +10,14 @@ using Orleans.Runtime;
public sealed class GithubWebHookProcessor : WebhookEventProcessor public sealed class GithubWebHookProcessor : WebhookEventProcessor
{ {
private readonly ILogger<GithubWebHookProcessor> _logger; private readonly ILogger<GithubWebHookProcessor> _logger;
private readonly IGrainFactory _grains;
private readonly IClusterClient _client; private readonly IClusterClient _client;
private readonly IManageGithub _ghService; private readonly IManageGithub _ghService;
private readonly IManageAzure _azService; private readonly IManageAzure _azService;
public GithubWebHookProcessor(ILogger<GithubWebHookProcessor> logger, IGrainFactory grains, public GithubWebHookProcessor(ILogger<GithubWebHookProcessor> logger,
IClusterClient client, IManageGithub ghService, IManageAzure azService) IClusterClient client, IManageGithub ghService, IManageAzure azService)
{ {
_logger = logger; _logger = logger;
_grains = grains;
_client = client; _client = client;
_ghService = ghService; _ghService = ghService;
_azService = azService; _azService = azService;
@ -84,7 +82,7 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor
private async Task HandleNewAsk(long issueNumber, string skillName, string functionName, string suffix, string input, string org, string repo) private async Task HandleNewAsk(long issueNumber, string skillName, string functionName, string suffix, string input, string org, string repo)
{ {
var streamProvider = _client.GetStreamProvider("StreamProvider"); var streamProvider = _client.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create(suffix, issueNumber.ToString()); var streamId = StreamId.Create("DevPersonas", suffix+issueNumber.ToString());
var stream = streamProvider.GetStream<Event>(streamId); var stream = streamProvider.GetStream<Event>(streamId);
var eventType = (skillName, functionName) switch var eventType = (skillName, functionName) switch

View File

@ -9,7 +9,8 @@ using Orleans.Streams;
using System.Text.Json; using System.Text.Json;
namespace Microsoft.AI.DevTeam; namespace Microsoft.AI.DevTeam;
public class DeveloperLead : SemanticPersona, ILeadDevelopment [ImplicitStreamSubscription("DevPersonas")]
public class DeveloperLead : SemanticPersona
{ {
private readonly IKernel _kernel; private readonly IKernel _kernel;
private readonly ISemanticTextMemory _memory; private readonly ISemanticTextMemory _memory;
@ -26,6 +27,14 @@ public class DeveloperLead : SemanticPersona, ILeadDevelopment
_logger = logger; _logger = logger;
_ghService = ghService; _ghService = ghService;
} }
public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create("DevPersonas", this.GetPrimaryKey());
var stream = streamProvider.GetStream<Event>(streamId);
await stream.SubscribeAsync(HandleEvent);
}
public async Task CreateIssue(string org, string repo, long parentNumber, string input) public async Task CreateIssue(string org, string repo, long parentNumber, string input)
{ {
@ -108,6 +117,12 @@ public class DeveloperLead : SemanticPersona, ILeadDevelopment
case EventType.NewAsk: case EventType.NewAsk:
await CreateIssue(item.Org, item.Repo, item.IssueNumber, item.Message); await CreateIssue(item.Org, item.Repo, item.IssueNumber, item.Message);
break; break;
case EventType.NewAskPlan:
await CreatePlan(item.Message);
break;
case EventType.ChainClosed:
await ClosePlan();
break;
default: default:
break; break;
} }

View File

@ -9,7 +9,7 @@ using Orleans.Streams;
namespace Microsoft.AI.DevTeam; namespace Microsoft.AI.DevTeam;
public class Dev : SemanticPersona, IDevelopCode public class Dev : SemanticPersona
{ {
private readonly IKernel _kernel; private readonly IKernel _kernel;
private readonly ISemanticTextMemory _memory; private readonly ISemanticTextMemory _memory;
@ -80,6 +80,13 @@ public class Dev : SemanticPersona, IDevelopCode
switch (item.Type) switch (item.Type)
{ {
case EventType.NewAsk: case EventType.NewAsk:
await CreateIssue(item.Org, item.Repo, item.IssueNumber, item.Message);
break;
case EventType.NewAskImplement:
await GenerateCode(item.Message);
break;
case EventType.ChainClosed:
await CloseImplementation();
break; break;
default: default:
break; break;

View File

@ -8,7 +8,9 @@ using Orleans.Runtime;
using Orleans.Streams; using Orleans.Streams;
namespace Microsoft.AI.DevTeam; namespace Microsoft.AI.DevTeam;
public class ProductManager : SemanticPersona, IManageProduct
[ImplicitStreamSubscription("DevPersonas")]
public class ProductManager : SemanticPersona
{ {
private readonly IKernel _kernel; private readonly IKernel _kernel;
private readonly ISemanticTextMemory _memory; private readonly ISemanticTextMemory _memory;
@ -26,6 +28,15 @@ public class ProductManager : SemanticPersona, IManageProduct
_ghService = ghService; _ghService = ghService;
} }
public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create("DevPersonas", this.GetPrimaryKey());
var stream = streamProvider.GetStream<Event>(streamId);
await stream.SubscribeAsync(HandleEvent);
}
public async override Task HandleEvent(Event item, StreamSequenceToken? token) public async override Task HandleEvent(Event item, StreamSequenceToken? token)
{ {
switch (item.Type) switch (item.Type)
@ -33,6 +44,12 @@ public class ProductManager : SemanticPersona, IManageProduct
case EventType.NewAsk: case EventType.NewAsk:
await CreateIssue(item.Org, item.Repo, item.IssueNumber, item.Message); await CreateIssue(item.Org, item.Repo, item.IssueNumber, item.Message);
break; break;
case EventType.NewAskReadme:
await CreateReadme(item.Message);
break;
case EventType.ChainClosed:
await CloseReadme();
break;
default: default:
break; break;
} }

View File

@ -6,7 +6,8 @@ using Orleans.Streams;
namespace Microsoft.AI.DevTeam; namespace Microsoft.AI.DevTeam;
public abstract class SemanticPersona : Grain, IChatHistory
public abstract class SemanticPersona : Grain, IChatHistory, IGrainWithStringKey
{ {
public SemanticPersona( public SemanticPersona(
[PersistentState("state", "messages")] IPersistentState<SemanticPersonaState> state) [PersistentState("state", "messages")] IPersistentState<SemanticPersonaState> state)
@ -36,23 +37,14 @@ public abstract class SemanticPersona : Grain, IChatHistory
context.Set("wafContext", wafContext); context.Set("wafContext", wafContext);
} }
public async override Task OnActivateAsync(CancellationToken cancellationToken) // public async override Task OnActivateAsync(CancellationToken cancellationToken)
{ // {
var streamProvider = this.GetStreamProvider(""); // var streamProvider = this.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey()); // var streamId = StreamId.Create("DevPersonas", this.GetPrimaryKey());
var stream = streamProvider.GetStream<Event>(streamId); // var stream = streamProvider.GetStream<Event>(streamId);
var subscriptionHandles = await stream.GetAllSubscriptionHandles(); // await stream.SubscribeAsync(HandleEvent);
if ( subscriptionHandles != null && subscriptionHandles.Count > 0) // }
{
subscriptionHandles.ToList().ForEach(
async x => await x.ResumeAsync(HandleEvent));
}
else
{
await stream.SubscribeAsync(HandleEvent);
}
}
} }
public interface IChatHistory public interface IChatHistory