From 4c98edc17667ba0f8045e21f199f6e200c660cd2 Mon Sep 17 00:00:00 2001 From: Kosta Petan Date: Sun, 18 Feb 2024 20:21:44 +0000 Subject: [PATCH] streams WIP --- src/apps/gh-flow/Program.cs | 3 ++- .../Services/GithubWebHookProcessor.cs | 6 ++--- .../Actors/DevLead/DeveloperLead.cs | 17 +++++++++++- .../Actors/Developer/Developer.cs | 9 ++++++- .../Actors/ProductManager/ProductManager.cs | 19 +++++++++++++- .../Actors/SemanticPersona.cs | 26 +++++++------------ 6 files changed, 55 insertions(+), 25 deletions(-) diff --git a/src/apps/gh-flow/Program.cs b/src/apps/gh-flow/Program.cs index ea154c051..b841bba5e 100644 --- a/src/apps/gh-flow/Program.cs +++ b/src/apps/gh-flow/Program.cs @@ -67,7 +67,8 @@ builder.Host.UseOrleans(siloBuilder => if (builder.Environment.IsDevelopment()) { var connectionString = builder.Configuration.GetValue("AzureOptions:CosmosConnectionString"); - siloBuilder.AddMemoryStreams("StreamProvider"); + siloBuilder.AddMemoryStreams("StreamProvider") + .AddMemoryGrainStorage("PubSubStore"); siloBuilder.UseCosmosReminderService( o => { o.ConfigureCosmosClient(connectionString); diff --git a/src/apps/gh-flow/Services/GithubWebHookProcessor.cs b/src/apps/gh-flow/Services/GithubWebHookProcessor.cs index b98b14344..4f470aff9 100644 --- a/src/apps/gh-flow/Services/GithubWebHookProcessor.cs +++ b/src/apps/gh-flow/Services/GithubWebHookProcessor.cs @@ -10,16 +10,14 @@ using Orleans.Runtime; public sealed class GithubWebHookProcessor : WebhookEventProcessor { private readonly ILogger _logger; - private readonly IGrainFactory _grains; private readonly IClusterClient _client; private readonly IManageGithub _ghService; private readonly IManageAzure _azService; - public GithubWebHookProcessor(ILogger logger, IGrainFactory grains, + public GithubWebHookProcessor(ILogger logger, IClusterClient client, IManageGithub ghService, IManageAzure azService) { _logger = logger; - _grains = grains; _client = client; _ghService = ghService; _azService = azService; @@ -84,7 +82,7 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor private async Task HandleNewAsk(long issueNumber, string skillName, string functionName, string suffix, string input, string org, string repo) { var streamProvider = _client.GetStreamProvider("StreamProvider"); - var streamId = StreamId.Create(suffix, issueNumber.ToString()); + var streamId = StreamId.Create("DevPersonas", suffix+issueNumber.ToString()); var stream = streamProvider.GetStream(streamId); var eventType = (skillName, functionName) switch diff --git a/src/libs/Microsoft.AI.DevTeam/Actors/DevLead/DeveloperLead.cs b/src/libs/Microsoft.AI.DevTeam/Actors/DevLead/DeveloperLead.cs index 4dbd9c099..c6408737e 100644 --- a/src/libs/Microsoft.AI.DevTeam/Actors/DevLead/DeveloperLead.cs +++ b/src/libs/Microsoft.AI.DevTeam/Actors/DevLead/DeveloperLead.cs @@ -9,7 +9,8 @@ using Orleans.Streams; using System.Text.Json; namespace Microsoft.AI.DevTeam; -public class DeveloperLead : SemanticPersona, ILeadDevelopment +[ImplicitStreamSubscription("DevPersonas")] +public class DeveloperLead : SemanticPersona { private readonly IKernel _kernel; private readonly ISemanticTextMemory _memory; @@ -26,6 +27,14 @@ public class DeveloperLead : SemanticPersona, ILeadDevelopment _logger = logger; _ghService = ghService; } + public async override Task OnActivateAsync(CancellationToken cancellationToken) + { + var streamProvider = this.GetStreamProvider("StreamProvider"); + var streamId = StreamId.Create("DevPersonas", this.GetPrimaryKey()); + var stream = streamProvider.GetStream(streamId); + + await stream.SubscribeAsync(HandleEvent); + } public async Task CreateIssue(string org, string repo, long parentNumber, string input) { @@ -108,6 +117,12 @@ public class DeveloperLead : SemanticPersona, ILeadDevelopment case EventType.NewAsk: await CreateIssue(item.Org, item.Repo, item.IssueNumber, item.Message); break; + case EventType.NewAskPlan: + await CreatePlan(item.Message); + break; + case EventType.ChainClosed: + await ClosePlan(); + break; default: break; } diff --git a/src/libs/Microsoft.AI.DevTeam/Actors/Developer/Developer.cs b/src/libs/Microsoft.AI.DevTeam/Actors/Developer/Developer.cs index f00f9ac67..746c3cc0b 100644 --- a/src/libs/Microsoft.AI.DevTeam/Actors/Developer/Developer.cs +++ b/src/libs/Microsoft.AI.DevTeam/Actors/Developer/Developer.cs @@ -9,7 +9,7 @@ using Orleans.Streams; namespace Microsoft.AI.DevTeam; -public class Dev : SemanticPersona, IDevelopCode +public class Dev : SemanticPersona { private readonly IKernel _kernel; private readonly ISemanticTextMemory _memory; @@ -80,6 +80,13 @@ public class Dev : SemanticPersona, IDevelopCode switch (item.Type) { case EventType.NewAsk: + await CreateIssue(item.Org, item.Repo, item.IssueNumber, item.Message); + break; + case EventType.NewAskImplement: + await GenerateCode(item.Message); + break; + case EventType.ChainClosed: + await CloseImplementation(); break; default: break; diff --git a/src/libs/Microsoft.AI.DevTeam/Actors/ProductManager/ProductManager.cs b/src/libs/Microsoft.AI.DevTeam/Actors/ProductManager/ProductManager.cs index d5f6fad2e..5e5624a71 100644 --- a/src/libs/Microsoft.AI.DevTeam/Actors/ProductManager/ProductManager.cs +++ b/src/libs/Microsoft.AI.DevTeam/Actors/ProductManager/ProductManager.cs @@ -8,7 +8,9 @@ using Orleans.Runtime; using Orleans.Streams; namespace Microsoft.AI.DevTeam; -public class ProductManager : SemanticPersona, IManageProduct + +[ImplicitStreamSubscription("DevPersonas")] +public class ProductManager : SemanticPersona { private readonly IKernel _kernel; private readonly ISemanticTextMemory _memory; @@ -26,6 +28,15 @@ public class ProductManager : SemanticPersona, IManageProduct _ghService = ghService; } + public async override Task OnActivateAsync(CancellationToken cancellationToken) + { + var streamProvider = this.GetStreamProvider("StreamProvider"); + var streamId = StreamId.Create("DevPersonas", this.GetPrimaryKey()); + var stream = streamProvider.GetStream(streamId); + + await stream.SubscribeAsync(HandleEvent); + } + public async override Task HandleEvent(Event item, StreamSequenceToken? token) { switch (item.Type) @@ -33,6 +44,12 @@ public class ProductManager : SemanticPersona, IManageProduct case EventType.NewAsk: await CreateIssue(item.Org, item.Repo, item.IssueNumber, item.Message); break; + case EventType.NewAskReadme: + await CreateReadme(item.Message); + break; + case EventType.ChainClosed: + await CloseReadme(); + break; default: break; } diff --git a/src/libs/Microsoft.AI.DevTeam/Actors/SemanticPersona.cs b/src/libs/Microsoft.AI.DevTeam/Actors/SemanticPersona.cs index dffaa6878..c913c899b 100644 --- a/src/libs/Microsoft.AI.DevTeam/Actors/SemanticPersona.cs +++ b/src/libs/Microsoft.AI.DevTeam/Actors/SemanticPersona.cs @@ -6,7 +6,8 @@ using Orleans.Streams; namespace Microsoft.AI.DevTeam; -public abstract class SemanticPersona : Grain, IChatHistory + +public abstract class SemanticPersona : Grain, IChatHistory, IGrainWithStringKey { public SemanticPersona( [PersistentState("state", "messages")] IPersistentState state) @@ -36,23 +37,14 @@ public abstract class SemanticPersona : Grain, IChatHistory context.Set("wafContext", wafContext); } - public async override Task OnActivateAsync(CancellationToken cancellationToken) - { - var streamProvider = this.GetStreamProvider(""); - var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey()); - var stream = streamProvider.GetStream(streamId); + // public async override Task OnActivateAsync(CancellationToken cancellationToken) + // { + // var streamProvider = this.GetStreamProvider("StreamProvider"); + // var streamId = StreamId.Create("DevPersonas", this.GetPrimaryKey()); + // var stream = streamProvider.GetStream(streamId); - var subscriptionHandles = await stream.GetAllSubscriptionHandles(); - if ( subscriptionHandles != null && subscriptionHandles.Count > 0) - { - subscriptionHandles.ToList().ForEach( - async x => await x.ResumeAsync(HandleEvent)); - } - else - { - await stream.SubscribeAsync(HandleEvent); - } - } + // await stream.SubscribeAsync(HandleEvent); + // } } public interface IChatHistory