From cbe6d2ed34f4716bd92dbcbadae2e2c3998f6cfb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poul=20Kjeldager=20S=C3=B8rensen?= Date: Sun, 18 Aug 2024 18:59:52 +0200 Subject: [PATCH] fix: refactored abit to allowoutput respository to run without known type --- .../DependencyInjectionExtensions.cs | 84 ++++++++++--- .../EAVFW.Extensions.WorkflowEngine.csproj | 4 +- .../EAVFWOutputsRepository.cs | 118 +++++++++++++----- .../Endpoints/WorkflowEndpointOptions.cs | 9 ++ 4 files changed, 162 insertions(+), 53 deletions(-) diff --git a/src/EAVFW.Extensions.WorkflowEngine/DependencyInjectionExtensions.cs b/src/EAVFW.Extensions.WorkflowEngine/DependencyInjectionExtensions.cs index b32ab40..5a5f853 100644 --- a/src/EAVFW.Extensions.WorkflowEngine/DependencyInjectionExtensions.cs +++ b/src/EAVFW.Extensions.WorkflowEngine/DependencyInjectionExtensions.cs @@ -28,6 +28,7 @@ using System.IO.Compression; using System.Linq; using System.Reflection; +using System.Runtime.CompilerServices; using System.Security.Claims; using System.Threading.Tasks; using WorkflowEngine; @@ -47,10 +48,22 @@ public static ClaimsPrincipal GetRunningPrincipal(this IRunContext context) } public static class DependencyInjectionExtensions { - public static IEndpointRouteBuilder MapWorkFlowEndpoints( + public static IEndpointRouteBuilder MapWorkFlowEndpoints( + this IEndpointRouteBuilder endpoints) + where TContext : DynamicContext + where TWorkflowRun : DynamicEntity, IWorkflowRun, new() + { + + var options = endpoints.ServiceProvider.GetRequiredService>(); + options.Value.RunFactory = (context,id) => new TWorkflowRun { Id = id } ; + options.Value.WorkflowEntityName = typeof(TWorkflowRun).GetCustomAttribute().CollectionSchemaName; + return endpoints.MapWorkFlowEndpoints(); + + } + public static IEndpointRouteBuilder MapWorkFlowEndpoints( this IEndpointRouteBuilder endpoints) where TContext : DynamicContext - where TWorkflowRun : DynamicEntity, IWorkflowRun, new() + // where TWorkflowRun : DynamicEntity, IWorkflowRun, new() { var options = endpoints.ServiceProvider.GetRequiredService>(); @@ -158,12 +171,13 @@ await JToken.ReadFromAsync( return; } var context = httpContext.RequestServices.GetRequiredService>(); - context.Add(new TWorkflowRun() { Id = trigger.RunId }); + // var run = Activator.CreateInstance(context.Context.get) + context.Add( options.Value.RunFactory(context,trigger.RunId)); await context.SaveChangesAsync(httpContext.User); var backgroundJobClient = httpContext.RequestServices.GetRequiredService(); - var job = backgroundJobClient.Enqueue( + var job = backgroundJobClient.Enqueue(options.Value.QueueName, executor => executor.TriggerAsync(trigger,null)); @@ -218,11 +232,11 @@ await JToken.ReadFromAsync( var trigger = await BuildTrigger(workflows, workflowname, httpcontext.User?.FindFirstValue("sub"), inputs); - context.Add(new TWorkflowRun() { Id=trigger.RunId }); + context.Add(options.Value.RunFactory(context,trigger.RunId)); await context.SaveChangesAsync(httpcontext.User); - var job = backgroundJobClient.Enqueue( + var job = backgroundJobClient.Enqueue(options.Value.QueueName, (executor) => executor.TriggerAsync(trigger,null)); await httpcontext.Response.WriteJsonAsync(new { id = trigger.RunId, job = job }); @@ -232,16 +246,16 @@ await JToken.ReadFromAsync( if (options.Value.IncludeWorkflowState) { endpoints.MapGet("/api/workflowruns/{workflowRunId}/status", async context => - await ApiWorkflowsEndpoint(context, true)); + await ApiWorkflowsEndpoint(context, options, true)); endpoints.MapGet("/api/workflowruns/{workflowRunId}", - async context => await ApiWorkflowsEndpoint(context)); + async context => await ApiWorkflowsEndpoint(context,options)); endpoints.MapGet("/api/workflows/{workflowId}/runs/{workflowRunId}", - async context => await ApiWorkflowsEndpoint(context)); + async context => await ApiWorkflowsEndpoint(context, options)); endpoints.MapGet("/api/workflows/{workflowId}/runs/{workflowRunId}/status", - async context => await ApiWorkflowsEndpoint(context,true)); + async context => await ApiWorkflowsEndpoint(context,options,true)); } @@ -249,8 +263,8 @@ await JToken.ReadFromAsync( return endpoints; } - private static async Task ApiWorkflowsEndpoint(HttpContext context, bool statusOnly = false) where TContext : DynamicContext - where TWorkflowRun : DynamicEntity, IWorkflowRun + private static async Task ApiWorkflowsEndpoint(HttpContext context, IOptions options, bool statusOnly = false) where TContext : DynamicContext + // where TWorkflowRun : DynamicEntity, IWorkflowRun { var routeJobId = context.GetRouteValue("workflowRunId") as string; if (!Guid.TryParse(routeJobId, out var jobId)) @@ -262,8 +276,8 @@ private static async Task ApiWorkflowsEndpoint(HttpConte var db = context.RequestServices.GetRequiredService>(); - var workflowRun = await db.Set().FindAsync(jobId); - + var workflowRunEntry = await db.FindAsync(options.Value.WorkflowEntityName?? "WorkflowRuns",jobId); + var workflowRun = workflowRunEntry?.Entity as IWorkflowRun; if (workflowRun == null) { await new NotFoundResult().ExecuteAsync(context); @@ -332,21 +346,48 @@ private static Task GetState(IWorkflowRun run) NullValueHandling= NullValueHandling.Ignore }); return Task.FromResult(serializer.Deserialize(tinyStream)); } - - public static IEAVFrameworkBuilder AddWorkFlowEngine( + this IEAVFrameworkBuilder builder, + string workflowContextPrincipalId, + Func configureHangfire = null, bool withJobServer = true) + where TContext : DynamicContext + where TWorkflowRun : DynamicEntity, IWorkflowRun, new() + { + var services = builder.Services; + + + + builder.AddWorkFlowEngine(workflowContextPrincipalId, + typeof(TWorkflowRun).GetCustomAttribute().CollectionSchemaName, + (ctx,id) => new TWorkflowRun { Id = id }, + configureHangfire, withJobServer); + services.AddScoped>(); + + return builder; + } + + public static IEAVFrameworkBuilder AddWorkFlowEngine( this IEAVFrameworkBuilder builder, string workflowContextPrincipalId, + string workflowRunEntityName, + Func runFactory, Func configureHangfire = null, bool withJobServer=true) where TContext : DynamicContext - where TWorkflowRun : DynamicEntity, IWorkflowRun, new() + { var services = builder.Services; + services.Configure(options => + { + options.WorkflowEntityName = workflowRunEntityName; + options.RunFactory = runFactory; + }); + services.AddExpressionEngine(); - services.AddWorkflowEngine>(); + services.AddWorkflowEngine>(); + services.AddOptions().BindConfiguration("EAVFramework:WorkflowEngine"); builder.Services.AddOptions() @@ -379,7 +420,12 @@ public static IEAVFrameworkBuilder AddWorkFlowEngine( }); if(withJobServer) - services.AddHangfireServer(); + services.AddHangfireServer((sp,options) => + { + options.Queues = new[] { sp.GetRequiredService>().Value?.QueueName ?? "default" }; + + + }); return builder; } diff --git a/src/EAVFW.Extensions.WorkflowEngine/EAVFW.Extensions.WorkflowEngine.csproj b/src/EAVFW.Extensions.WorkflowEngine/EAVFW.Extensions.WorkflowEngine.csproj index 5dd94cc..66a13df 100644 --- a/src/EAVFW.Extensions.WorkflowEngine/EAVFW.Extensions.WorkflowEngine.csproj +++ b/src/EAVFW.Extensions.WorkflowEngine/EAVFW.Extensions.WorkflowEngine.csproj @@ -36,9 +36,9 @@ - + - + diff --git a/src/EAVFW.Extensions.WorkflowEngine/EAVFWOutputsRepository.cs b/src/EAVFW.Extensions.WorkflowEngine/EAVFWOutputsRepository.cs index 80102e6..7a5ea8c 100644 --- a/src/EAVFW.Extensions.WorkflowEngine/EAVFWOutputsRepository.cs +++ b/src/EAVFW.Extensions.WorkflowEngine/EAVFWOutputsRepository.cs @@ -58,12 +58,64 @@ public override void WriteJson(JsonWriter writer, object value, JsonSerializer s } } - public class EAVFWOutputsRepository : IOutputsRepository, IDisposable + public interface IEAVFWOutputsRepositoryContextFactory + { + + Task SaveAsync(Guid runId, byte[] bytes, ClaimsPrincipal principal); + Task LoadAsync(Guid runId); + } + public class DefaultEAVFWOutputsRepositoryContextFactory : IEAVFWOutputsRepositoryContextFactory, IDisposable where TContext : DynamicContext - where TWorkflowRun : DynamicEntity, IWorkflowRun, new() + where TWorkflowRun : DynamicEntity, IWorkflowRun, new() { private readonly IServiceScope _scope; - private readonly EAVDBContext _eAVDBContext; + private readonly EAVDBContext _db; + + public DefaultEAVFWOutputsRepositoryContextFactory(IServiceScopeFactory scopeFactory) + { + _scope = scopeFactory.CreateScope(); + _db = _scope.ServiceProvider.GetRequiredService>(); + } + + + + public void Dispose() + { + _scope.Dispose(); + } + + public async Task LoadAsync(Guid runId) + { + + var run = await _db.Set().FindAsync(runId); + return run?.State; + } + + public async Task SaveAsync(Guid runId, byte[] bytes, ClaimsPrincipal principal) + { + + var run = await _db.Set().FindAsync(runId); + if (run == null) + { + run = new TWorkflowRun { Id = runId, State = bytes }; + _db.Set().Add(run); + } + else + { + run.State = bytes; + _db.Set().Update(run); + } + await _db.SaveChangesAsync(principal); + } + } + + + public class EAVFWOutputsRepository : IOutputsRepository + where TContext : DynamicContext + // where TWorkflowRun : DynamicEntity, IWorkflowRun, new() + { + // private readonly IServiceScope _scope; + // private readonly EAVDBContext _eAVDBContext; private JsonSerializerSettings CreateSerializerSettings() { var settings = JsonConvert.DefaultSettings?.Invoke() ?? new JsonSerializerSettings @@ -80,18 +132,20 @@ private JsonSerializerSettings CreateSerializerSettings() private JsonSerializerSettings _serializerSettings; private JsonSerializer _serializer; + private readonly IEAVFWOutputsRepositoryContextFactory _factory; protected ClaimsPrincipal Principal { get; } - public EAVFWOutputsRepository(IServiceScopeFactory scopeFactory, IOptions options) + public EAVFWOutputsRepository(IEAVFWOutputsRepositoryContextFactory factory , IOptions options) { - _scope = scopeFactory.CreateScope(); + // _scope = scopeFactory.CreateScope(); _serializerSettings = CreateSerializerSettings(); _serializer = JsonSerializer.Create(_serializerSettings); - _eAVDBContext = _scope.ServiceProvider.GetRequiredService>(); + Principal = new ClaimsPrincipal(new ClaimsIdentity(new Claim[] { new Claim("sub",options.Value.IdenttyId) }, "eavfw")); + _factory = factory; } public async ValueTask AddScopeItem(IRunContext context, IWorkflow workflow, IAction action, IActionResult result) { @@ -155,19 +209,20 @@ private async Task SaveState(Guid runId, WorkflowState state) writer.Flush(); tinyStream.Flush(); - var run = await _eAVDBContext.Set().FindAsync(runId); - - run.State = ms.ToArray(); - - _eAVDBContext.Set().Update(run); - await _eAVDBContext.SaveChangesAsync(Principal); + + await _factory.SaveAsync(runId, ms.ToArray(), Principal); + + } public async Task GetState(Guid runId) { - var run = await _eAVDBContext.Set().FindAsync(runId); + + + var data = await _factory.LoadAsync(runId); + using var tinyStream = new JsonTextReader( - new StreamReader(new GZipStream(new MemoryStream(run.State), CompressionMode.Decompress))); + new StreamReader(new GZipStream(new MemoryStream(data), CompressionMode.Decompress))); return _serializer.Deserialize(tinyStream); @@ -175,29 +230,31 @@ public async Task GetState(Guid runId) private async Task GetOrCreateRun(IRunContext context) { - var run = await _eAVDBContext.Set().FindAsync(context.RunId); - if (run == null) - { - var (data, dataarr) = CreateState(); + - run = new TWorkflowRun() { Id = context.RunId, State = dataarr }; - _eAVDBContext.Set().Add(run); - await _eAVDBContext.SaveChangesAsync(Principal); + var data = await _factory.LoadAsync(context.RunId); - return data; + if (data == null) + { + var (_data, dataarr) = CreateState(); + + await _factory.SaveAsync(context.RunId, dataarr, Principal); + + return _data; } - if (run.State == null) + if (data == null) { - var (data, dataarr) = CreateState(); - run.State = dataarr; - await _eAVDBContext.SaveChangesAsync(Principal); - return data; + var (_data, dataarr) = CreateState(); + + await _factory.SaveAsync(context.RunId, dataarr, Principal); + + return _data; } { - using var tinyStream = new JsonTextReader(new StreamReader(new GZipStream(new MemoryStream(run.State), CompressionMode.Decompress))); + using var tinyStream = new JsonTextReader(new StreamReader(new GZipStream(new MemoryStream(data), CompressionMode.Decompress))); return _serializer.Deserialize(tinyStream); @@ -360,9 +417,6 @@ public async ValueTask AddInput(IRunContext context, IWorkflow workflow, IAction await SaveState(context.RunId, run); } - public void Dispose() - { - _scope.Dispose(); - } + } } diff --git a/src/EAVFW.Extensions.WorkflowEngine/Endpoints/WorkflowEndpointOptions.cs b/src/EAVFW.Extensions.WorkflowEngine/Endpoints/WorkflowEndpointOptions.cs index 3ab8e01..dd05cd5 100644 --- a/src/EAVFW.Extensions.WorkflowEngine/Endpoints/WorkflowEndpointOptions.cs +++ b/src/EAVFW.Extensions.WorkflowEngine/Endpoints/WorkflowEndpointOptions.cs @@ -1,3 +1,7 @@ +using EAVFramework.Endpoints; +using System; +using System.Threading.Tasks; + namespace EAVFW.Extensions.WorkflowEngine.Endpoints { public class WorkflowEndpointOptions @@ -6,5 +10,10 @@ public class WorkflowEndpointOptions public bool IncludeStartWorkflow { get; set; } = true; public bool IncludeWorkflowState { get; set; } = true; public bool IncludeWorkflowMetadata { get; set; } = true; + + public string QueueName { get; set; } = "default"; + + public Func RunFactory { get; set; } + public string WorkflowEntityName { get; set; } } }