Skip to content

Commit

Permalink
Cancel all pending events handling and propagate cancellation to all …
Browse files Browse the repository at this point in the history
…currently executing handlers when IEventBroker.Shoudown() (#6)
  • Loading branch information
petar-m authored Jan 6, 2024
1 parent dae7480 commit cd8409f
Show file tree
Hide file tree
Showing 15 changed files with 249 additions and 51 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ public class SomeEventHandler : IEventHandler<SomeEvent>
{
}

public async Task Handle(SomeEvent @event)
public async Task Handle(SomeEvent @event, CancellationToken cancellationToken)
{
// process the event
}

public async Task OnError(Exception exception, SomeEvent @event)
public async Task OnError(Exception exception, SomeEvent @event, CancellationToken cancellationToken)
{
// called on unhandled exeption from Handle
}
Expand Down
4 changes: 2 additions & 2 deletions package-readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ public class SomeEventHandler : IEventHandler<SomeEvent>
{
}

public async Task Handle(SomeEvent @event)
public async Task Handle(SomeEvent @event, CancellationToken cancellationToken)
{
// process the event
}

public Task OnError(Exception exception, SomeEvent @event)
public Task OnError(Exception exception, SomeEvent @event, CancellationToken cancellationToken)
{
// called on unhandled exeption from Handle
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ internal void CreateEventHandlerDescriptor<TEvent, THandler>(Guid eventHandlerKe
Key: eventHandlerKey,
EventType: typeof(TEvent),
InterfaceType: typeof(IEventHandler<TEvent>),
Handle: async (handler, @event) => await ((THandler)handler).Handle((TEvent)@event),
OnError: async (handler, @event, exception) => await ((THandler)handler).OnError(exception, (TEvent)@event));
Handle: async (handler, @event, ct) => await ((THandler)handler).Handle((TEvent)@event, ct),
OnError: async (handler, @event, exception, ct) => await ((THandler)handler).OnError(exception, (TEvent)@event, ct));

_eventsHandlersDescriptors.Add(descriptor);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Channels;
using M.EventBrokerSlim.Internal;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -26,10 +27,14 @@ public static IServiceCollection AddEventBroker(

serviceCollection.AddSingleton<EventHandlerRegistryBuilder>(eventHandlerRegistryBuilder);

var channelKey = Guid.NewGuid();
var eventBrokerKey = Guid.NewGuid();

CancellationTokenSource eventBrokerCancellationTokenSource = new();

serviceCollection.AddKeyedSingleton(eventBrokerKey, eventBrokerCancellationTokenSource);

serviceCollection.AddKeyedSingleton(
channelKey,
eventBrokerKey,
(_, _) => Channel.CreateUnbounded<object>(new UnboundedChannelOptions
{
AllowSynchronousContinuations = false,
Expand All @@ -42,14 +47,17 @@ public static IServiceCollection AddEventBroker(
{
var eventHandlerRunner = x.GetRequiredService<ThreadPoolEventHandlerRunner>();
eventHandlerRunner.Run();
return new EventBroker(x.GetRequiredKeyedService<Channel<object>>(channelKey).Writer);
return new EventBroker(
x.GetRequiredKeyedService<Channel<object>>(eventBrokerKey).Writer,
x.GetRequiredKeyedService<CancellationTokenSource>(eventBrokerKey));
});

serviceCollection.AddSingleton(
x => new ThreadPoolEventHandlerRunner(
x.GetRequiredKeyedService<Channel<object>>(channelKey).Reader,
x.GetRequiredKeyedService<Channel<object>>(eventBrokerKey).Reader,
x.GetRequiredService<IServiceScopeFactory>(),
x.GetRequiredService<EventHandlerRegistry>(),
x.GetRequiredKeyedService<CancellationTokenSource>(eventBrokerKey),
x.GetService<ILogger<ThreadPoolEventHandlerRunner>>()));

serviceCollection.AddSingleton(
Expand Down
7 changes: 5 additions & 2 deletions src/M.EventBrokerSlim/IEventHandler.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
Expand All @@ -15,7 +16,8 @@ public interface IEventHandler<TEvent>
/// Handles the event.
/// </summary>
/// <param name="event">An instance of <typeparamref name="TEvent"/> representing the event.</param>
Task Handle(TEvent @event);
/// <param name="cancellationToken">A cancellation token that should be used to cancel the work</param>
Task Handle(TEvent @event, CancellationToken cancellationToken);

/// <summary>
/// Called when an unhadled exception is caught during execution.
Expand All @@ -24,5 +26,6 @@ public interface IEventHandler<TEvent>
/// </summary>
/// <param name="exception">The exception caught.</param>
/// <param name="event">The event instance which handling caused the exception.</param>
Task OnError(Exception exception, TEvent @event);
/// <param name="cancellationToken">A cancellation token that should be used to cancel the work</param>
Task OnError(Exception exception, TEvent @event, CancellationToken cancellationToken);
}
16 changes: 12 additions & 4 deletions src/M.EventBrokerSlim/Internal/EventBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ namespace M.EventBrokerSlim.Internal;
internal sealed class EventBroker : IEventBroker
{
private readonly ChannelWriter<object> _channelWriter;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly CancellationToken _cancellationToken;

public EventBroker(ChannelWriter<object> channelWriter)
public EventBroker(ChannelWriter<object> channelWriter, CancellationTokenSource cancellationTokenSource)
{
_channelWriter = channelWriter;
_cancellationTokenSource = cancellationTokenSource;
_cancellationToken = _cancellationTokenSource.Token;
}

public async Task Publish<TEvent>(TEvent @event, CancellationToken cancellationToken = default) where TEvent : notnull
Expand All @@ -37,8 +41,8 @@ public Task PublishDeferred<TEvent>(TEvent @event, TimeSpan deferDuration) where
{
try
{
await Task.Delay(deferDuration).ConfigureAwait(false);
await _channelWriter.WriteAsync(@event).ConfigureAwait(false);
await Task.Delay(deferDuration, _cancellationToken).ConfigureAwait(false);
await _channelWriter.WriteAsync(@event, _cancellationToken).ConfigureAwait(false);
}
catch
{
Expand All @@ -49,5 +53,9 @@ public Task PublishDeferred<TEvent>(TEvent @event, TimeSpan deferDuration) where
return Task.CompletedTask;
}

public void Shutdown() => _ = _channelWriter.TryComplete();
public void Shutdown()
{
_ = _channelWriter.TryComplete();
_cancellationTokenSource.Cancel();
}
}
5 changes: 3 additions & 2 deletions src/M.EventBrokerSlim/Internal/EventHandlerDescriptor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace M.EventBrokerSlim.Internal;
Expand All @@ -7,5 +8,5 @@ internal sealed record EventHandlerDescriptor(
Guid Key,
Type EventType,
Type InterfaceType,
Func<object, object, Task> Handle,
Func<object, object, Exception, Task> OnError);
Func<object, object, CancellationToken, Task> Handle,
Func<object, object, Exception, CancellationToken, Task> OnError);
18 changes: 11 additions & 7 deletions src/M.EventBrokerSlim/Internal/ThreadPoolEventHandlerRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,21 @@ internal sealed class ThreadPoolEventHandlerRunner
private readonly ChannelReader<object> _channelReader;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly EventHandlerRegistry _eventHandlerRegistry;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly ILogger<ThreadPoolEventHandlerRunner>? _logger;
private readonly SemaphoreSlim _semaphore;

internal ThreadPoolEventHandlerRunner(
ChannelReader<object> channelReader,
IServiceScopeFactory serviceScopeFactory,
ChannelReader<object> channelReader,
IServiceScopeFactory serviceScopeFactory,
EventHandlerRegistry eventHandlerRegistry,
CancellationTokenSource cancellationTokenSource,
ILogger<ThreadPoolEventHandlerRunner>? logger)
{
_channelReader = channelReader;
_serviceScopeFactory = serviceScopeFactory;
_eventHandlerRegistry = eventHandlerRegistry;
_cancellationTokenSource = cancellationTokenSource;
_logger = logger;
_semaphore = new SemaphoreSlim(_eventHandlerRegistry.MaxConcurrentHandlers, _eventHandlerRegistry.MaxConcurrentHandlers);
}
Expand All @@ -35,7 +38,8 @@ public void Run()

private async ValueTask ProcessEvents()
{
while (await _channelReader.WaitToReadAsync().ConfigureAwait(false))
CancellationToken token = _cancellationTokenSource.Token;
while (await _channelReader.WaitToReadAsync(token).ConfigureAwait(false))
{
while (_channelReader.TryRead(out var @event))
{
Expand All @@ -54,7 +58,7 @@ private async ValueTask ProcessEvents()

for (int i = 0; i < eventHandlers.Count; i++)
{
await _semaphore.WaitAsync().ConfigureAwait(false);
await _semaphore.WaitAsync(token).ConfigureAwait(false);

var eventHandlerDescriptior = eventHandlers[i];

Expand All @@ -65,7 +69,7 @@ private async ValueTask ProcessEvents()
try
{
service = scope.ServiceProvider.GetRequiredKeyedService(eventHandlerDescriptior.InterfaceType, eventHandlerDescriptior.Key);
await eventHandlerDescriptior.Handle(service, @event).ConfigureAwait(false);
await eventHandlerDescriptior.Handle(service, @event, token).ConfigureAwait(false);
}
catch (Exception exception)
{
Expand All @@ -77,9 +81,9 @@ private async ValueTask ProcessEvents()

try
{
await eventHandlerDescriptior.OnError(service, @event, exception).ConfigureAwait(false);
await eventHandlerDescriptior.OnError(service, @event, exception, token).ConfigureAwait(false);
}
catch(Exception errorHandlingException)
catch (Exception errorHandlingException)
{
// suppress further exeptions
_logger?.LogUnhandledExceptionFromOnError(service.GetType(), errorHandlingException);
Expand Down
Loading

0 comments on commit cd8409f

Please sign in to comment.