Skip to content

Commit

Permalink
event broker async implementation
Browse files Browse the repository at this point in the history
petar-m committed Sep 10, 2020
1 parent ba7b9f6 commit ffd750e
Showing 6 changed files with 838 additions and 1 deletion.
57 changes: 57 additions & 0 deletions src/M.EventBroker/Async/DelegateEventHandlerAsync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using System;
using System.Threading.Tasks;

namespace M.EventBroker
{
/// <summary>
/// An adapter used to represent delegate as IEventHandler.
/// </summary>
/// <typeparam name="TEvent">The type of the event to be handled.</typeparam>
public class DelegateEventHandlerAsync<TEvent> : IEventHandlerAsync<TEvent>
{
private readonly Func<TEvent, Task> _handler;
private readonly Func<TEvent, Task<bool>> _filter;
private readonly Func<Exception, TEvent, Task> _onError;

/// <summary>
/// Creates a new instance of the DelegateEventHandler class.
/// </summary>
/// <param name="handler">A delegate used for event handling.</param>
/// <param name="filter">A delegate used to determine whether the event should be handled.</param>
/// <param name="onError">A delegate called when an error is caught during execution.</param>
public DelegateEventHandlerAsync(Func<TEvent, Task> handler, Func<TEvent, Task<bool>> filter = null, Func<Exception, TEvent, Task> onError = null)
{
_handler = handler;
_filter = filter;
_onError = onError;
}

/// <summary>
/// Handles the event.
/// </summary>
/// <param name="event">An instance of TEvent representing the event.</param>
public Task HandleAsync(TEvent @event) => _handler(@event);

/// <summary>
/// Called when an error is caught during execution.
/// </summary>
/// <param name="exception">The exception caught.</param>
/// <param name="event">The event instance which handling caused the exception.</param>
public Task OnErrorAsync(Exception exception, TEvent @event) => _onError?.Invoke(exception, @event);

/// <summary>
/// Returns a value indicating whether the event handler should be executed.
/// </summary>
/// <param name="event">An instance of TEvent representing the event.</param>
/// <returns>A value indicating whether the event handler should be executed.</returns>
public Task<bool> ShouldHandleAsync(TEvent @event)
{
if(_filter is null)
{
return Task.FromResult(true);
}

return _filter(@event);
}
}
}
186 changes: 186 additions & 0 deletions src/M.EventBroker/Async/EventBrokerAsync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace M.EventBroker
{
/// <summary>
/// Manages event subscriptions and invoking of event handlers.
/// </summary>
public class EventBrokerAsync : IEventBrokerAsync
{
private readonly ConcurrentDictionary<Type, List<object>> _subscribers = new ConcurrentDictionary<Type, List<object>>();
private readonly IEventHandlerAsyncFactory _handlersFactory;
private readonly IEventHandlerRunnerAsync _runner;

/// <summary>
/// Creates a new instance of the EventBrokerAsync class.
/// </summary>
/// <param name="runner"></param>
/// <param name="handlersFactory">A delegate providing event handlers for event of givent type.</param>
public EventBrokerAsync(IEventHandlerRunnerAsync runner, IEventHandlerAsyncFactory handlersFactory = null)
{
_runner = runner;
_handlersFactory = handlersFactory;
}

/// <summary>
/// Adds subscription for events of type <typeparamref name="TEvent"/>.
/// </summary>
/// <typeparam name="TEvent">The type of the event.</typeparam>
/// <param name="handler">A delegate that will be invoked when event is published.</param>
/// <param name="filter">A delegate used to perform filtering of events before invoking the handler.</param>
/// <param name="onError">A delegate called when an error is caught during execution.</param>
public void Subscribe<TEvent>(Func<TEvent, Task> handler, Func<TEvent, Task<bool>> filter = null, Func<Exception, TEvent, Task> onError = null)
{
var handlers = _subscribers.GetOrAdd(typeof(TEvent), _ => new List<object>());
handlers.Add(new EventHandlerAsyncWrapper<TEvent>(handler, filter, onError));
}

/// <summary>
/// Adds subscription for events of type <typeparamref name="TEvent"/>.
/// </summary>
/// <typeparam name="TEvent">The type of the event.</typeparam>
/// <param name="handler">An instance of IEventHandler&lt;TEvent&gt; which Handle method will be invoked when event is published.</param>
public void Subscribe<TEvent>(IEventHandlerAsync<TEvent> handler)
{
var handlers = _subscribers.GetOrAdd(typeof(TEvent), _ => new List<object>());
handlers.Add(new EventHandlerAsyncWrapper<TEvent>(handler));
}

/// <summary>
/// Removes subscription for events of type <typeparamref name="TEvent"/>.
/// </summary>
/// <typeparam name="TEvent">The type of the event.</typeparam>
/// <param name="handler">A delegate to remove form subscribers.</param>
public void Unsubscribe<TEvent>(Func<TEvent, Task> handler)
{
Unsubscribe<TEvent>(x => x.IsWrapping(handler));
}

/// <summary>
/// Removes subscription for events of type <typeparamref name="TEvent"/>.
/// </summary>
/// <typeparam name="TEvent">The type of the event.</typeparam>
/// <param name="handler">An instance of IEventHandler&lt;TEvent&gt; to remove form subscribers.</param>
public void Unsubscribe<TEvent>(IEventHandlerAsync<TEvent> handler)
{
Unsubscribe<TEvent>(x => x.IsWrapping(handler));
}

/// <summary>
/// Publishes an event of type <typeparamref name="TEvent"/>.
/// </summary>
/// <typeparam name="TEvent">The type of the event.</typeparam>
/// <param name="event">An <typeparamref name="TEvent"/> instance to be passed to all handlers of the event. All handlers will be invoked asynchronously.</param>
public async Task PublishAsync<TEvent>(TEvent @event)
{
await EnqueueSubscribers(@event).ConfigureAwait(false);
await EnqueueFromHandlersFactory(@event).ConfigureAwait(false);
}

/// <summary>
/// Releases all resources used by the current instance of the EventBroker class.
/// </summary>
public void Dispose() => _runner.Dispose();

private void Unsubscribe<TEvent>(Func<EventHandlerAsyncWrapper<TEvent>, bool> handlerPredicate)
{
var handlers = _subscribers.GetOrAdd(typeof(TEvent), _ => new List<object>());
var targetHandlers = handlers.Cast<EventHandlerAsyncWrapper<TEvent>>().ToArray();
foreach (var handlerAction in targetHandlers.Where(x => handlerPredicate(x)))
{
handlers.Remove(handlerAction);
handlerAction.IsSubscribed = false;
}
}

private async Task EnqueueSubscribers<TEvent>(TEvent @event)
{
bool hasSubscribers = _subscribers.TryGetValue(typeof(TEvent), out List<object> handlers);
if (!hasSubscribers)
{
return;
}

if (handlers.Count == 0)
{
return;
}

Func<Task> CreateHandlerAction(EventHandlerAsyncWrapper<TEvent> handler)
{
return async () =>
{
if (!handler.IsSubscribed)
{
return;
}

await TryRunHandler(handler, @event).ConfigureAwait(false);
};
}

Func<Task>[] handlerActions =
handlers.Cast<EventHandlerAsyncWrapper<TEvent>>()
.Select(CreateHandlerAction)
.ToArray();

await _runner.RunAsync(handlerActions).ConfigureAwait(false);
}

private async Task EnqueueFromHandlersFactory<TEvent>(TEvent @event)
{
if (_handlersFactory == null)
{
return;
}

IEnumerable<IEventHandlerAsync<TEvent>> handlerInstances = _handlersFactory.AsyncHandlersFor<TEvent>();
if (handlerInstances == null || !handlerInstances.Any())
{
return;
}

Func<Task> CreateHandlerAction(IEventHandlerAsync<TEvent> handler)
{
return async () => await TryRunHandler(handler, @event).ConfigureAwait(false);
}

Func<Task>[] handlerActions = handlerInstances.Select(CreateHandlerAction).ToArray();

await _runner.RunAsync(handlerActions).ConfigureAwait(false);
}

private async Task TryRunHandler<TEvent>(IEventHandlerAsync<TEvent> handler, TEvent @event)
{
try
{
if (!await handler.ShouldHandleAsync(@event).ConfigureAwait(false))
{
return;
}

await handler.HandleAsync(@event).ConfigureAwait(false);
}
catch (Exception exception)
{
await TryReportError(exception, handler, @event).ConfigureAwait(false);
}
}

private async Task TryReportError<TEvent>(Exception exception, IEventHandlerAsync<TEvent> handler, TEvent @event)
{
try
{
await handler.OnErrorAsync(exception, @event).ConfigureAwait(false);
}
catch
{
// yes, we mute exceptions here
}
}
}
}
40 changes: 40 additions & 0 deletions src/M.EventBroker/Async/EventHandlerAsyncWrapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System;
using System.Threading.Tasks;

