diff --git a/README.md b/README.md index 4234c2a..dd581d4 100644 --- a/README.md +++ b/README.md @@ -34,12 +34,12 @@ public class SomeEventHandler : IEventHandler { } - public async Task Handle(SomeEvent @event) + public async Task Handle(SomeEvent @event, CancellationToken cancellationToken) { // process the event } - public async Task OnError(Exception exception, SomeEvent @event) + public async Task OnError(Exception exception, SomeEvent @event, CancellationToken cancellationToken) { // called on unhandled exeption from Handle } diff --git a/package-readme.md b/package-readme.md index 0966fd7..1515744 100644 --- a/package-readme.md +++ b/package-readme.md @@ -23,12 +23,12 @@ public class SomeEventHandler : IEventHandler { } - public async Task Handle(SomeEvent @event) + public async Task Handle(SomeEvent @event, CancellationToken cancellationToken) { // process the event } - public Task OnError(Exception exception, SomeEvent @event) + public Task OnError(Exception exception, SomeEvent @event, CancellationToken cancellationToken) { // called on unhandled exeption from Handle } diff --git a/src/M.EventBrokerSlim/DependencyInjection/EventHandlerRegistryBuilder.cs b/src/M.EventBrokerSlim/DependencyInjection/EventHandlerRegistryBuilder.cs index 1659eab..25c51e6 100644 --- a/src/M.EventBrokerSlim/DependencyInjection/EventHandlerRegistryBuilder.cs +++ b/src/M.EventBrokerSlim/DependencyInjection/EventHandlerRegistryBuilder.cs @@ -73,8 +73,8 @@ internal void CreateEventHandlerDescriptor(Guid eventHandlerKe Key: eventHandlerKey, EventType: typeof(TEvent), InterfaceType: typeof(IEventHandler), - Handle: async (handler, @event) => await ((THandler)handler).Handle((TEvent)@event), - OnError: async (handler, @event, exception) => await ((THandler)handler).OnError(exception, (TEvent)@event)); + Handle: async (handler, @event, ct) => await ((THandler)handler).Handle((TEvent)@event, ct), + OnError: async (handler, @event, exception, ct) => await ((THandler)handler).OnError(exception, (TEvent)@event, ct)); _eventsHandlersDescriptors.Add(descriptor); } diff --git a/src/M.EventBrokerSlim/DependencyInjection/ServiceCollectionExtensions.cs b/src/M.EventBrokerSlim/DependencyInjection/ServiceCollectionExtensions.cs index be48398..0aa4b29 100644 --- a/src/M.EventBrokerSlim/DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/M.EventBrokerSlim/DependencyInjection/ServiceCollectionExtensions.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Channels; using M.EventBrokerSlim.Internal; using Microsoft.Extensions.DependencyInjection; @@ -26,10 +27,14 @@ public static IServiceCollection AddEventBroker( serviceCollection.AddSingleton(eventHandlerRegistryBuilder); - var channelKey = Guid.NewGuid(); + var eventBrokerKey = Guid.NewGuid(); + + CancellationTokenSource eventBrokerCancellationTokenSource = new(); + + serviceCollection.AddKeyedSingleton(eventBrokerKey, eventBrokerCancellationTokenSource); serviceCollection.AddKeyedSingleton( - channelKey, + eventBrokerKey, (_, _) => Channel.CreateUnbounded(new UnboundedChannelOptions { AllowSynchronousContinuations = false, @@ -42,14 +47,17 @@ public static IServiceCollection AddEventBroker( { var eventHandlerRunner = x.GetRequiredService(); eventHandlerRunner.Run(); - return new EventBroker(x.GetRequiredKeyedService>(channelKey).Writer); + return new EventBroker( + x.GetRequiredKeyedService>(eventBrokerKey).Writer, + x.GetRequiredKeyedService(eventBrokerKey)); }); serviceCollection.AddSingleton( x => new ThreadPoolEventHandlerRunner( - x.GetRequiredKeyedService>(channelKey).Reader, + x.GetRequiredKeyedService>(eventBrokerKey).Reader, x.GetRequiredService(), x.GetRequiredService(), + x.GetRequiredKeyedService(eventBrokerKey), x.GetService>())); serviceCollection.AddSingleton( diff --git a/src/M.EventBrokerSlim/IEventHandler.cs b/src/M.EventBrokerSlim/IEventHandler.cs index a3027ed..3e7a5a4 100644 --- a/src/M.EventBrokerSlim/IEventHandler.cs +++ b/src/M.EventBrokerSlim/IEventHandler.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -15,7 +16,8 @@ public interface IEventHandler /// Handles the event. /// /// An instance of representing the event. - Task Handle(TEvent @event); + /// A cancellation token that should be used to cancel the work + Task Handle(TEvent @event, CancellationToken cancellationToken); /// /// Called when an unhadled exception is caught during execution. @@ -24,5 +26,6 @@ public interface IEventHandler /// /// The exception caught. /// The event instance which handling caused the exception. - Task OnError(Exception exception, TEvent @event); + /// A cancellation token that should be used to cancel the work + Task OnError(Exception exception, TEvent @event, CancellationToken cancellationToken); } diff --git a/src/M.EventBrokerSlim/Internal/EventBroker.cs b/src/M.EventBrokerSlim/Internal/EventBroker.cs index 4a497f3..eb179aa 100644 --- a/src/M.EventBrokerSlim/Internal/EventBroker.cs +++ b/src/M.EventBrokerSlim/Internal/EventBroker.cs @@ -8,10 +8,14 @@ namespace M.EventBrokerSlim.Internal; internal sealed class EventBroker : IEventBroker { private readonly ChannelWriter _channelWriter; + private readonly CancellationTokenSource _cancellationTokenSource; + private readonly CancellationToken _cancellationToken; - public EventBroker(ChannelWriter channelWriter) + public EventBroker(ChannelWriter channelWriter, CancellationTokenSource cancellationTokenSource) { _channelWriter = channelWriter; + _cancellationTokenSource = cancellationTokenSource; + _cancellationToken = _cancellationTokenSource.Token; } public async Task Publish(TEvent @event, CancellationToken cancellationToken = default) where TEvent : notnull @@ -37,8 +41,8 @@ public Task PublishDeferred(TEvent @event, TimeSpan deferDuration) where { try { - await Task.Delay(deferDuration).ConfigureAwait(false); - await _channelWriter.WriteAsync(@event).ConfigureAwait(false); + await Task.Delay(deferDuration, _cancellationToken).ConfigureAwait(false); + await _channelWriter.WriteAsync(@event, _cancellationToken).ConfigureAwait(false); } catch { @@ -49,5 +53,9 @@ public Task PublishDeferred(TEvent @event, TimeSpan deferDuration) where return Task.CompletedTask; } - public void Shutdown() => _ = _channelWriter.TryComplete(); + public void Shutdown() + { + _ = _channelWriter.TryComplete(); + _cancellationTokenSource.Cancel(); + } } diff --git a/src/M.EventBrokerSlim/Internal/EventHandlerDescriptor.cs b/src/M.EventBrokerSlim/Internal/EventHandlerDescriptor.cs index a168234..57dd0c9 100644 --- a/src/M.EventBrokerSlim/Internal/EventHandlerDescriptor.cs +++ b/src/M.EventBrokerSlim/Internal/EventHandlerDescriptor.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; namespace M.EventBrokerSlim.Internal; @@ -7,5 +8,5 @@ internal sealed record EventHandlerDescriptor( Guid Key, Type EventType, Type InterfaceType, - Func Handle, - Func OnError); + Func Handle, + Func OnError); diff --git a/src/M.EventBrokerSlim/Internal/ThreadPoolEventHandlerRunner.cs b/src/M.EventBrokerSlim/Internal/ThreadPoolEventHandlerRunner.cs index 02cbca9..7d6d366 100644 --- a/src/M.EventBrokerSlim/Internal/ThreadPoolEventHandlerRunner.cs +++ b/src/M.EventBrokerSlim/Internal/ThreadPoolEventHandlerRunner.cs @@ -12,18 +12,21 @@ internal sealed class ThreadPoolEventHandlerRunner private readonly ChannelReader _channelReader; private readonly IServiceScopeFactory _serviceScopeFactory; private readonly EventHandlerRegistry _eventHandlerRegistry; + private readonly CancellationTokenSource _cancellationTokenSource; private readonly ILogger? _logger; private readonly SemaphoreSlim _semaphore; internal ThreadPoolEventHandlerRunner( - ChannelReader channelReader, - IServiceScopeFactory serviceScopeFactory, + ChannelReader channelReader, + IServiceScopeFactory serviceScopeFactory, EventHandlerRegistry eventHandlerRegistry, + CancellationTokenSource cancellationTokenSource, ILogger? logger) { _channelReader = channelReader; _serviceScopeFactory = serviceScopeFactory; _eventHandlerRegistry = eventHandlerRegistry; + _cancellationTokenSource = cancellationTokenSource; _logger = logger; _semaphore = new SemaphoreSlim(_eventHandlerRegistry.MaxConcurrentHandlers, _eventHandlerRegistry.MaxConcurrentHandlers); } @@ -35,7 +38,8 @@ public void Run() private async ValueTask ProcessEvents() { - while (await _channelReader.WaitToReadAsync().ConfigureAwait(false)) + CancellationToken token = _cancellationTokenSource.Token; + while (await _channelReader.WaitToReadAsync(token).ConfigureAwait(false)) { while (_channelReader.TryRead(out var @event)) { @@ -54,7 +58,7 @@ private async ValueTask ProcessEvents() for (int i = 0; i < eventHandlers.Count; i++) { - await _semaphore.WaitAsync().ConfigureAwait(false); + await _semaphore.WaitAsync(token).ConfigureAwait(false); var eventHandlerDescriptior = eventHandlers[i]; @@ -65,7 +69,7 @@ private async ValueTask ProcessEvents() try { service = scope.ServiceProvider.GetRequiredKeyedService(eventHandlerDescriptior.InterfaceType, eventHandlerDescriptior.Key); - await eventHandlerDescriptior.Handle(service, @event).ConfigureAwait(false); + await eventHandlerDescriptior.Handle(service, @event, token).ConfigureAwait(false); } catch (Exception exception) { @@ -77,9 +81,9 @@ private async ValueTask ProcessEvents() try { - await eventHandlerDescriptior.OnError(service, @event, exception).ConfigureAwait(false); + await eventHandlerDescriptior.OnError(service, @event, exception, token).ConfigureAwait(false); } - catch(Exception errorHandlingException) + catch (Exception errorHandlingException) { // suppress further exeptions _logger?.LogUnhandledExceptionFromOnError(service.GetType(), errorHandlingException); diff --git a/test/M.EventBrokerSlim.Tests/EventBrokerTests.cs b/test/M.EventBrokerSlim.Tests/EventBrokerTests.cs index 9c49613..fb10da4 100644 --- a/test/M.EventBrokerSlim.Tests/EventBrokerTests.cs +++ b/test/M.EventBrokerSlim.Tests/EventBrokerTests.cs @@ -1,9 +1,12 @@ using System; using System.Linq; +using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using M.EventBrokerSlim.DependencyInjection; +using MELT; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; using Xunit; namespace M.EventBrokerSlim.Tests; @@ -187,9 +190,9 @@ public async Task PublishDeferred_ExecutesHandler_After_DeferredDuration() eventsRecorder.Expect(1); var calledPublisheDeferredAt = DateTime.UtcNow; - await eventBroker.PublishDeferred(new TestEvent(CorrelationId: 1), TimeSpan.FromSeconds(1)); + await eventBroker.PublishDeferred(new TestEvent(CorrelationId: 1), TimeSpan.FromMilliseconds(200)); - var completed = await eventsRecorder.WaitForExpected(TimeSpan.FromSeconds(2)); + var completed = await eventsRecorder.WaitForExpected(TimeSpan.FromMilliseconds(250)); // Assert Assert.True(completed); @@ -197,7 +200,7 @@ public async Task PublishDeferred_ExecutesHandler_After_DeferredDuration() Assert.Equal(1, eventsRecorder.HandledEventIds[0]); var handlerExecutedAt = scope.ServiceProvider.GetRequiredService().ExecutedAt; - Assert.True(handlerExecutedAt - calledPublisheDeferredAt >= TimeSpan.FromSeconds(1)); + Assert.True(handlerExecutedAt - calledPublisheDeferredAt >= TimeSpan.FromMilliseconds(200)); } [Fact] @@ -226,7 +229,160 @@ public async Task PublishDeferred_DoesNotBlock_Publish() x => Assert.Equal(1, x)); } - public record TestEvent(int CorrelationId) : ITraceable; + [Fact] + public async Task PublishDeferred_DelayedTasks_Cancelled_OnShutdown() + { + // Arrange + var services = ServiceProviderHelper.BuildWithEventsRecorder( + sc => sc.AddEventBroker( + x => x.AddKeyedTransient()) + .AddSingleton()); + + using var scope = services.CreateScope(); + + var eventBroker = scope.ServiceProvider.GetRequiredService(); + var eventsRecorder = scope.ServiceProvider.GetRequiredService>(); + + var expected = Enumerable.Range(1, 10).Select(x => new TestEvent(x)).ToArray(); + eventsRecorder.Expect(expected); + + // Act + foreach (var @event in expected) + { + await eventBroker.PublishDeferred(@event, TimeSpan.FromMilliseconds(200)); + } + + eventBroker.Shutdown(); + + var completed = await eventsRecorder.WaitForExpected(TimeSpan.FromMilliseconds(300)); + + // Assert + Assert.False(completed); + Assert.Empty(eventsRecorder.HandledEventIds); + } + + [Fact] + public async Task Shutdown_WhileHandlingEvent_TaskCancelledException_HandledByOnError() + { + // Arrange + var services = ServiceProviderHelper.BuildWithEventsRecorder( + sc => sc.AddEventBroker( + x => x.WithMaxConcurrentHandlers(2) + .AddKeyedTransient()) + .AddSingleton()); + + using var scope = services.CreateScope(); + + var eventBroker = scope.ServiceProvider.GetRequiredService(); + var eventsRecorder = scope.ServiceProvider.GetRequiredService>(); + + var expected = Enumerable.Range(1, 10).Select(x => new TestEvent(x, HandlingDuration: TimeSpan.FromMilliseconds(500))).ToArray(); + eventsRecorder.Expect(expected); + + // Act + foreach (var @event in expected) + { + await eventBroker.Publish(@event); + } + + await Task.Delay(TimeSpan.FromMilliseconds(250)); + + eventBroker.Shutdown(); + + var completed = await eventsRecorder.WaitForExpected(TimeSpan.FromSeconds(1)); + + // Assert + Assert.False(completed); + Assert.Collection(eventsRecorder.HandledEventIds.Order(), + x => Assert.Equal(1, x), + x => Assert.Equal(2, x)); + + Assert.Collection(eventsRecorder.Exceptions, + x => Assert.IsType(x), + x => Assert.IsType(x)); + } + + [Fact] + public async Task Shutdown_PendingEvents_AreNot_Processed() + { + // Arrange + var services = ServiceProviderHelper.BuildWithEventsRecorder( + sc => sc.AddEventBroker( + x => x.WithMaxConcurrentHandlers(2) + .AddKeyedTransient()) + .AddSingleton()); + + using var scope = services.CreateScope(); + + var eventBroker = scope.ServiceProvider.GetRequiredService(); + var eventsRecorder = scope.ServiceProvider.GetRequiredService>(); + + var expected = Enumerable.Range(1, 10).Select(x => new TestEvent(x, HandlingDuration: TimeSpan.FromMilliseconds(200))).ToArray(); + eventsRecorder.Expect(expected); + + // Act + foreach (var @event in expected) + { + await eventBroker.Publish(@event); + } + + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + eventBroker.Shutdown(); + + var completed = await eventsRecorder.WaitForExpected(TimeSpan.FromMilliseconds(300)); + + // Assert + Assert.False(completed); + + Assert.Equal(8, eventsRecorder.Expected.Length); + + Assert.DoesNotContain(1, eventsRecorder.Expected); + Assert.DoesNotContain(2, eventsRecorder.Expected); + } + + [Fact] + public async Task Shutdown_WhileHandlingError_TaskCancelledException_IsLogged() + { + // Arrange + var services = ServiceProviderHelper.BuildWithEventsRecorderAndLogger( + sc => sc.AddEventBroker( + x => x.WithMaxConcurrentHandlers(2) + .AddKeyedTransient()) + .AddSingleton()); + + using var scope = services.CreateScope(); + + var eventBroker = scope.ServiceProvider.GetRequiredService(); + var eventsRecorder = scope.ServiceProvider.GetRequiredService>(); + + var testEvent = new TestEvent(CorrelationId: 1, ThrowFromHandle: true, ErrorHandlingDuration: TimeSpan.FromMilliseconds(300)); + eventsRecorder.Expect(testEvent); + + // Act + await eventBroker.Publish(testEvent); + + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + eventBroker.Shutdown(); + + await eventsRecorder.Wait(TimeSpan.FromMilliseconds(100)); + + // Assert + + var provider = (TestLoggerProvider)scope.ServiceProvider.GetServices().Single(x => x is TestLoggerProvider); + + var log = Assert.Single(provider.Sink.LogEntries); + Assert.Equal(LogLevel.Error, log.LogLevel); + Assert.Equal("Unhandled exception executing M.EventBrokerSlim.Tests.EventBrokerTests+TestEventHandler.OnError()", log.Message); + Assert.IsType(log.Exception); + } + + public record TestEvent( + int CorrelationId, + TimeSpan HandlingDuration = default, + bool ThrowFromHandle = false, + TimeSpan ErrorHandlingDuration = default) : ITraceable; public class TestEventHandler : IEventHandler { @@ -239,21 +395,34 @@ public TestEventHandler(EventsRecorder eventsRecorder, Timestamp timestamp _timestamp = timestamp; } - public Task Handle(TestEvent @event) + public async Task Handle(TestEvent @event, CancellationToken cancellationToken) { + _eventsRecoder.Notify(@event); + if (_timestamp is not null) { _timestamp.ExecutedAt = DateTime.UtcNow; } - _eventsRecoder.Notify(@event); - return Task.CompletedTask; + if (@event.ThrowFromHandle) + { + throw new InvalidOperationException("Exception during event handling"); + } + + if (@event.HandlingDuration != default) + { + await Task.Delay(@event.HandlingDuration, cancellationToken); + } } - public Task OnError(Exception exception, TestEvent @event) + public async Task OnError(Exception exception, TestEvent @event, CancellationToken cancellationToken) { _eventsRecoder.Notify(exception, @event); - return Task.CompletedTask; + + if (@event.ErrorHandlingDuration != default) + { + await Task.Delay(@event.ErrorHandlingDuration, cancellationToken); + } } } diff --git a/test/M.EventBrokerSlim.Tests/EventRecorder.cs b/test/M.EventBrokerSlim.Tests/EventRecorder.cs index 27fc7ad..f66cb84 100644 --- a/test/M.EventBrokerSlim.Tests/EventRecorder.cs +++ b/test/M.EventBrokerSlim.Tests/EventRecorder.cs @@ -18,6 +18,8 @@ public class EventsRecorder public T[] HandledEventIds => _events.OrderBy(x => x.tick).Select(x => x.id).ToArray(); + public T[] Expected => _expected.Keys.ToArray(); + public int[] HandlerObjectsHashCodes => _handlerInstances.OrderBy(x => x.tick).Select(x => x.id).ToArray(); public int[] HandlerScopeHashCodes => _scopeInstances.OrderBy(x => x.tick).Select(x => x.id).ToArray(); diff --git a/test/M.EventBrokerSlim.Tests/ExceptionHandlingTests.cs b/test/M.EventBrokerSlim.Tests/ExceptionHandlingTests.cs index 7fef9d6..0610ccf 100644 --- a/test/M.EventBrokerSlim.Tests/ExceptionHandlingTests.cs +++ b/test/M.EventBrokerSlim.Tests/ExceptionHandlingTests.cs @@ -1,5 +1,6 @@ using System; using System.Linq; +using System.Threading; using System.Threading.Tasks; using M.EventBrokerSlim.DependencyInjection; using MELT; @@ -154,7 +155,7 @@ public TestEventHandler(EventsRecorder eventBroker) _eventsRecorder = eventBroker; } - public Task Handle(TestEvent @event) + public Task Handle(TestEvent @event, CancellationToken cancellationToken) { _eventsRecorder.Notify(@event); if (@event.ThrowFromHandle) @@ -165,7 +166,7 @@ public Task Handle(TestEvent @event) return Task.CompletedTask; } - public Task OnError(Exception exception, TestEvent @event) + public Task OnError(Exception exception, TestEvent @event, CancellationToken cancellationToken) { _eventsRecorder.Notify(exception, @event); if (@event.ThrowFromOnError) @@ -185,8 +186,8 @@ public TestEventHandler1(string input) _input = input; } - public Task Handle(TestEvent @event) => throw new NotImplementedException(); + public Task Handle(TestEvent @event, CancellationToken cancellationToken) => throw new NotImplementedException(); - public Task OnError(Exception exception, TestEvent @event) => throw new NotImplementedException(); + public Task OnError(Exception exception, TestEvent @event, CancellationToken cancellationToken) => throw new NotImplementedException(); } } diff --git a/test/M.EventBrokerSlim.Tests/HandlerExecutionTests.cs b/test/M.EventBrokerSlim.Tests/HandlerExecutionTests.cs index b9ce076..6b08ed9 100644 --- a/test/M.EventBrokerSlim.Tests/HandlerExecutionTests.cs +++ b/test/M.EventBrokerSlim.Tests/HandlerExecutionTests.cs @@ -1,5 +1,6 @@ using System; using System.Linq; +using System.Threading; using System.Threading.Tasks; using M.EventBrokerSlim.DependencyInjection; using MELT; @@ -162,7 +163,7 @@ public TestEventHandler(EventsRecorder eventsRecorder) _eventsRecoder = eventsRecorder; } - public async Task Handle(TestEvent @event) + public async Task Handle(TestEvent @event, CancellationToken cancellationToken) { if (@event.TimeToRun != default) { @@ -172,7 +173,7 @@ public async Task Handle(TestEvent @event) _eventsRecoder.Notify(@event); } - public Task OnError(Exception exception, TestEvent @event) + public Task OnError(Exception exception, TestEvent @event, CancellationToken cancellationToken) { _eventsRecoder.Notify(exception, @event); return Task.CompletedTask; diff --git a/test/M.EventBrokerSlim.Tests/HandlerRegistrationTests.cs b/test/M.EventBrokerSlim.Tests/HandlerRegistrationTests.cs index 0723e21..8243ea0 100644 --- a/test/M.EventBrokerSlim.Tests/HandlerRegistrationTests.cs +++ b/test/M.EventBrokerSlim.Tests/HandlerRegistrationTests.cs @@ -1,5 +1,6 @@ using System; using System.Linq; +using System.Threading; using System.Threading.Tasks; using M.EventBrokerSlim.DependencyInjection; using Microsoft.Extensions.DependencyInjection; @@ -199,21 +200,19 @@ public record TestEvent(string CorrelationId) : ITraceable; public class TestEventHandler : IEventHandler { private readonly EventsRecorder _eventsRecorder; - private readonly IServiceProvider _scope; - public TestEventHandler(EventsRecorder eventsRecorder, IServiceProvider scope) + public TestEventHandler(EventsRecorder eventsRecorder) { _eventsRecorder = eventsRecorder; - _scope = scope; } - public Task Handle(TestEvent @event) + public Task Handle(TestEvent @event, CancellationToken cancellationToken) { _eventsRecorder.Notify($"{@event.CorrelationId}_{GetType().Name}"); return Task.CompletedTask; } - public Task OnError(Exception exception, TestEvent @event) + public Task OnError(Exception exception, TestEvent @event, CancellationToken cancellationToken) { _eventsRecorder.Notify(exception, @event); return Task.CompletedTask; @@ -222,14 +221,14 @@ public Task OnError(Exception exception, TestEvent @event) public class TestEventHandler1 : TestEventHandler { - public TestEventHandler1(EventsRecorder eventsRecorder, IServiceProvider scope) : base(eventsRecorder, scope) + public TestEventHandler1(EventsRecorder eventsRecorder) : base(eventsRecorder) { } } public class TestEventHandler2 : TestEventHandler { - public TestEventHandler2(EventsRecorder eventsRecorder, IServiceProvider scope) : base(eventsRecorder, scope) + public TestEventHandler2(EventsRecorder eventsRecorder) : base(eventsRecorder) { } } diff --git a/test/M.EventBrokerSlim.Tests/HandlerScopeAndInstanceTests.cs b/test/M.EventBrokerSlim.Tests/HandlerScopeAndInstanceTests.cs index 745a94f..1f1706b 100644 --- a/test/M.EventBrokerSlim.Tests/HandlerScopeAndInstanceTests.cs +++ b/test/M.EventBrokerSlim.Tests/HandlerScopeAndInstanceTests.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using M.EventBrokerSlim.DependencyInjection; using Microsoft.Extensions.DependencyInjection; @@ -123,14 +124,14 @@ public TestEventHandler(EventsRecorder eventsRecorder, IServiceProvider sco _scope = scope; } - public Task Handle(TestEvent @event) + public Task Handle(TestEvent @event, CancellationToken cancellationToken) { _eventsRecorder.Notify(@event); _eventsRecorder.Notify(@event, handlerInstance: GetHashCode(), scopeInstance: _scope.GetHashCode()); return Task.CompletedTask; } - public Task OnError(Exception exception, TestEvent @event) + public Task OnError(Exception exception, TestEvent @event, CancellationToken cancellationToken) { _eventsRecorder.Notify(exception, @event); return Task.CompletedTask; diff --git a/test/M.EventBrokerSlim.Tests/MultipleHandlersTests.cs b/test/M.EventBrokerSlim.Tests/MultipleHandlersTests.cs index b2cfff3..3678d69 100644 --- a/test/M.EventBrokerSlim.Tests/MultipleHandlersTests.cs +++ b/test/M.EventBrokerSlim.Tests/MultipleHandlersTests.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using M.EventBrokerSlim.DependencyInjection; using Microsoft.Extensions.DependencyInjection; @@ -54,13 +55,13 @@ public TestEventHandler(EventsRecorder eventsRecorder) _eventsRecorder = eventsRecorder; } - public Task Handle(TestEvent @event) + public Task Handle(TestEvent @event, CancellationToken cancellationToken) { _eventsRecorder.Notify($"{@event.CorrelationId}_{GetType().Name}"); return Task.CompletedTask; } - public Task OnError(Exception exception, TestEvent @event) + public Task OnError(Exception exception, TestEvent @event, CancellationToken cancellationToken) { _eventsRecorder.Notify(exception, @event); return Task.CompletedTask;