Skip to content

Commit

Permalink
fix: refactored abit to allowoutput respository to run without known …
Browse files Browse the repository at this point in the history
…type
  • Loading branch information
pksorensen committed Aug 18, 2024
1 parent 6660990 commit cbe6d2e
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
using System.IO.Compression;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Security.Claims;
using System.Threading.Tasks;
using WorkflowEngine;
Expand All @@ -47,10 +48,22 @@ public static ClaimsPrincipal GetRunningPrincipal(this IRunContext context)
}
public static class DependencyInjectionExtensions
{
public static IEndpointRouteBuilder MapWorkFlowEndpoints<TContext, TWorkflowRun>(
public static IEndpointRouteBuilder MapWorkFlowEndpoints<TContext,TWorkflowRun>(
this IEndpointRouteBuilder endpoints)
where TContext : DynamicContext
where TWorkflowRun : DynamicEntity, IWorkflowRun, new()
{

var options = endpoints.ServiceProvider.GetRequiredService<IOptions<WorkflowEndpointOptions>>();
options.Value.RunFactory = (context,id) => new TWorkflowRun { Id = id } ;
options.Value.WorkflowEntityName = typeof(TWorkflowRun).GetCustomAttribute<EntityAttribute>().CollectionSchemaName;
return endpoints.MapWorkFlowEndpoints<TContext>();

}
public static IEndpointRouteBuilder MapWorkFlowEndpoints<TContext>(
this IEndpointRouteBuilder endpoints)
where TContext : DynamicContext
where TWorkflowRun : DynamicEntity, IWorkflowRun, new()
// where TWorkflowRun : DynamicEntity, IWorkflowRun, new()
{
var options = endpoints.ServiceProvider.GetRequiredService<IOptions<WorkflowEndpointOptions>>();

Expand Down Expand Up @@ -158,12 +171,13 @@ await JToken.ReadFromAsync(
return;
}
var context = httpContext.RequestServices.GetRequiredService<EAVDBContext<TContext>>();
context.Add(new TWorkflowRun() { Id = trigger.RunId });
// var run = Activator.CreateInstance(context.Context.get)
context.Add( options.Value.RunFactory(context,trigger.RunId));
await context.SaveChangesAsync(httpContext.User);
var backgroundJobClient = httpContext.RequestServices.GetRequiredService<IBackgroundJobClient>();
var job = backgroundJobClient.Enqueue<IHangfireWorkflowExecutor>(
var job = backgroundJobClient.Enqueue<IHangfireWorkflowExecutor>(options.Value.QueueName,
executor => executor.TriggerAsync(trigger,null));
Expand Down Expand Up @@ -218,11 +232,11 @@ await JToken.ReadFromAsync(
var trigger = await BuildTrigger(workflows, workflowname, httpcontext.User?.FindFirstValue("sub"), inputs);
context.Add(new TWorkflowRun() { Id=trigger.RunId });
context.Add(options.Value.RunFactory(context,trigger.RunId));
await context.SaveChangesAsync(httpcontext.User);
var job = backgroundJobClient.Enqueue<IHangfireWorkflowExecutor>(
var job = backgroundJobClient.Enqueue<IHangfireWorkflowExecutor>(options.Value.QueueName,
(executor) => executor.TriggerAsync(trigger,null));
await httpcontext.Response.WriteJsonAsync(new { id = trigger.RunId, job = job });
Expand All @@ -232,25 +246,25 @@ await JToken.ReadFromAsync(
if (options.Value.IncludeWorkflowState)
{
endpoints.MapGet("/api/workflowruns/{workflowRunId}/status", async context =>
await ApiWorkflowsEndpoint<TContext, TWorkflowRun>(context, true));
await ApiWorkflowsEndpoint<TContext>(context, options, true));

endpoints.MapGet("/api/workflowruns/{workflowRunId}",
async context => await ApiWorkflowsEndpoint<TContext, TWorkflowRun>(context));
async context => await ApiWorkflowsEndpoint<TContext>(context,options));

endpoints.MapGet("/api/workflows/{workflowId}/runs/{workflowRunId}",
async context => await ApiWorkflowsEndpoint<TContext, TWorkflowRun>(context));
async context => await ApiWorkflowsEndpoint<TContext>(context, options));

endpoints.MapGet("/api/workflows/{workflowId}/runs/{workflowRunId}/status",
async context => await ApiWorkflowsEndpoint<TContext, TWorkflowRun>(context,true));
async context => await ApiWorkflowsEndpoint<TContext>(context,options,true));


}

return endpoints;
}

private static async Task ApiWorkflowsEndpoint<TContext, TWorkflowRun>(HttpContext context, bool statusOnly = false) where TContext : DynamicContext
where TWorkflowRun : DynamicEntity, IWorkflowRun
private static async Task ApiWorkflowsEndpoint<TContext>(HttpContext context, IOptions<WorkflowEndpointOptions> options, bool statusOnly = false) where TContext : DynamicContext
// where TWorkflowRun : DynamicEntity, IWorkflowRun
{
var routeJobId = context.GetRouteValue("workflowRunId") as string;
if (!Guid.TryParse(routeJobId, out var jobId))
Expand All @@ -262,8 +276,8 @@ private static async Task ApiWorkflowsEndpoint<TContext, TWorkflowRun>(HttpConte

var db = context.RequestServices.GetRequiredService<EAVDBContext<TContext>>();

var workflowRun = await db.Set<TWorkflowRun>().FindAsync(jobId);

var workflowRunEntry = await db.FindAsync(options.Value.WorkflowEntityName?? "WorkflowRuns",jobId);
var workflowRun = workflowRunEntry?.Entity as IWorkflowRun;
if (workflowRun == null)
{
await new NotFoundResult().ExecuteAsync(context);
Expand Down Expand Up @@ -332,21 +346,48 @@ private static Task<WorkflowState> GetState(IWorkflowRun run)
NullValueHandling= NullValueHandling.Ignore });
return Task.FromResult(serializer.Deserialize<WorkflowState>(tinyStream));
}




public static IEAVFrameworkBuilder AddWorkFlowEngine<TContext, TWorkflowRun>(
this IEAVFrameworkBuilder builder,
string workflowContextPrincipalId,
Func<IServiceProvider, IGlobalConfiguration, IGlobalConfiguration> configureHangfire = null, bool withJobServer = true)
where TContext : DynamicContext
where TWorkflowRun : DynamicEntity, IWorkflowRun, new()
{
var services = builder.Services;



builder.AddWorkFlowEngine<TContext>(workflowContextPrincipalId,
typeof(TWorkflowRun).GetCustomAttribute<EntityAttribute>().CollectionSchemaName,
(ctx,id) => new TWorkflowRun { Id = id },
configureHangfire, withJobServer);
services.AddScoped<IEAVFWOutputsRepositoryContextFactory, DefaultEAVFWOutputsRepositoryContextFactory<TContext, TWorkflowRun>>();

return builder;
}

public static IEAVFrameworkBuilder AddWorkFlowEngine<TContext>(
this IEAVFrameworkBuilder builder,
string workflowContextPrincipalId,
string workflowRunEntityName,
Func<EAVDBContext, Guid, IWorkflowRun> runFactory,
Func<IServiceProvider,IGlobalConfiguration, IGlobalConfiguration> configureHangfire = null, bool withJobServer=true)
where TContext : DynamicContext
where TWorkflowRun : DynamicEntity, IWorkflowRun, new()
{
var services = builder.Services;

services.Configure<WorkflowEndpointOptions>(options =>
{
options.WorkflowEntityName = workflowRunEntityName;
options.RunFactory = runFactory;
});

services.AddExpressionEngine();
services.AddWorkflowEngine<EAVFWOutputsRepository<TContext,TWorkflowRun>>();
services.AddWorkflowEngine<EAVFWOutputsRepository<TContext>>();

services.AddOptions<WorkflowEndpointOptions>().BindConfiguration("EAVFramework:WorkflowEngine");

builder.Services.AddOptions<EAVFWOutputsRepositoryOptions>()
Expand Down Expand Up @@ -379,7 +420,12 @@ public static IEAVFrameworkBuilder AddWorkFlowEngine<TContext, TWorkflowRun>(
});

if(withJobServer)
services.AddHangfireServer();
services.AddHangfireServer((sp,options) =>
{
options.Queues = new[] { sp.GetRequiredService<IOptions<WorkflowEndpointOptions>>().Value?.QueueName ?? "default" };
});

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
<ProjectReference Include="$(LocalEAVFrameworkPath)\src\EAVFramework.csproj" />
</ItemGroup>
<ItemGroup Condition="$(UseEAVFromNuget) != 'false'">
<PackageReference Include="Delegate.WorkflowEngine.Hangfire" Version="3.2.2" />
<PackageReference Include="Delegate.WorkflowEngine.Hangfire" Version="3.2.3" />
<PackageReference Include="EAVFW.Extensions.Documents" Version="3.1.0" />
<PackageReference Include="EAVFramework" Version="$(EAVFrameworkVersion)" />
<PackageReference Include="EAVFW.Extensions.Configuration" Version="2.2.0" />
<PackageReference Include="EAVFW.Extensions.Configuration" Version="2.2.1" />
</ItemGroup>
</Project>
118 changes: 86 additions & 32 deletions src/EAVFW.Extensions.WorkflowEngine/EAVFWOutputsRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,64 @@ public override void WriteJson(JsonWriter writer, object value, JsonSerializer s
}
}

public class EAVFWOutputsRepository<TContext, TWorkflowRun> : IOutputsRepository, IDisposable
public interface IEAVFWOutputsRepositoryContextFactory
{

Task SaveAsync(Guid runId, byte[] bytes, ClaimsPrincipal principal);
Task<byte[]> LoadAsync(Guid runId);
}
public class DefaultEAVFWOutputsRepositoryContextFactory<TContext, TWorkflowRun> : IEAVFWOutputsRepositoryContextFactory, IDisposable
where TContext : DynamicContext
where TWorkflowRun : DynamicEntity, IWorkflowRun, new()
where TWorkflowRun : DynamicEntity, IWorkflowRun, new()
{
private readonly IServiceScope _scope;
private readonly EAVDBContext<TContext> _eAVDBContext;
private readonly EAVDBContext<TContext> _db;

public DefaultEAVFWOutputsRepositoryContextFactory(IServiceScopeFactory scopeFactory)
{
_scope = scopeFactory.CreateScope();
_db = _scope.ServiceProvider.GetRequiredService<EAVDBContext<TContext>>();
}



public void Dispose()
{
_scope.Dispose();
}

public async Task<byte[]> LoadAsync(Guid runId)
{

var run = await _db.Set<TWorkflowRun>().FindAsync(runId);
return run?.State;
}

public async Task SaveAsync(Guid runId, byte[] bytes, ClaimsPrincipal principal)
{

var run = await _db.Set<TWorkflowRun>().FindAsync(runId);
if (run == null)
{
run = new TWorkflowRun { Id = runId, State = bytes };
_db.Set<TWorkflowRun>().Add(run);
}
else
{
run.State = bytes;
_db.Set<TWorkflowRun>().Update(run);
}
await _db.SaveChangesAsync(principal);
}
}


public class EAVFWOutputsRepository<TContext> : IOutputsRepository
where TContext : DynamicContext
// where TWorkflowRun : DynamicEntity, IWorkflowRun, new()
{
// private readonly IServiceScope _scope;
// private readonly EAVDBContext<TContext> _eAVDBContext;
private JsonSerializerSettings CreateSerializerSettings()
{
var settings = JsonConvert.DefaultSettings?.Invoke() ?? new JsonSerializerSettings
Expand All @@ -80,18 +132,20 @@ private JsonSerializerSettings CreateSerializerSettings()

private JsonSerializerSettings _serializerSettings;
private JsonSerializer _serializer;
private readonly IEAVFWOutputsRepositoryContextFactory _factory;

protected ClaimsPrincipal Principal { get; }

public EAVFWOutputsRepository(IServiceScopeFactory scopeFactory, IOptions<EAVFWOutputsRepositoryOptions> options)
public EAVFWOutputsRepository(IEAVFWOutputsRepositoryContextFactory factory , IOptions<EAVFWOutputsRepositoryOptions> options)
{
_scope = scopeFactory.CreateScope();
// _scope = scopeFactory.CreateScope();
_serializerSettings = CreateSerializerSettings();
_serializer = JsonSerializer.Create(_serializerSettings);
_eAVDBContext = _scope.ServiceProvider.GetRequiredService<EAVDBContext<TContext>>();

Principal = new ClaimsPrincipal(new ClaimsIdentity(new Claim[] {
new Claim("sub",options.Value.IdenttyId)
}, "eavfw"));
_factory = factory;
}
public async ValueTask AddScopeItem(IRunContext context, IWorkflow workflow, IAction action, IActionResult result)
{
Expand Down Expand Up @@ -155,49 +209,52 @@ private async Task SaveState(Guid runId, WorkflowState state)
writer.Flush();
tinyStream.Flush();

var run = await _eAVDBContext.Set<TWorkflowRun>().FindAsync(runId);

run.State = ms.ToArray();

_eAVDBContext.Set<TWorkflowRun>().Update(run);
await _eAVDBContext.SaveChangesAsync(Principal);

await _factory.SaveAsync(runId, ms.ToArray(), Principal);


}

public async Task<WorkflowState> GetState(Guid runId)
{
var run = await _eAVDBContext.Set<TWorkflowRun>().FindAsync(runId);


var data = await _factory.LoadAsync(runId);

using var tinyStream = new JsonTextReader(
new StreamReader(new GZipStream(new MemoryStream(run.State), CompressionMode.Decompress)));
new StreamReader(new GZipStream(new MemoryStream(data), CompressionMode.Decompress)));


return _serializer.Deserialize<WorkflowState>(tinyStream);
}

private async Task<WorkflowState> GetOrCreateRun(IRunContext context)
{
var run = await _eAVDBContext.Set<TWorkflowRun>().FindAsync(context.RunId);
if (run == null)
{
var (data, dataarr) = CreateState();


run = new TWorkflowRun() { Id = context.RunId, State = dataarr };
_eAVDBContext.Set<TWorkflowRun>().Add(run);
await _eAVDBContext.SaveChangesAsync(Principal);
var data = await _factory.LoadAsync(context.RunId);

return data;
if (data == null)
{
var (_data, dataarr) = CreateState();

await _factory.SaveAsync(context.RunId, dataarr, Principal);

return _data;
}

if (run.State == null)
if (data == null)
{
var (data, dataarr) = CreateState();
run.State = dataarr;
await _eAVDBContext.SaveChangesAsync(Principal);
return data;
var (_data, dataarr) = CreateState();

await _factory.SaveAsync(context.RunId, dataarr, Principal);

return _data;
}

{

using var tinyStream = new JsonTextReader(new StreamReader(new GZipStream(new MemoryStream(run.State), CompressionMode.Decompress)));
using var tinyStream = new JsonTextReader(new StreamReader(new GZipStream(new MemoryStream(data), CompressionMode.Decompress)));


return _serializer.Deserialize<WorkflowState>(tinyStream);
Expand Down Expand Up @@ -360,9 +417,6 @@ public async ValueTask AddInput(IRunContext context, IWorkflow workflow, IAction
await SaveState(context.RunId, run);
}

public void Dispose()
{
_scope.Dispose();
}

}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
using EAVFramework.Endpoints;
using System;
using System.Threading.Tasks;

namespace EAVFW.Extensions.WorkflowEngine.Endpoints
{
public class WorkflowEndpointOptions
Expand All @@ -6,5 +10,10 @@ public class WorkflowEndpointOptions
public bool IncludeStartWorkflow { get; set; } = true;
public bool IncludeWorkflowState { get; set; } = true;
public bool IncludeWorkflowMetadata { get; set; } = true;

public string QueueName { get; set; } = "default";

public Func<EAVDBContext,Guid, IWorkflowRun> RunFactory { get; set; }
public string WorkflowEntityName { get; set; }
}
}

0 comments on commit cbe6d2e

Please sign in to comment.