Skip to content

Commit

Permalink
Dynamic delegate event handlers (#10)
Browse files Browse the repository at this point in the history
* Add dynamic delegate event handlers

* Update package-readme.md
  • Loading branch information
petar-m authored Sep 14, 2024
1 parent 2d172eb commit d11abb7
Show file tree
Hide file tree
Showing 12 changed files with 630 additions and 15 deletions.
55 changes: 53 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ Features:
- built-in retry option
- tightly integrated with Microsoft.Extensions.DependencyInjection
- each handler is resolved and executed in a new DI container scope
- **NEW** event handlers can be delegates
- **NEW** event handlers can be delegates
- **NEW** dynamic delegate event handlers

# How does it work

Expand Down Expand Up @@ -211,7 +212,57 @@ builder.RegisterHandler<SomeEvent>(
});
```

Delegate wrappers are executed from the last registered moving "inwards" toward the handler.
Delegate wrappers are executed from the last registered moving "inwards" toward the handler.

### Dynamic Delegate Event Handlers

Delegate handlers can be added or removed after DI container was built. Dynamic delegate handlers are created using `DelegateHandlerRegistryBuilder` and support all delegate handler features (retries, wrappers, etc.).

EventBroker registration adds `IDynamicEventHandlers` which is used for managing handlers. Adding handlers returns `IDynamicHandlerClaimTicket` used to remove the handlers. Since `DelegateHandlerRegistryBuilder` can define multiple handlers, all of them will be removed by the `IDynamicHandlerClaimTicket` instance.

```csharp
public class DynamicEventHandlerExample : IDisposable
{
private readonly IDynamicEventHandlers _dynamicEventHandlers;
private readonly IDynamicHandlerClaimTicket _claimTicket;

public DynamicEventHandlerExample(IDynamicEventHandlers dynamicEventHandlers)
{
_dynamicEventHandlers = dynamicEventHandlers;

DelegateHandlerRegistryBuilder handlerRegistryBuilder = new();

// Define two handlers for different events
handlerRegistryBuilder
.RegisterHandler<Event1>(HandleEvent1)
.Builder()
.RegisterHandler<Event2>(HandleEvent2);

// Register with the event broker and keep a claim ticket
_claimTicket = _dynamicEventHandlers.Add(handlerRegistryBuilder);
}

// All delegate features are available, including injecting services registered in DI
private async Task HandleEvent1(Event1 event1, IRetryPolicy retryPolicy, ISomeService someService)
{
// event processing
}

private async Task HandleEvent2(Event2 event2)
{
// event processing
}

public void Dispose()
{
// Remove both event handlers using the IDynamicHandlerClaimTicket
_dynamicEventHandlers.Remove(_claimTicket);
}
}

