Skip to content

Commit

Permalink
Header args for command scheduling, caching, & timer refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
aritchie committed Jan 25, 2025
1 parent f6a62e6 commit 53481d2
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 27 deletions.
5 changes: 0 additions & 5 deletions src/Shiny.Mediator/Caching/CacheExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ namespace Shiny.Mediator;

public static class CacheExtensions
{
public static readonly (string Key, bool Value) ForceCacheRefreshHeader = ("ForceCacheRefresh", true);

public static bool HasForceCacheRefresh(this RequestContext context)
=> context.Values.ContainsKey(ForceCacheRefreshHeader.Key);

public static ShinyConfigurator AddCaching<TCache>(this ShinyConfigurator cfg) where TCache : class, ICacheService
{
cfg.Services.AddSingletonAsImplementedInterfaces<TCache>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ CancellationToken cancellationToken
};
}

var config = this.GetItemConfig(attribute, context.Request);
var config = this.GetItemConfig(context, attribute, context.Request);
if (config == null)
return await next().ConfigureAwait(false);

Expand Down Expand Up @@ -71,8 +71,12 @@ CancellationToken cancellationToken
}


protected virtual CacheItemConfig? GetItemConfig(CacheAttribute? attribute, TRequest request)
protected virtual CacheItemConfig? GetItemConfig(RequestContext context, CacheAttribute? attribute, TRequest request)
{
var cache = context.TryGetCacheConfig();
if (cache != null)
return cache;

if (request is ICacheControl control)
{
return new CacheItemConfig(
Expand Down
39 changes: 39 additions & 0 deletions src/Shiny.Mediator/Headers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using Shiny.Mediator.Infrastructure;

namespace Shiny.Mediator;

public static class Headers
{
const string TimerRefreshHeader = nameof(TimerRefreshHeader);
public static (string Key, int Value) TimerRefresh(int timerRefreshSeconds)
=> (TimerRefreshHeader, timerRefreshSeconds);

public static int? TryGetTimerRefresh(this RequestContext context)
=> context.TryGetValue<int>(TimerRefreshHeader);


const string CommandScheduleHeader = nameof(CommandScheduleHeader);
public static (string Key, object Value) SetCommandSchedule(DateTimeOffset dueAt)
=> (CommandScheduleHeader, dueAt);

public static DateTimeOffset? TryGetCommandSchedule(this CommandContext context)
=> context.TryGetValue<DateTimeOffset>(CommandScheduleHeader);

#region Caching

const string CacheConfigHeader = nameof(CacheConfigHeader);
public static CacheItemConfig? TryGetCacheConfig(this RequestContext context)
=> context.TryGetValue<CacheItemConfig>(CacheConfigHeader);

public static (string Key, object Value) SetCacheConfig(this RequestContext context, CacheItemConfig cfg)
=> (CacheConfigHeader, cfg);


const string ForceCacheRefreshHeader = nameof(ForceCacheRefreshHeader);
public static (string Key, bool Value) ForceCacheRefresh { get; } = (ForceCacheRefreshHeader, true);

public static bool HasForceCacheRefresh(this RequestContext context)
=> context.Values.ContainsKey(ForceCacheRefreshHeader);

#endregion
}
2 changes: 2 additions & 0 deletions src/Shiny.Mediator/Infrastructure/ICommandScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ public interface ICommandScheduler
/// Schedules and executes command
/// </summary>
/// <param name="context">The context containing the headers and contract</param>
/// <param name="dueAt">The schedule date</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<bool> Schedule(
CommandContext context,
DateTimeOffset dueAt,
CancellationToken cancellationToken
);
}
20 changes: 9 additions & 11 deletions src/Shiny.Mediator/Infrastructure/Impl/InMemoryCommandScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,19 @@ public class InMemoryCommandScheduler(
TimeProvider timeProvider
) : ICommandScheduler
{
readonly List<CommandContext> commands = new();
readonly List<(DateTimeOffset DueAt, CommandContext Context)> commands = new();
ITimer? timer;


public Task<bool> Schedule(CommandContext command, CancellationToken cancellationToken)
public Task<bool> Schedule(CommandContext command, DateTimeOffset dueAt, CancellationToken cancellationToken)
{
var scheduled = false;
if (command.Command is not IScheduledCommand scheduledCommand)
throw new InvalidCastException($"Command {command.Command} is not of IScheduledCommand");

var now = timeProvider.GetUtcNow();
if (scheduledCommand.DueAt > now)

if (dueAt > now)
{
lock (this.commands)
this.commands.Add(command);
this.commands.Add((dueAt, command));

scheduled = true;
this.timer ??= timeProvider.CreateTimer(_ => this.OnTimerElapsed(), null, TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
Expand All @@ -36,25 +34,25 @@ protected virtual async void OnTimerElapsed()
{
this.timer!.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); // stop

List<CommandContext> items = null!;
List<(DateTimeOffset DueAt, CommandContext Context)> items = null!;
lock (this.commands)
items = this.commands.ToList();

foreach (var item in items)
{
var command = (IScheduledCommand)item.Command;
var time = timeProvider.GetUtcNow();
if (command.DueAt < time)
if (item.DueAt < time)
{
var headers = item
.Context
.Values
.Select(x => (Key: x.Key, Value: x.Value))
.ToList();

try
{
await mediator
.Send(command, CancellationToken.None, headers)
.Send(item.Context.Command, CancellationToken.None, headers)
.ConfigureAwait(false);
}
catch (Exception ex)
Expand Down
7 changes: 4 additions & 3 deletions src/Shiny.Mediator/Middleware/ScheduledCommandMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,24 @@ public class ScheduledCommandMiddleware<TCommand>(
ILogger<ScheduledCommandMiddleware<TCommand>> logger,
TimeProvider timeProvider,
ICommandScheduler scheduler
) : ICommandMiddleware<TCommand> where TCommand : IScheduledCommand
) : ICommandMiddleware<TCommand> where TCommand : ICommand
{
public async Task Process(
CommandContext<TCommand> context,
CommandHandlerDelegate next,
CancellationToken cancellationToken
)
{
var dueAt = context.TryGetCommandSchedule() ?? (context.Command as IScheduledCommand)?.DueAt;
var now = timeProvider.GetUtcNow();
if (context.Command.DueAt < now)
if (dueAt == null || dueAt < now)
{
logger.LogWarning($"Executing Scheduled Command '{context.Command}' that was due at {context.Command.DueAt}");
await next().ConfigureAwait(false);
}
else
{
logger.LogInformation($"Command '{context.Command}' scheduled for {context.Command.DueAt}");
logger.LogInformation($"Command '{context.Command}' scheduled for {dueAt}");
await scheduler
.Schedule(context, cancellationToken)
.ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,25 @@ CancellationToken cancellationToken
)
{
var interval = 0;
var section = configuration.GetHandlerSection("TimerRefresh", context.Request, context.RequestHandler);
if (section != null)

var header = context.TryGetTimerRefresh();
if (header != null)
{
interval = section.GetValue("IntervalSeconds", 0);
interval = header.Value;
}
else
{
var attribute = context.RequestHandler.GetHandlerHandleMethodAttribute<TRequest, TimerRefreshAttribute>();
if (attribute != null)
interval = attribute.IntervalSeconds;
var section = configuration.GetHandlerSection("TimerRefresh", context.Request, context.RequestHandler);
if (section != null)
{
interval = section.GetValue("IntervalSeconds", 0);
}
else
{
var attribute = context.RequestHandler.GetHandlerHandleMethodAttribute<TRequest, TimerRefreshAttribute>();
if (attribute != null)
interval = attribute.IntervalSeconds;
}
}

if (interval <= 0)
Expand Down

0 comments on commit 53481d2

Please sign in to comment.