namespace M.EventBroker
{
internal class EventHandlerAsyncWrapper<TEvent> : IEventHandlerAsync<TEvent>
{
private readonly IEventHandlerAsync<TEvent> _eventHandler;
private readonly Func<TEvent, Task> _handler;

public EventHandlerAsyncWrapper(IEventHandlerAsync<TEvent> eventHandler)
{
_eventHandler = eventHandler;
_handler = eventHandler.HandleAsync;
IsSubscribed = true;
}

public EventHandlerAsyncWrapper(Func<TEvent, Task> handler, Func<TEvent, Task<bool>> filter = null, Func<Exception, TEvent, Task> onError = null)
{
_eventHandler = new DelegateEventHandlerAsync<TEvent>(handler, filter, onError);
_handler = handler;
IsSubscribed = true;
}

public bool IsSubscribed
{
get; set;
}

public Task HandleAsync(TEvent @event) => _eventHandler.HandleAsync(@event);

public Task<bool> ShouldHandleAsync(TEvent @event) => _eventHandler.ShouldHandleAsync(@event);

public Task OnErrorAsync(Exception exception, TEvent @event) => _eventHandler.OnErrorAsync(exception, @event);

public bool IsWrapping(IEventHandlerAsync<TEvent> eventHandler) => _eventHandler == eventHandler;

public bool IsWrapping(Func<TEvent, Task> handler) => _handler == handler;
}
}
2 changes: 1 addition & 1 deletion src/M.EventBroker/IEventBrokerAsync.cs
Original file line number Diff line number Diff line change
@@ -44,6 +44,6 @@ public interface IEventBrokerAsync : IDisposable
/// <typeparam name="TEvent">The type of the event.</typeparam>
/// <param name="event">An <typeparamref name="TEvent"/> instance to be passed to all handlers of the event.</param>
/// <returns>The task object representing the asynchronous operation.</returns>
Task Publish<TEvent>(TEvent @event);
Task PublishAsync<TEvent>(TEvent @event);
}
}
Loading

0 comments on commit ffd750e

Please sign in to comment.