```
> [!IMPORTANT]
> Make sure handlers are removed if containing classes are ephemeral.
## DI Configuration

Expand Down
3 changes: 2 additions & 1 deletion package-readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ Features:
- built-in retry option
- tightly integrated with Microsoft.Extensions.DependencyInjection
- each handler is resolved and executed in a new DI container scope
- **NEW** event handlers can be delegates
- **NEW** event handlers can be delegates
- **NEW** dynamic delegate event handlers

# How does it work

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public static IServiceCollection AddEventBroker(
x.GetRequiredService<EventHandlerRegistry>(),
x.GetRequiredService<DelegateHandlerRegistry>(),
x.GetRequiredKeyedService<CancellationTokenSource>(eventBrokerKey),
x.GetService<ILogger<ThreadPoolEventHandlerRunner>>()));
x.GetService<ILogger<ThreadPoolEventHandlerRunner>>(),
x.GetRequiredService<DynamicEventHandlers>()));

serviceCollection.AddSingleton(
x =>
Expand All @@ -75,6 +76,10 @@ public static IServiceCollection AddEventBroker(
return DelegateHandlerRegistryBuilder.Build(builders);
});

DynamicEventHandlers dynamicEventHandlers = new();
serviceCollection.AddSingleton<DynamicEventHandlers>(dynamicEventHandlers);
serviceCollection.AddSingleton<IDynamicEventHandlers>(dynamicEventHandlers);

return serviceCollection;
}

Expand Down
29 changes: 29 additions & 0 deletions src/M.EventBrokerSlim/IDynamicEventHandlers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System.Collections.Generic;
using M.EventBrokerSlim.DependencyInjection;

namespace M.EventBrokerSlim;

/// <summary>
/// Allows managing of delegate event handlers at runtime.
/// </summary>
public interface IDynamicEventHandlers
{
/// <summary>
/// Adds one or more delegate handlers.
/// </summary>
/// <param name="builder">An instance of <see cref="DelegateHandlerRegistryBuilder"/> describing the handlers.</param>
/// <returns><see cref="IDynamicHandlerClaimTicket"/> identifying added handlers.</returns>
IDynamicHandlerClaimTicket Add(DelegateHandlerRegistryBuilder builder);

/// <summary>
/// Removes one or more delegate handlers by <see cref="IDynamicHandlerClaimTicket"/>
/// </summary>
/// <param name="claimTicket"><see cref="IDynamicHandlerClaimTicket"/> identifying handlers to remove.</param>
void Remove(IDynamicHandlerClaimTicket claimTicket);

/// <summary>
/// Removes one or more delegate handlers by <see cref="IDynamicHandlerClaimTicket"/>
/// </summary>
/// <param name="claimTickets">Multiple <see cref="IDynamicHandlerClaimTicket"/> identifying handlers to remove.</param>
void RemoveRange(IEnumerable<IDynamicHandlerClaimTicket> claimTickets);
}
8 changes: 8 additions & 0 deletions src/M.EventBrokerSlim/IDynamicHandlerClaimTicket.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace M.EventBrokerSlim;

/// <summary>
/// Identifies one or more delegate event handlers added by <see cref="IDynamicEventHandlers"/>
/// </summary>
public interface IDynamicHandlerClaimTicket
{
}
2 changes: 2 additions & 0 deletions src/M.EventBrokerSlim/Internal/DelegateHandlerDescriptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ internal sealed class DelegateHandlerDescriptor
public required object Handler { get; init; }

public List<DelegateHandlerDescriptor> Pipeline { get; } = new();

internal DynamicHandlerClaimTicket? ClaimTicket { get; set; }
}
106 changes: 106 additions & 0 deletions src/M.EventBrokerSlim/Internal/DynamicEventHandlers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading;
using M.EventBrokerSlim.DependencyInjection;

namespace M.EventBrokerSlim.Internal;

internal sealed class DynamicEventHandlers : IDynamicEventHandlers
{
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly Dictionary<Type, ImmutableList<DelegateHandlerDescriptor>> _handlers = new Dictionary<Type, ImmutableList<DelegateHandlerDescriptor>>();

public IDynamicHandlerClaimTicket Add(DelegateHandlerRegistryBuilder builder)
{
try
{
_semaphore.Wait();
var claimTicket = new DynamicHandlerClaimTicket(Guid.NewGuid());
foreach(DelegateHandlerDescriptor handler in builder.HandlerDescriptors)
{
if(!_handlers.TryGetValue(handler.EventType, out ImmutableList<DelegateHandlerDescriptor>? value))
{
value = ImmutableList<DelegateHandlerDescriptor>.Empty;
_handlers[handler.EventType] = value;
}

handler.ClaimTicket = claimTicket;
_handlers[handler.EventType] = value.Add(handler);
}

return claimTicket;
}
finally
{
_semaphore.Release();
}
}

public void Remove(IDynamicHandlerClaimTicket claimTicket)
{
try
{
_semaphore.Wait();
foreach(var key in _handlers.Keys)
{
_handlers[key] = _handlers[key].RemoveAll(x =>
{
if(x.ClaimTicket is null)
{
return true;
}

if(x.ClaimTicket.Equals(claimTicket))
{
x.ClaimTicket = null;
return true;
}

return false;
});
}
}
finally
{
_semaphore.Release();
}
}

public void RemoveRange(IEnumerable<IDynamicHandlerClaimTicket> claimTickets)
{
try
{
_semaphore.Wait();
var claimTicketSet = new HashSet<IDynamicHandlerClaimTicket>(claimTickets);
foreach(var key in _handlers.Keys)
{
_handlers[key] = _handlers[key].RemoveAll(x =>
{
if(x.ClaimTicket is null)
{
return true;
}

if(claimTicketSet.Contains(x.ClaimTicket))
{
x.ClaimTicket = null;
return true;
}

return false;
});
}
}
finally
{
_semaphore.Release();
}
}

internal ImmutableList<DelegateHandlerDescriptor>? GetDelegateHandlerDescriptors(Type eventType)
{
_ = _handlers.TryGetValue(eventType, out ImmutableList<DelegateHandlerDescriptor>? handlerDescriptors);
return handlerDescriptors;
}
}
5 changes: 5 additions & 0 deletions src/M.EventBrokerSlim/Internal/DynamicHandlerClaimTicket.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
using System;

namespace M.EventBrokerSlim.Internal;

internal sealed record DynamicHandlerClaimTicket(Guid Id) : IDynamicHandlerClaimTicket;
45 changes: 36 additions & 9 deletions src/M.EventBrokerSlim/Internal/ThreadPoolEventHandlerRunner.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand All @@ -16,6 +17,7 @@ internal sealed class ThreadPoolEventHandlerRunner
private readonly DelegateHandlerRegistry _delegateHandlerRegistry;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly ILogger<ThreadPoolEventHandlerRunner> _logger;
private readonly DynamicEventHandlers _dynamicEventHandlers;
private readonly SemaphoreSlim _semaphore;
private readonly DefaultObjectPool<HandlerExecutionContext> _contextObjectPool;

Expand All @@ -25,13 +27,15 @@ internal ThreadPoolEventHandlerRunner(
EventHandlerRegistry eventHandlerRegistry,
DelegateHandlerRegistry delegateHandlerRegistry,
CancellationTokenSource cancellationTokenSource,
ILogger<ThreadPoolEventHandlerRunner>? logger)
ILogger<ThreadPoolEventHandlerRunner>? logger,
DynamicEventHandlers dynamicEventHandlers)
{
_channelReader = channel.Reader;
_eventHandlerRegistry = eventHandlerRegistry;
_delegateHandlerRegistry = delegateHandlerRegistry;
_cancellationTokenSource = cancellationTokenSource;
_logger = logger ?? new NullLogger<ThreadPoolEventHandlerRunner>();
_dynamicEventHandlers = dynamicEventHandlers;
_semaphore = new SemaphoreSlim(_eventHandlerRegistry.MaxConcurrentHandlers, _eventHandlerRegistry.MaxConcurrentHandlers);

var retryQueue = new RetryQueue(channel.Writer, cancellationTokenSource.Token);
Expand All @@ -58,10 +62,14 @@ private async ValueTask ProcessEvents()
RetryDescriptor? retryDescriptor = @event as RetryDescriptor;
if(retryDescriptor is null)
{
var type = @event.GetType();
var eventHandlers = _eventHandlerRegistry.GetEventHandlers(type);
var delegateEventHandlers = _delegateHandlerRegistry.GetHandlers(type);
if(eventHandlers.Length == 0 && delegateEventHandlers.Length == 0)
Type type = @event.GetType();
ImmutableArray<EventHandlerDescriptor> eventHandlers = _eventHandlerRegistry.GetEventHandlers(type);
ImmutableArray<DelegateHandlerDescriptor> delegateEventHandlers = _delegateHandlerRegistry.GetHandlers(type);
ImmutableList<DelegateHandlerDescriptor>? dynamicEventHandlers = _dynamicEventHandlers.GetDelegateHandlerDescriptors(type);

if(eventHandlers.Length == 0 &&
delegateEventHandlers.Length == 0 &&
(dynamicEventHandlers is null || dynamicEventHandlers.IsEmpty))
{
if(!_eventHandlerRegistry.DisableMissingHandlerWarningLog)
{
Expand All @@ -75,19 +83,38 @@ private async ValueTask ProcessEvents()
{
await _semaphore.WaitAsync(token).ConfigureAwait(false);

var eventHandlerDescriptor = eventHandlers[i];
EventHandlerDescriptor eventHandlerDescriptor = eventHandlers[i];

var context = _contextObjectPool.Get().Initialize(@event, eventHandlerDescriptor, null, retryDescriptor, token);
HandlerExecutionContext context = _contextObjectPool.Get().Initialize(@event, eventHandlerDescriptor, null, retryDescriptor, token);
_ = Task.Factory.StartNew(static async x => await HandleEvent(x!), context);
}

for(int i = 0; i < delegateEventHandlers.Length; i++)
{
await _semaphore.WaitAsync(token).ConfigureAwait(false);

var delegateHandlerDescriptor = delegateEventHandlers[i];
DelegateHandlerDescriptor delegateHandlerDescriptor = delegateEventHandlers[i];

var context = _contextObjectPool.Get().Initialize(@event, null, delegateHandlerDescriptor, retryDescriptor, token);
HandlerExecutionContext context = _contextObjectPool.Get().Initialize(@event, null, delegateHandlerDescriptor, retryDescriptor, token);
_ = Task.Factory.StartNew(static async x => await HandleEventWithDelegate(x!), context);
}

if(dynamicEventHandlers is null || dynamicEventHandlers.IsEmpty)
{
continue;
}

for(int i = 0; i < dynamicEventHandlers.Count; i++)
{
await _semaphore.WaitAsync(token).ConfigureAwait(false);

DelegateHandlerDescriptor delegateHandlerDescriptor = dynamicEventHandlers[i];
if(delegateHandlerDescriptor.ClaimTicket is null)
{
continue;
}

HandlerExecutionContext context = _contextObjectPool.Get().Initialize(@event, null, delegateHandlerDescriptor, retryDescriptor, token);
_ = Task.Factory.StartNew(static async x => await HandleEventWithDelegate(x!), context);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace M.EventBrokerSlim.Tests.DelegateHandlerTests;

public class HandlerExecutionTests
public class HandlerExecutionTests : IDisposable
{
private readonly ITestOutputHelper _output;
private readonly ServiceProvider _serviceProvider;
Expand Down Expand Up @@ -315,4 +315,9 @@ public async Task Retry_From_Wrapper()

_output.WriteLine($"Elapsed: {_tracker.Elapsed}");
}

public void Dispose()
{
_serviceProvider.Dispose();
}
}
Loading

0 comments on commit d11abb7

Please sign in to comment.