diff --git a/README.md b/README.md index efa8a51..6abe6f8 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,8 @@ Features: - built-in retry option - tightly integrated with Microsoft.Extensions.DependencyInjection - each handler is resolved and executed in a new DI container scope -- **NEW** event handlers can be delegates +- **NEW** event handlers can be delegates +- **NEW** dynamic delegate event handlers # How does it work @@ -211,7 +212,57 @@ builder.RegisterHandler( }); ``` -Delegate wrappers are executed from the last registered moving "inwards" toward the handler. +Delegate wrappers are executed from the last registered moving "inwards" toward the handler. + +### Dynamic Delegate Event Handlers + +Delegate handlers can be added or removed after DI container was built. Dynamic delegate handlers are created using `DelegateHandlerRegistryBuilder` and support all delegate handler features (retries, wrappers, etc.). + +EventBroker registration adds `IDynamicEventHandlers` which is used for managing handlers. Adding handlers returns `IDynamicHandlerClaimTicket` used to remove the handlers. Since `DelegateHandlerRegistryBuilder` can define multiple handlers, all of them will be removed by the `IDynamicHandlerClaimTicket` instance. + +```csharp +public class DynamicEventHandlerExample : IDisposable +{ + private readonly IDynamicEventHandlers _dynamicEventHandlers; + private readonly IDynamicHandlerClaimTicket _claimTicket; + + public DynamicEventHandlerExample(IDynamicEventHandlers dynamicEventHandlers) + { + _dynamicEventHandlers = dynamicEventHandlers; + + DelegateHandlerRegistryBuilder handlerRegistryBuilder = new(); + + // Define two handlers for different events + handlerRegistryBuilder + .RegisterHandler(HandleEvent1) + .Builder() + .RegisterHandler(HandleEvent2); + + // Register with the event broker and keep a claim ticket + _claimTicket = _dynamicEventHandlers.Add(handlerRegistryBuilder); + } + + // All delegate features are available, including injecting services registered in DI + private async Task HandleEvent1(Event1 event1, IRetryPolicy retryPolicy, ISomeService someService) + { + // event processing + } + + private async Task HandleEvent2(Event2 event2) + { + // event processing + } + + public void Dispose() + { + // Remove both event handlers using the IDynamicHandlerClaimTicket + _dynamicEventHandlers.Remove(_claimTicket); + } +} + +``` +> [!IMPORTANT] +> Make sure handlers are removed if containing classes are ephemeral. ## DI Configuration diff --git a/package-readme.md b/package-readme.md index bfefe57..0d38349 100644 --- a/package-readme.md +++ b/package-readme.md @@ -11,7 +11,8 @@ Features: - built-in retry option - tightly integrated with Microsoft.Extensions.DependencyInjection - each handler is resolved and executed in a new DI container scope -- **NEW** event handlers can be delegates +- **NEW** event handlers can be delegates +- **NEW** dynamic delegate event handlers # How does it work diff --git a/src/M.EventBrokerSlim/DependencyInjection/ServiceCollectionExtensions.cs b/src/M.EventBrokerSlim/DependencyInjection/ServiceCollectionExtensions.cs index 2384f4b..2e11274 100644 --- a/src/M.EventBrokerSlim/DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/M.EventBrokerSlim/DependencyInjection/ServiceCollectionExtensions.cs @@ -59,7 +59,8 @@ public static IServiceCollection AddEventBroker( x.GetRequiredService(), x.GetRequiredService(), x.GetRequiredKeyedService(eventBrokerKey), - x.GetService>())); + x.GetService>(), + x.GetRequiredService())); serviceCollection.AddSingleton( x => @@ -75,6 +76,10 @@ public static IServiceCollection AddEventBroker( return DelegateHandlerRegistryBuilder.Build(builders); }); + DynamicEventHandlers dynamicEventHandlers = new(); + serviceCollection.AddSingleton(dynamicEventHandlers); + serviceCollection.AddSingleton(dynamicEventHandlers); + return serviceCollection; } diff --git a/src/M.EventBrokerSlim/IDynamicEventHandlers.cs b/src/M.EventBrokerSlim/IDynamicEventHandlers.cs new file mode 100644 index 0000000..0c7907a --- /dev/null +++ b/src/M.EventBrokerSlim/IDynamicEventHandlers.cs @@ -0,0 +1,29 @@ +using System.Collections.Generic; +using M.EventBrokerSlim.DependencyInjection; + +namespace M.EventBrokerSlim; + +/// +/// Allows managing of delegate event handlers at runtime. +/// +public interface IDynamicEventHandlers +{ + /// + /// Adds one or more delegate handlers. + /// + /// An instance of describing the handlers. + /// identifying added handlers. + IDynamicHandlerClaimTicket Add(DelegateHandlerRegistryBuilder builder); + + /// + /// Removes one or more delegate handlers by + /// + /// identifying handlers to remove. + void Remove(IDynamicHandlerClaimTicket claimTicket); + + /// + /// Removes one or more delegate handlers by + /// + /// Multiple identifying handlers to remove. + void RemoveRange(IEnumerable claimTickets); +} diff --git a/src/M.EventBrokerSlim/IDynamicHandlerClaimTicket.cs b/src/M.EventBrokerSlim/IDynamicHandlerClaimTicket.cs new file mode 100644 index 0000000..2cfb1f9 --- /dev/null +++ b/src/M.EventBrokerSlim/IDynamicHandlerClaimTicket.cs @@ -0,0 +1,8 @@ +namespace M.EventBrokerSlim; + +/// +/// Identifies one or more delegate event handlers added by +/// +public interface IDynamicHandlerClaimTicket +{ +} diff --git a/src/M.EventBrokerSlim/Internal/DelegateHandlerDescriptor.cs b/src/M.EventBrokerSlim/Internal/DelegateHandlerDescriptor.cs index 62bc59a..7cecb33 100644 --- a/src/M.EventBrokerSlim/Internal/DelegateHandlerDescriptor.cs +++ b/src/M.EventBrokerSlim/Internal/DelegateHandlerDescriptor.cs @@ -12,4 +12,6 @@ internal sealed class DelegateHandlerDescriptor public required object Handler { get; init; } public List Pipeline { get; } = new(); + + internal DynamicHandlerClaimTicket? ClaimTicket { get; set; } } diff --git a/src/M.EventBrokerSlim/Internal/DynamicEventHandlers.cs b/src/M.EventBrokerSlim/Internal/DynamicEventHandlers.cs new file mode 100644 index 0000000..6b3f72e --- /dev/null +++ b/src/M.EventBrokerSlim/Internal/DynamicEventHandlers.cs @@ -0,0 +1,106 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Threading; +using M.EventBrokerSlim.DependencyInjection; + +namespace M.EventBrokerSlim.Internal; + +internal sealed class DynamicEventHandlers : IDynamicEventHandlers +{ + private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); + private readonly Dictionary> _handlers = new Dictionary>(); + + public IDynamicHandlerClaimTicket Add(DelegateHandlerRegistryBuilder builder) + { + try + { + _semaphore.Wait(); + var claimTicket = new DynamicHandlerClaimTicket(Guid.NewGuid()); + foreach(DelegateHandlerDescriptor handler in builder.HandlerDescriptors) + { + if(!_handlers.TryGetValue(handler.EventType, out ImmutableList? value)) + { + value = ImmutableList.Empty; + _handlers[handler.EventType] = value; + } + + handler.ClaimTicket = claimTicket; + _handlers[handler.EventType] = value.Add(handler); + } + + return claimTicket; + } + finally + { + _semaphore.Release(); + } + } + + public void Remove(IDynamicHandlerClaimTicket claimTicket) + { + try + { + _semaphore.Wait(); + foreach(var key in _handlers.Keys) + { + _handlers[key] = _handlers[key].RemoveAll(x => + { + if(x.ClaimTicket is null) + { + return true; + } + + if(x.ClaimTicket.Equals(claimTicket)) + { + x.ClaimTicket = null; + return true; + } + + return false; + }); + } + } + finally + { + _semaphore.Release(); + } + } + + public void RemoveRange(IEnumerable claimTickets) + { + try + { + _semaphore.Wait(); + var claimTicketSet = new HashSet(claimTickets); + foreach(var key in _handlers.Keys) + { + _handlers[key] = _handlers[key].RemoveAll(x => + { + if(x.ClaimTicket is null) + { + return true; + } + + if(claimTicketSet.Contains(x.ClaimTicket)) + { + x.ClaimTicket = null; + return true; + } + + return false; + }); + } + } + finally + { + _semaphore.Release(); + } + } + + internal ImmutableList? GetDelegateHandlerDescriptors(Type eventType) + { + _ = _handlers.TryGetValue(eventType, out ImmutableList? handlerDescriptors); + return handlerDescriptors; + } +} diff --git a/src/M.EventBrokerSlim/Internal/DynamicHandlerClaimTicket.cs b/src/M.EventBrokerSlim/Internal/DynamicHandlerClaimTicket.cs new file mode 100644 index 0000000..007d0c2 --- /dev/null +++ b/src/M.EventBrokerSlim/Internal/DynamicHandlerClaimTicket.cs @@ -0,0 +1,5 @@ +using System; + +namespace M.EventBrokerSlim.Internal; + +internal sealed record DynamicHandlerClaimTicket(Guid Id) : IDynamicHandlerClaimTicket; diff --git a/src/M.EventBrokerSlim/Internal/ThreadPoolEventHandlerRunner.cs b/src/M.EventBrokerSlim/Internal/ThreadPoolEventHandlerRunner.cs index e61908e..0368c02 100644 --- a/src/M.EventBrokerSlim/Internal/ThreadPoolEventHandlerRunner.cs +++ b/src/M.EventBrokerSlim/Internal/ThreadPoolEventHandlerRunner.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Immutable; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -16,6 +17,7 @@ internal sealed class ThreadPoolEventHandlerRunner private readonly DelegateHandlerRegistry _delegateHandlerRegistry; private readonly CancellationTokenSource _cancellationTokenSource; private readonly ILogger _logger; + private readonly DynamicEventHandlers _dynamicEventHandlers; private readonly SemaphoreSlim _semaphore; private readonly DefaultObjectPool _contextObjectPool; @@ -25,13 +27,15 @@ internal ThreadPoolEventHandlerRunner( EventHandlerRegistry eventHandlerRegistry, DelegateHandlerRegistry delegateHandlerRegistry, CancellationTokenSource cancellationTokenSource, - ILogger? logger) + ILogger? logger, + DynamicEventHandlers dynamicEventHandlers) { _channelReader = channel.Reader; _eventHandlerRegistry = eventHandlerRegistry; _delegateHandlerRegistry = delegateHandlerRegistry; _cancellationTokenSource = cancellationTokenSource; _logger = logger ?? new NullLogger(); + _dynamicEventHandlers = dynamicEventHandlers; _semaphore = new SemaphoreSlim(_eventHandlerRegistry.MaxConcurrentHandlers, _eventHandlerRegistry.MaxConcurrentHandlers); var retryQueue = new RetryQueue(channel.Writer, cancellationTokenSource.Token); @@ -58,10 +62,14 @@ private async ValueTask ProcessEvents() RetryDescriptor? retryDescriptor = @event as RetryDescriptor; if(retryDescriptor is null) { - var type = @event.GetType(); - var eventHandlers = _eventHandlerRegistry.GetEventHandlers(type); - var delegateEventHandlers = _delegateHandlerRegistry.GetHandlers(type); - if(eventHandlers.Length == 0 && delegateEventHandlers.Length == 0) + Type type = @event.GetType(); + ImmutableArray eventHandlers = _eventHandlerRegistry.GetEventHandlers(type); + ImmutableArray delegateEventHandlers = _delegateHandlerRegistry.GetHandlers(type); + ImmutableList? dynamicEventHandlers = _dynamicEventHandlers.GetDelegateHandlerDescriptors(type); + + if(eventHandlers.Length == 0 && + delegateEventHandlers.Length == 0 && + (dynamicEventHandlers is null || dynamicEventHandlers.IsEmpty)) { if(!_eventHandlerRegistry.DisableMissingHandlerWarningLog) { @@ -75,9 +83,9 @@ private async ValueTask ProcessEvents() { await _semaphore.WaitAsync(token).ConfigureAwait(false); - var eventHandlerDescriptor = eventHandlers[i]; + EventHandlerDescriptor eventHandlerDescriptor = eventHandlers[i]; - var context = _contextObjectPool.Get().Initialize(@event, eventHandlerDescriptor, null, retryDescriptor, token); + HandlerExecutionContext context = _contextObjectPool.Get().Initialize(@event, eventHandlerDescriptor, null, retryDescriptor, token); _ = Task.Factory.StartNew(static async x => await HandleEvent(x!), context); } @@ -85,9 +93,28 @@ private async ValueTask ProcessEvents() { await _semaphore.WaitAsync(token).ConfigureAwait(false); - var delegateHandlerDescriptor = delegateEventHandlers[i]; + DelegateHandlerDescriptor delegateHandlerDescriptor = delegateEventHandlers[i]; - var context = _contextObjectPool.Get().Initialize(@event, null, delegateHandlerDescriptor, retryDescriptor, token); + HandlerExecutionContext context = _contextObjectPool.Get().Initialize(@event, null, delegateHandlerDescriptor, retryDescriptor, token); + _ = Task.Factory.StartNew(static async x => await HandleEventWithDelegate(x!), context); + } + + if(dynamicEventHandlers is null || dynamicEventHandlers.IsEmpty) + { + continue; + } + + for(int i = 0; i < dynamicEventHandlers.Count; i++) + { + await _semaphore.WaitAsync(token).ConfigureAwait(false); + + DelegateHandlerDescriptor delegateHandlerDescriptor = dynamicEventHandlers[i]; + if(delegateHandlerDescriptor.ClaimTicket is null) + { + continue; + } + + HandlerExecutionContext context = _contextObjectPool.Get().Initialize(@event, null, delegateHandlerDescriptor, retryDescriptor, token); _ = Task.Factory.StartNew(static async x => await HandleEventWithDelegate(x!), context); } } diff --git a/test/M.EventBrokerSlim.Tests/DelegateHandlerTests/HandlerExecutionTests.cs b/test/M.EventBrokerSlim.Tests/DelegateHandlerTests/HandlerExecutionTests.cs index dc581ca..450cf02 100644 --- a/test/M.EventBrokerSlim.Tests/DelegateHandlerTests/HandlerExecutionTests.cs +++ b/test/M.EventBrokerSlim.Tests/DelegateHandlerTests/HandlerExecutionTests.cs @@ -2,7 +2,7 @@ namespace M.EventBrokerSlim.Tests.DelegateHandlerTests; -public class HandlerExecutionTests +public class HandlerExecutionTests : IDisposable { private readonly ITestOutputHelper _output; private readonly ServiceProvider _serviceProvider; @@ -315,4 +315,9 @@ public async Task Retry_From_Wrapper() _output.WriteLine($"Elapsed: {_tracker.Elapsed}"); } + + public void Dispose() + { + _serviceProvider.Dispose(); + } } diff --git a/test/M.EventBrokerSlim.Tests/DynamicDelegateHandlerTests/DynamicHandlerExecutionTests.cs b/test/M.EventBrokerSlim.Tests/DynamicDelegateHandlerTests/DynamicHandlerExecutionTests.cs new file mode 100644 index 0000000..bc756d6 --- /dev/null +++ b/test/M.EventBrokerSlim.Tests/DynamicDelegateHandlerTests/DynamicHandlerExecutionTests.cs @@ -0,0 +1,376 @@ +using Xunit.Abstractions; + +namespace M.EventBrokerSlim.Tests.DynamicDelegateHandlerTests; + +public class DynamicHandlerExecutionTests : IDisposable +{ + private readonly ITestOutputHelper _output; + private readonly ServiceProvider _serviceProvider; + private readonly EventsTracker _tracker; + + public DynamicHandlerExecutionTests(ITestOutputHelper output) + { + _output = output; + _tracker = new EventsTracker(); + _serviceProvider = ServiceProviderHelper.Build( + x => x.AddEventBroker() + .AddSingleton(_tracker)); + } + + public void Dispose() + { + _serviceProvider.Dispose(); + } + + [Fact] + public async Task Handler_Dynamically_Added() + { + // Arrange + var handlerRegistryBuilder = new DelegateHandlerRegistryBuilder(); + handlerRegistryBuilder + .RegisterHandler(async static (TestEventBase testEvent, EventsTracker tracker) => await tracker.TrackAsync(testEvent)); + + using var scope = _serviceProvider.CreateScope(); + var eventBroker = scope.ServiceProvider.GetRequiredService(); + var dynamicEventHandlers = scope.ServiceProvider.GetRequiredService(); + _tracker.ExpectedItemsCount = 1; + + // Act + await eventBroker.Publish(new TestEventBase(1)); + await Task.Delay(TimeSpan.FromMilliseconds(50)); + + _ = dynamicEventHandlers.Add(handlerRegistryBuilder); + await eventBroker.Publish(new TestEventBase(2)); + + await _tracker.Wait(TimeSpan.FromMilliseconds(300)); + + // Assert + var items = _tracker.Items.Select(x => x.Item).OfType().ToArray(); + Assert.Single(items); + Assert.Equal(2, items[0].Number); + + _output.WriteLine($"Elapsed: {_tracker.Elapsed}"); + } + + [Fact] + public async Task Handler_Dynamically_Removed() + { + // Arrange + var handlerRegistryBuilder = new DelegateHandlerRegistryBuilder(); + handlerRegistryBuilder + .RegisterHandler(async static (TestEventBase testEvent, EventsTracker tracker) => await tracker.TrackAsync(testEvent)); + + using IServiceScope scope = _serviceProvider.CreateScope(); + var eventBroker = scope.ServiceProvider.GetRequiredService(); + var dynamicEventHandlers = scope.ServiceProvider.GetRequiredService(); + _tracker.ExpectedItemsCount = 1; + + // Act + await eventBroker.Publish(new TestEventBase(1)); + await Task.Delay(TimeSpan.FromMilliseconds(50)); + + IDynamicHandlerClaimTicket claimTicket = dynamicEventHandlers.Add(handlerRegistryBuilder); + await eventBroker.Publish(new TestEventBase(2)); + await Task.Delay(TimeSpan.FromMilliseconds(50)); + + dynamicEventHandlers.Remove(claimTicket); + + await eventBroker.Publish(new TestEventBase(3)); + + await _tracker.Wait(TimeSpan.FromMilliseconds(300)); + + // Assert + var items = _tracker.Items.Select(x => x.Item).OfType().ToArray(); + Assert.Single(items); + Assert.Equal(2, items[0].Number); + + _output.WriteLine($"Elapsed: {_tracker.Elapsed}"); + } + + [Fact] + public async Task Multiple_Dynamic_Handlers_Added() + { + // Arrange + var handlerRegistryBuilder = new DelegateHandlerRegistryBuilder(); + handlerRegistryBuilder + .RegisterHandler(async static (TestEventBase testEvent, EventsTracker tracker) => await tracker.TrackAsync(testEvent)) + .Builder() + .RegisterHandler(async static (TestEventBase testEvent, EventsTracker tracker) => await tracker.TrackAsync(testEvent)); + + using var scope = _serviceProvider.CreateScope(); + var eventBroker = scope.ServiceProvider.GetRequiredService(); + var dynamicEventHandlers = scope.ServiceProvider.GetRequiredService(); + _tracker.ExpectedItemsCount = 2; + + // Act + await eventBroker.Publish(new TestEventBase(1)); + await Task.Delay(TimeSpan.FromMilliseconds(50)); + + IDynamicHandlerClaimTicket claimTicket = dynamicEventHandlers.Add(handlerRegistryBuilder); + await eventBroker.Publish(new TestEventBase(2)); + + await _tracker.Wait(TimeSpan.FromMilliseconds(300)); + + // Assert + var items = _tracker.Items.Select(x => x.Item).OfType().ToArray(); + Assert.Equal(2, items.Length); + Assert.Equal(2, items[0].Number); + Assert.Equal(2, items[1].Number); + + _output.WriteLine($"Elapsed: {_tracker.Elapsed}"); + } + + [Fact] + public async Task Multiple_Dynamic_Handlers_Removed() + { + // Arrange + var handlerRegistryBuilder = new DelegateHandlerRegistryBuilder(); + handlerRegistryBuilder + .RegisterHandler(async static (TestEventBase testEvent, EventsTracker tracker) => await tracker.TrackAsync(testEvent)) + .Builder() + .RegisterHandler(async static (TestEventBase testEvent, EventsTracker tracker) => await tracker.TrackAsync(testEvent)); + + using var scope = _serviceProvider.CreateScope(); + var eventBroker = scope.ServiceProvider.GetRequiredService(); + var dynamicEventHandlers = scope.ServiceProvider.GetRequiredService(); + _tracker.ExpectedItemsCount = 2; + + // Act + await eventBroker.Publish(new TestEventBase(1)); + await Task.Delay(TimeSpan.FromMilliseconds(50)); + + IDynamicHandlerClaimTicket claimTicket = dynamicEventHandlers.Add(handlerRegistryBuilder); + await eventBroker.Publish(new TestEventBase(2)); + await Task.Delay(TimeSpan.FromMilliseconds(50)); + + dynamicEventHandlers.Remove(claimTicket); + + await eventBroker.Publish(new TestEventBase(3)); + + await _tracker.Wait(TimeSpan.FromMilliseconds(300)); + + // Assert + var items = _tracker.Items.Select(x => x.Item).OfType().ToArray(); + Assert.Equal(2, items.Length); + Assert.Equal(2, items[0].Number); + Assert.Equal(2, items[1].Number); + + _output.WriteLine($"Elapsed: {_tracker.Elapsed}"); + } + + [Fact] + public async Task Multiple_Dynamic_HandlerRegistries_Added() + { + // Arrange + var handlerRegistryBuilder1 = new DelegateHandlerRegistryBuilder(); + handlerRegistryBuilder1 + .RegisterHandler(async static (TestEventBase testEvent, EventsTracker tracker) => await tracker.TrackAsync(testEvent)) + .Builder() + .RegisterHandler(async static (TestEventBase testEvent, EventsTracker tracker) => await tracker.TrackAsync(testEvent)); + + var handlerRegistryBuilder2 = new DelegateHandlerRegistryBuilder(); + handlerRegistryBuilder2 + .RegisterHandler(async static (TestEventBase testEvent, EventsTracker tracker) => await tracker.TrackAsync(testEvent)); + + using var scope = _serviceProvider.CreateScope(); + var eventBroker = scope.ServiceProvider.GetRequiredService(); + var dynamicEventHandlers = scope.ServiceProvider.GetRequiredService(); + _tracker.ExpectedItemsCount = 3; + + // Act + await eventBroker.Publish(new TestEventBase(1)); + await Task.Delay(TimeSpan.FromMilliseconds(50)); + + IDynamicHandlerClaimTicket claimTicket1 = dynamicEventHandlers.Add(handlerRegistryBuilder1); + IDynamicHandlerClaimTicket claimTicket2 = dynamicEventHandlers.Add(handlerRegistryBuilder2); + await Task.Delay(TimeSpan.FromMilliseconds(50)); + + await eventBroker.Publish(new TestEventBase(2)); + + await _tracker.Wait(TimeSpan.FromMilliseconds(300)); + + // Assert + var items = _tracker.Items.Select(x => x.Item).OfType().ToArray(); + Assert.Equal(3, items.Length); + Assert.Equal(2, items[0].Number); + Assert.Equal(2, items[1].Number); + Assert.Equal(2, items[2].Number); + + _output.WriteLine($"Elapsed: {_tracker.Elapsed}"); + } + + [Fact] + public async Task Multiple_Dynamic_HandlerRegistries_Removed() + { + // Arrange + var handlerRegistryBuilder1 = new DelegateHandlerRegistryBuilder(); + handlerRegistryBuilder1 + .RegisterHandler(async static (TestEventBase testEvent, EventsTracker tracker) => await tracker.TrackAsync(testEvent)) + .Builder() + .RegisterHandler(async static (TestEventBase testEvent, EventsTracker tracker) => await tracker.TrackAsync(testEvent)); + + var handlerRegistryBuilder2 = new DelegateHandlerRegistryBuilder(); + handlerRegistryBuilder2 + .RegisterHandler(async static (TestEventBase testEvent, EventsTracker tracker) => await tracker.TrackAsync(testEvent)); + + using var scope = _serviceProvider.CreateScope(); + var eventBroker = scope.ServiceProvider.GetRequiredService(); + var dynamicEventHandlers = scope.ServiceProvider.GetRequiredService(); + _tracker.ExpectedItemsCount = 3; + + // Act + await eventBroker.Publish(new TestEventBase(1)); + await Task.Delay(TimeSpan.FromMilliseconds(50)); + + IDynamicHandlerClaimTicket claimTicket1 = dynamicEventHandlers.Add(handlerRegistryBuilder1); + IDynamicHandlerClaimTicket claimTicket2 = dynamicEventHandlers.Add(handlerRegistryBuilder2); + await Task.Delay(TimeSpan.FromMilliseconds(50)); + + await eventBroker.Publish(new TestEventBase(2)); + await Task.Delay(TimeSpan.FromMilliseconds(50)); + + dynamicEventHandlers.RemoveRange([claimTicket1, claimTicket2]); + + await eventBroker.Publish(new TestEventBase(3)); + + await _tracker.Wait(TimeSpan.FromMilliseconds(300)); + + // Assert + var items = _tracker.Items.Select(x => x.Item).OfType().ToArray(); + Assert.Equal(3, items.Length); + Assert.Equal(2, items[0].Number); + Assert.Equal(2, items[1].Number); + Assert.Equal(2, items[2].Number); + + _output.WriteLine($"Elapsed: {_tracker.Elapsed}"); + } + + [Fact] + public async Task Multiple_Dynamic_HandlerRegistries_RemoveSome() + { + // Arrange + var handlerRegistryBuilder1 = new DelegateHandlerRegistryBuilder(); + handlerRegistryBuilder1 + .RegisterHandler(async static (TestEventBase testEvent, EventsTracker tracker) => await tracker.TrackAsync(testEvent)) + .Builder() + .RegisterHandler(async static (TestEventBase testEvent, EventsTracker tracker) => await tracker.TrackAsync(testEvent)); + + var handlerRegistryBuilder2 = new DelegateHandlerRegistryBuilder(); + handlerRegistryBuilder2 + .RegisterHandler(async static (TestEventBase testEvent, EventsTracker tracker) => await tracker.TrackAsync(testEvent)); + + using var scope = _serviceProvider.CreateScope(); + var eventBroker = scope.ServiceProvider.GetRequiredService(); + var dynamicEventHandlers = scope.ServiceProvider.GetRequiredService(); + _tracker.ExpectedItemsCount = 4; + + // Act + await eventBroker.Publish(new TestEventBase(1)); + await Task.Delay(TimeSpan.FromMilliseconds(50)); + + IDynamicHandlerClaimTicket claimTicket1 = dynamicEventHandlers.Add(handlerRegistryBuilder1); + IDynamicHandlerClaimTicket claimTicket2 = dynamicEventHandlers.Add(handlerRegistryBuilder2); + await Task.Delay(TimeSpan.FromMilliseconds(50)); + + await eventBroker.Publish(new TestEventBase(2)); + await Task.Delay(TimeSpan.FromMilliseconds(50)); + + dynamicEventHandlers.Remove(claimTicket1); + + await eventBroker.Publish(new TestEventBase(3)); + + await _tracker.Wait(TimeSpan.FromMilliseconds(300)); + + // Assert + var items = _tracker.Items.OrderBy(x => x.Timestamp).Select(x => x.Item).OfType().ToArray(); + Assert.Equal(4, items.Length); + Assert.Equal(2, items[0].Number); + Assert.Equal(2, items[1].Number); + Assert.Equal(2, items[2].Number); + Assert.Equal(3, items[3].Number); + + _output.WriteLine($"Elapsed: {_tracker.Elapsed}"); + } + + [Fact] + public async Task Handler_Dynamically_Added_Supports_Wrappers() + { + // Arrange + var handlerRegistryBuilder = new DelegateHandlerRegistryBuilder(); + handlerRegistryBuilder + .RegisterHandler( + async static (TestEventBase testEvent, EventsTracker tracker) => + { + await tracker.TrackAsync(testEvent); + }) + .WrapWith( + async static (TestEventBase testEvent, EventsTracker tracker, INextHandler next) => + { + await tracker.TrackAsync(2); + await next.Execute(); + }) + .WrapWith( + async static (TestEventBase testEvent, EventsTracker tracker, INextHandler next) => + { + await tracker.TrackAsync(3); + await next.Execute(); + }); + + using var scope = _serviceProvider.CreateScope(); + var eventBroker = scope.ServiceProvider.GetRequiredService(); + var dynamicEventHandlers = scope.ServiceProvider.GetRequiredService(); + _tracker.ExpectedItemsCount = 3; + + // Act + await eventBroker.Publish(new TestEventBase(1)); + await Task.Delay(TimeSpan.FromMilliseconds(50)); + + _ = dynamicEventHandlers.Add(handlerRegistryBuilder); + await eventBroker.Publish(new TestEventBase(2)); + + await _tracker.Wait(TimeSpan.FromMilliseconds(300)); + + // Assert + var items = _tracker.Items.Select(x => x.Item).OfType().ToArray(); + Assert.Single(items); + Assert.Equal(2, items[0].Number); + + var wrapperItems = _tracker.Items.OrderBy(x => x.Timestamp).Select(x => x.Item).OfType().ToArray(); + Assert.Equal(2, wrapperItems.Length); + Assert.Equal(3, wrapperItems[0]); + Assert.Equal(2, wrapperItems[1]); + + _output.WriteLine($"Elapsed: {_tracker.Elapsed}"); + } +} + +public class DynamicEventHandlerExample : IDisposable +{ + private readonly IDynamicEventHandlers _dynamicEventHandlers; + private readonly IDynamicHandlerClaimTicket _claimTicket; + + public DynamicEventHandlerExample(IDynamicEventHandlers dynamicEventHandlers) + { + _dynamicEventHandlers = dynamicEventHandlers; + + DelegateHandlerRegistryBuilder handlerRegistryBuilder = new(); + handlerRegistryBuilder + .RegisterHandler( + async static (TestEventBase testEvent, EventsTracker tracker) => await tracker.TrackAsync(testEvent)) + .Builder() + .RegisterHandler( + async static (TestEventBase testEvent, EventsTracker tracker) => await tracker.TrackAsync(testEvent)); + + _claimTicket = _dynamicEventHandlers.Add(handlerRegistryBuilder); + } + + private Task HandleEvent1(Event1 event1) => Task.CompletedTask; + + private Task HandleEvent2(Event2 event2) => Task.CompletedTask; + + public void Dispose() + { + // Remove both event handlers using the IDynamicHandlerClaimTicket + _dynamicEventHandlers.Remove(_claimTicket); + } +} diff --git a/test/M.EventBrokerSlim.Tests/DelegateHandlerTests/Events.cs b/test/M.EventBrokerSlim.Tests/Events.cs similarity index 78% rename from test/M.EventBrokerSlim.Tests/DelegateHandlerTests/Events.cs rename to test/M.EventBrokerSlim.Tests/Events.cs index 6a6de4a..953f64d 100644 --- a/test/M.EventBrokerSlim.Tests/DelegateHandlerTests/Events.cs +++ b/test/M.EventBrokerSlim.Tests/Events.cs @@ -1,4 +1,4 @@ -namespace M.EventBrokerSlim.Tests.DelegateHandlerTests; +namespace M.EventBrokerSlim.Tests; public record TestEventBase(int Number);