diff --git a/src/M.EventBroker/Async/DelegateEventHandlerAsync.cs b/src/M.EventBroker/Async/DelegateEventHandlerAsync.cs
new file mode 100644
index 0000000..46bd1c8
--- /dev/null
+++ b/src/M.EventBroker/Async/DelegateEventHandlerAsync.cs
@@ -0,0 +1,57 @@
+using System;
+using System.Threading.Tasks;
+
+namespace M.EventBroker
+{
+ ///
+ /// An adapter used to represent delegate as IEventHandler.
+ ///
+ /// The type of the event to be handled.
+ public class DelegateEventHandlerAsync : IEventHandlerAsync
+ {
+ private readonly Func _handler;
+ private readonly Func> _filter;
+ private readonly Func _onError;
+
+ ///
+ /// Creates a new instance of the DelegateEventHandler class.
+ ///
+ /// A delegate used for event handling.
+ /// A delegate used to determine whether the event should be handled.
+ /// A delegate called when an error is caught during execution.
+ public DelegateEventHandlerAsync(Func handler, Func> filter = null, Func onError = null)
+ {
+ _handler = handler;
+ _filter = filter;
+ _onError = onError;
+ }
+
+ ///
+ /// Handles the event.
+ ///
+ /// An instance of TEvent representing the event.
+ public Task HandleAsync(TEvent @event) => _handler(@event);
+
+ ///
+ /// Called when an error is caught during execution.
+ ///
+ /// The exception caught.
+ /// The event instance which handling caused the exception.
+ public Task OnErrorAsync(Exception exception, TEvent @event) => _onError?.Invoke(exception, @event);
+
+ ///
+ /// Returns a value indicating whether the event handler should be executed.
+ ///
+ /// An instance of TEvent representing the event.
+ /// A value indicating whether the event handler should be executed.
+ public Task ShouldHandleAsync(TEvent @event)
+ {
+ if(_filter is null)
+ {
+ return Task.FromResult(true);
+ }
+
+ return _filter(@event);
+ }
+ }
+}
diff --git a/src/M.EventBroker/Async/EventBrokerAsync.cs b/src/M.EventBroker/Async/EventBrokerAsync.cs
new file mode 100644
index 0000000..d377e46
--- /dev/null
+++ b/src/M.EventBroker/Async/EventBrokerAsync.cs
@@ -0,0 +1,186 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+
+namespace M.EventBroker
+{
+ ///
+ /// Manages event subscriptions and invoking of event handlers.
+ ///
+ public class EventBrokerAsync : IEventBrokerAsync
+ {
+ private readonly ConcurrentDictionary> _subscribers = new ConcurrentDictionary>();
+ private readonly IEventHandlerAsyncFactory _handlersFactory;
+ private readonly IEventHandlerRunnerAsync _runner;
+
+ ///
+ /// Creates a new instance of the EventBrokerAsync class.
+ ///
+ ///
+ /// A delegate providing event handlers for event of givent type.
+ public EventBrokerAsync(IEventHandlerRunnerAsync runner, IEventHandlerAsyncFactory handlersFactory = null)
+ {
+ _runner = runner;
+ _handlersFactory = handlersFactory;
+ }
+
+ ///
+ /// Adds subscription for events of type .
+ ///
+ /// The type of the event.
+ /// A delegate that will be invoked when event is published.
+ /// A delegate used to perform filtering of events before invoking the handler.
+ /// A delegate called when an error is caught during execution.
+ public void Subscribe(Func handler, Func> filter = null, Func onError = null)
+ {
+ var handlers = _subscribers.GetOrAdd(typeof(TEvent), _ => new List());
+ handlers.Add(new EventHandlerAsyncWrapper(handler, filter, onError));
+ }
+
+ ///
+ /// Adds subscription for events of type .
+ ///
+ /// The type of the event.
+ /// An instance of IEventHandler<TEvent> which Handle method will be invoked when event is published.
+ public void Subscribe(IEventHandlerAsync handler)
+ {
+ var handlers = _subscribers.GetOrAdd(typeof(TEvent), _ => new List());
+ handlers.Add(new EventHandlerAsyncWrapper(handler));
+ }
+
+ ///
+ /// Removes subscription for events of type .
+ ///
+ /// The type of the event.
+ /// A delegate to remove form subscribers.
+ public void Unsubscribe(Func handler)
+ {
+ Unsubscribe(x => x.IsWrapping(handler));
+ }
+
+ ///
+ /// Removes subscription for events of type .
+ ///
+ /// The type of the event.
+ /// An instance of IEventHandler<TEvent> to remove form subscribers.
+ public void Unsubscribe(IEventHandlerAsync handler)
+ {
+ Unsubscribe(x => x.IsWrapping(handler));
+ }
+
+ ///
+ /// Publishes an event of type .
+ ///
+ /// The type of the event.
+ /// An instance to be passed to all handlers of the event. All handlers will be invoked asynchronously.
+ public async Task PublishAsync(TEvent @event)
+ {
+ await EnqueueSubscribers(@event).ConfigureAwait(false);
+ await EnqueueFromHandlersFactory(@event).ConfigureAwait(false);
+ }
+
+ ///
+ /// Releases all resources used by the current instance of the EventBroker class.
+ ///
+ public void Dispose() => _runner.Dispose();
+
+ private void Unsubscribe(Func, bool> handlerPredicate)
+ {
+ var handlers = _subscribers.GetOrAdd(typeof(TEvent), _ => new List());
+ var targetHandlers = handlers.Cast>().ToArray();
+ foreach (var handlerAction in targetHandlers.Where(x => handlerPredicate(x)))
+ {
+ handlers.Remove(handlerAction);
+ handlerAction.IsSubscribed = false;
+ }
+ }
+
+ private async Task EnqueueSubscribers(TEvent @event)
+ {
+ bool hasSubscribers = _subscribers.TryGetValue(typeof(TEvent), out List handlers);
+ if (!hasSubscribers)
+ {
+ return;
+ }
+
+ if (handlers.Count == 0)
+ {
+ return;
+ }
+
+ Func CreateHandlerAction(EventHandlerAsyncWrapper handler)
+ {
+ return async () =>
+ {
+ if (!handler.IsSubscribed)
+ {
+ return;
+ }
+
+ await TryRunHandler(handler, @event).ConfigureAwait(false);
+ };
+ }
+
+ Func[] handlerActions =
+ handlers.Cast>()
+ .Select(CreateHandlerAction)
+ .ToArray();
+
+ await _runner.RunAsync(handlerActions).ConfigureAwait(false);
+ }
+
+ private async Task EnqueueFromHandlersFactory(TEvent @event)
+ {
+ if (_handlersFactory == null)
+ {
+ return;
+ }
+
+ IEnumerable> handlerInstances = _handlersFactory.AsyncHandlersFor();
+ if (handlerInstances == null || !handlerInstances.Any())
+ {
+ return;
+ }
+
+ Func CreateHandlerAction(IEventHandlerAsync handler)
+ {
+ return async () => await TryRunHandler(handler, @event).ConfigureAwait(false);
+ }
+
+ Func[] handlerActions = handlerInstances.Select(CreateHandlerAction).ToArray();
+
+ await _runner.RunAsync(handlerActions).ConfigureAwait(false);
+ }
+
+ private async Task TryRunHandler(IEventHandlerAsync handler, TEvent @event)
+ {
+ try
+ {
+ if (!await handler.ShouldHandleAsync(@event).ConfigureAwait(false))
+ {
+ return;
+ }
+
+ await handler.HandleAsync(@event).ConfigureAwait(false);
+ }
+ catch (Exception exception)
+ {
+ await TryReportError(exception, handler, @event).ConfigureAwait(false);
+ }
+ }
+
+ private async Task TryReportError(Exception exception, IEventHandlerAsync handler, TEvent @event)
+ {
+ try
+ {
+ await handler.OnErrorAsync(exception, @event).ConfigureAwait(false);
+ }
+ catch
+ {
+ // yes, we mute exceptions here
+ }
+ }
+ }
+}
diff --git a/src/M.EventBroker/Async/EventHandlerAsyncWrapper.cs b/src/M.EventBroker/Async/EventHandlerAsyncWrapper.cs
new file mode 100644
index 0000000..b3bcc4c
--- /dev/null
+++ b/src/M.EventBroker/Async/EventHandlerAsyncWrapper.cs
@@ -0,0 +1,40 @@
+using System;
+using System.Threading.Tasks;
+
+namespace M.EventBroker
+{
+ internal class EventHandlerAsyncWrapper : IEventHandlerAsync
+ {
+ private readonly IEventHandlerAsync _eventHandler;
+ private readonly Func _handler;
+
+ public EventHandlerAsyncWrapper(IEventHandlerAsync eventHandler)
+ {
+ _eventHandler = eventHandler;
+ _handler = eventHandler.HandleAsync;
+ IsSubscribed = true;
+ }
+
+ public EventHandlerAsyncWrapper(Func handler, Func> filter = null, Func onError = null)
+ {
+ _eventHandler = new DelegateEventHandlerAsync(handler, filter, onError);
+ _handler = handler;
+ IsSubscribed = true;
+ }
+
+ public bool IsSubscribed
+ {
+ get; set;
+ }
+
+ public Task HandleAsync(TEvent @event) => _eventHandler.HandleAsync(@event);
+
+ public Task ShouldHandleAsync(TEvent @event) => _eventHandler.ShouldHandleAsync(@event);
+
+ public Task OnErrorAsync(Exception exception, TEvent @event) => _eventHandler.OnErrorAsync(exception, @event);
+
+ public bool IsWrapping(IEventHandlerAsync eventHandler) => _eventHandler == eventHandler;
+
+ public bool IsWrapping(Func handler) => _handler == handler;
+ }
+}
diff --git a/src/M.EventBroker/IEventBrokerAsync.cs b/src/M.EventBroker/IEventBrokerAsync.cs
index 747a025..2788e9b 100644
--- a/src/M.EventBroker/IEventBrokerAsync.cs
+++ b/src/M.EventBroker/IEventBrokerAsync.cs
@@ -44,6 +44,6 @@ public interface IEventBrokerAsync : IDisposable
/// The type of the event.
/// An instance to be passed to all handlers of the event.
/// The task object representing the asynchronous operation.
- Task Publish(TEvent @event);
+ Task PublishAsync(TEvent @event);
}
}
diff --git a/test/M.EventBroker.Tests/Async/EventBrokerAsyncTests.cs b/test/M.EventBroker.Tests/Async/EventBrokerAsyncTests.cs
new file mode 100644
index 0000000..3e747f1
--- /dev/null
+++ b/test/M.EventBroker.Tests/Async/EventBrokerAsyncTests.cs
@@ -0,0 +1,545 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using FakeItEasy;
+using Xunit;
+
+namespace M.EventBroker.Async.Tests
+{
+ public class EventBrokerAsyncTests
+ {
+ [Fact]
+ public async Task Subscribe_WithDelegate_CreatesSubscribsion()
+ {
+ // Arrange
+ var handlerRunnerMock = A.Fake();
+ var broker = new EventBrokerAsync(handlerRunnerMock);
+
+ var handler = A.Fake>();
+
+ // Act
+ broker.Subscribe(handler.HandleAsync);
+ await broker.PublishAsync("event").ConfigureAwait(false);
+
+ // Assert
+ A.CallTo(() => handlerRunnerMock.RunAsync(A>.Ignored))
+ .MustHaveHappened(Repeated.Exactly.Once);
+ }
+
+ [Fact]
+ public async Task Subscribe_WithHandlerInstance_CreatesSubscribsion()
+ {
+ // Arrange
+ var handlerRunnerMock = A.Fake();
+ var broker = new EventBrokerAsync(handlerRunnerMock);
+
+ var handler = A.Fake>();
+ A.CallTo(() => handler.ShouldHandleAsync(null))
+ .WithAnyArguments()
+ .Returns(true);
+
+ // Act
+ broker.Subscribe(handler);
+ await broker.PublishAsync("event").ConfigureAwait(false);
+
+ // Assert
+ A.CallTo(() => handlerRunnerMock.RunAsync(A>.Ignored))
+ .MustHaveHappened(Repeated.Exactly.Once);
+ }
+
+ [Fact]
+ public async Task Unsubscribe_WithDelegate_RemovesSubscribsion()
+ {
+ // Arrange
+ var handlerRunnerMock = A.Fake();
+ var broker = new EventBrokerAsync(handlerRunnerMock);
+
+ var handler = A.Fake>();
+
+ // Act
+ broker.Subscribe(handler.HandleAsync);
+ broker.Unsubscribe(handler.HandleAsync);
+ await broker.PublishAsync("event").ConfigureAwait(false);
+
+ // Assert
+ A.CallTo(() => handlerRunnerMock.RunAsync(A>.Ignored))
+ .MustNotHaveHappened();
+ }
+
+ [Fact]
+ public async Task Unsubscribe_WithHandlerInstance_RemovesSubscribsion()
+ {
+ // Arrange
+ var handlerRunnerMock = A.Fake();
+ var broker = new EventBrokerAsync(handlerRunnerMock);
+
+ var handler = A.Fake>();
+
+ // Act
+ broker.Subscribe(handler);
+ broker.Unsubscribe(handler);
+ await broker.PublishAsync("event").ConfigureAwait(false);
+
+ // Assert
+ A.CallTo(() => handlerRunnerMock.RunAsync(A>.Ignored))
+ .MustNotHaveHappened();
+ }
+
+ [Fact]
+ public async Task Publish_WithHandlersFactory_HandlerIsRunned()
+ {
+ // Arrange
+ var handlerMock = A.Fake>();
+ A.CallTo(() => handlerMock.ShouldHandleAsync(A.Ignored))
+ .Returns(true);
+
+ var eventHandlersFactory = new EventHandlersFactory();
+ eventHandlersFactory.Add(() => handlerMock);
+
+ var handlerRunnerMock = A.Fake();
+ A.CallTo(() => handlerRunnerMock.RunAsync(null))
+ .WithAnyArguments()
+ .Invokes(x => ((Func[])x.Arguments.First())[0].Invoke());
+
+ var broker = new EventBrokerAsync(handlerRunnerMock, eventHandlersFactory);
+
+ // Act
+ await broker.PublishAsync("event").ConfigureAwait(false);
+
+ // Assert
+ A.CallTo(() => handlerRunnerMock.RunAsync(A>.Ignored))
+ .MustHaveHappened(Repeated.Exactly.Once);
+
+ A.CallTo(() => handlerMock.HandleAsync("event"))
+ .MustHaveHappened(Repeated.Exactly.Once);
+ }
+
+ [Fact]
+ public async Task Publish_WithHandlersFactory_FilterIsRespected()
+ {
+ // Arrange
+ var handlerMock = A.Fake>();
+ A.CallTo(() => handlerMock.ShouldHandleAsync(A.Ignored))
+ .Returns(false);
+
+ var eventHandlersFactory = new EventHandlersFactory();
+ eventHandlersFactory.Add(() => handlerMock);
+
+ var handlerRunnerMock = A.Fake();
+ A.CallTo(() => handlerRunnerMock.RunAsync(null))
+ .WithAnyArguments()
+ .Invokes(x => ((Func[])x.Arguments.First())[0].Invoke());
+
+ var broker = new EventBrokerAsync(handlerRunnerMock, eventHandlersFactory);
+
+ // Act
+ await broker.PublishAsync("event").ConfigureAwait(false);
+
+ // Assert
+ A.CallTo(() => handlerRunnerMock.RunAsync(A>.Ignored))
+ .MustHaveHappened(Repeated.Exactly.Once);
+
+ A.CallTo(() => handlerMock.HandleAsync("event"))
+ .MustNotHaveHappened();
+ }
+
+ [Fact]
+ public async Task Publish_WithHandlersFactoryReturningNull_NothingHappens()
+ {
+ // Arrange
+ var handlerRunnerMock = A.Fake();
+ var eventHandlersFactory = new EventHandlersFactory();
+
+ var broker = new EventBrokerAsync(handlerRunnerMock, eventHandlersFactory);
+
+ // Act
+ await broker.PublishAsync("event").ConfigureAwait(false);
+
+ // Assert
+ A.CallTo(() => handlerRunnerMock.RunAsync(A>.Ignored))
+ .MustNotHaveHappened();
+ }
+
+ [Fact]
+ public async Task Publish_WithHandlerInstanceSubscription_FilterIsRespected()
+ {
+ // Arrange
+ var handlerMock = A.Fake>();
+ A.CallTo(() => handlerMock.ShouldHandleAsync(A.Ignored))
+ .Returns(false);
+
+ var handlerRunnerMock = A.Fake();
+ A.CallTo(() => handlerRunnerMock.RunAsync(null))
+ .WithAnyArguments()
+ .Invokes(x => ((Func[])x.Arguments.First())[0].Invoke());
+
+ var broker = new EventBrokerAsync(handlerRunnerMock);
+ broker.Subscribe(handlerMock);
+
+ // Act
+ await broker.PublishAsync("event").ConfigureAwait(false);
+
+ // Assert
+ A.CallTo(() => handlerRunnerMock.RunAsync(A>.Ignored))
+ .MustHaveHappened(Repeated.Exactly.Once);
+
+ A.CallTo(() => handlerMock.HandleAsync("event"))
+ .MustNotHaveHappened();
+ }
+
+ [Fact]
+ public async Task Publish_WithDelegateSubscription_FilterIsRespected()
+ {
+ // Arrange
+ var handlerMock = A.Fake>();
+ A.CallTo(() => handlerMock.ShouldHandleAsync(A.Ignored))
+ .Returns(false);
+
+ var handlerRunnerMock = A.Fake();
+ A.CallTo(() => handlerRunnerMock.RunAsync(null))
+ .WithAnyArguments()
+ .Invokes(x => ((Func[])x.Arguments.First())[0].Invoke());
+
+ var broker = new EventBrokerAsync(handlerRunnerMock);
+ broker.Subscribe(handlerMock.HandleAsync, handlerMock.ShouldHandleAsync);
+
+ // Act
+ await broker.PublishAsync("event").ConfigureAwait(false);
+
+ // Assert
+ A.CallTo(() => handlerRunnerMock.RunAsync(A>.Ignored))
+ .MustHaveHappened(Repeated.Exactly.Once);
+
+ A.CallTo(() => handlerMock.HandleAsync("event"))
+ .MustNotHaveHappened();
+ }
+
+ [Fact]
+ public async Task Publish_WithMultipleSubscribers_AllHandlersAreRunned()
+ {
+ // Arrange
+ var handlerRunnerMock = A.Fake();
+ A.CallTo(() => handlerRunnerMock.RunAsync(null))
+ .WithAnyArguments()
+ .Invokes(x => ((Func[])x.Arguments.First()).ToList().ForEach(a => a.Invoke()));
+
+ var handlerMock1 = A.Fake>();
+ A.CallTo(() => handlerMock1.ShouldHandleAsync(A.Ignored))
+ .Returns(true);
+
+ var eventHandlersFactory = new EventHandlersFactory();
+ eventHandlersFactory.Add(() => handlerMock1);
+
+ var broker = new EventBrokerAsync(handlerRunnerMock, eventHandlersFactory);
+
+ var handlerMock2 = A.Fake>();
+ A.CallTo(() => handlerMock2.ShouldHandleAsync(A.Ignored))
+ .Returns(true);
+ broker.Subscribe(handlerMock2);
+
+ var handlerMock3 = A.Fake>();
+ broker.Subscribe(handlerMock3.HandleAsync);
+
+ var handlerMock4 = A.Fake>();
+ broker.Subscribe(handlerMock4);
+
+ // Act
+ await broker.PublishAsync("event").ConfigureAwait(false);
+
+ // Assert
+ A.CallTo(() => handlerMock1.HandleAsync("event"))
+ .MustHaveHappened(Repeated.Exactly.Once);
+
+ A.CallTo(() => handlerMock2.HandleAsync("event"))
+ .MustHaveHappened(Repeated.Exactly.Once);
+
+ A.CallTo(() => handlerMock3.HandleAsync("event"))
+ .MustHaveHappened(Repeated.Exactly.Once);
+
+ A.CallTo(() => handlerMock4.HandleAsync(A.Ignored))
+ .MustNotHaveHappened();
+ }
+
+ [Fact]
+ public void EventBroker_Dispose_DisposesIHandlerRunner()
+ {
+ // Arrange
+ var handlerRunnerMock = A.Fake();
+ var broker = new EventBrokerAsync(handlerRunnerMock);
+
+ // Act
+ broker.Dispose();
+
+ // Assert
+ A.CallTo(() => handlerRunnerMock.Dispose())
+ .MustHaveHappened(Repeated.Exactly.Once);
+ }
+
+ [Fact]
+ public async Task Publish_WithDelegateSubscriptionUnsubscribedWhileWaiting_HandlerIsNotRunned()
+ {
+ // Arrange
+ var handlerMock = A.Fake>();
+
+ var handlerRunnerMock = A.Fake();
+ A.CallTo(() => handlerRunnerMock.RunAsync(null))
+ .WithAnyArguments()
+ .Invokes(x =>
+ {
+ Task.Factory.StartNew(() =>
+ {
+ Thread.Sleep(500);
+ ((Func[])x.Arguments.First())[0].Invoke();
+ });
+ });
+
+ var broker = new EventBrokerAsync(handlerRunnerMock);
+ broker.Subscribe(handlerMock.HandleAsync);
+
+ // Act
+ await broker.PublishAsync("event").ConfigureAwait(false);
+ broker.Unsubscribe(handlerMock.HandleAsync);
+
+ // Assert
+ Thread.Sleep(500);
+ A.CallTo(() => handlerRunnerMock.RunAsync(A>.Ignored))
+ .MustHaveHappened(Repeated.Exactly.Once);
+
+ A.CallTo(() => handlerMock.HandleAsync("event"))
+ .MustNotHaveHappened();
+ }
+
+ [Fact]
+ public async Task SubscribedWithOnErrorDelegate_ExceptionThrownDuringHandling_OnExceptionIsCalled()
+ {
+ // Arrange
+ var handlerMock = A.Fake>();
+
+ A.CallTo(() => handlerMock.ShouldHandleAsync(A.Ignored))
+ .Returns(true);
+
+ A.CallTo(() => handlerMock.HandleAsync(A.Ignored))
+ .Throws();
+
+ var handlerRunnerMock = A.Fake();
+
+ A.CallTo(() => handlerRunnerMock.RunAsync(null))
+ .WithAnyArguments()
+ .Invokes(x => ((Func[])x.Arguments.First())[0].Invoke());
+
+ var broker = new EventBrokerAsync(handlerRunnerMock);
+ broker.Subscribe(handlerMock.HandleAsync, handlerMock.ShouldHandleAsync, handlerMock.OnErrorAsync);
+
+ // Act
+ await broker.PublishAsync("event").ConfigureAwait(false);
+
+ // Assert
+ A.CallTo(() => handlerMock.OnErrorAsync(A.Ignored, "event"))
+ .MustHaveHappened(Repeated.Exactly.Once);
+ }
+
+ [Fact]
+ public async Task SubscribedWithoutOnErrorDelegate_ExceptionThrownDuringHandling_ExceptionIsMuted()
+ {
+ // Arrange
+ var handlerMock = A.Fake>();
+
+ A.CallTo(() => handlerMock.ShouldHandleAsync(A.Ignored))
+ .Returns(true);
+
+ A.CallTo(() => handlerMock.HandleAsync(A.Ignored))
+ .Throws();
+
+ var handlerRunnerMock = A.Fake();
+
+ A.CallTo(() => handlerRunnerMock.RunAsync(null))
+ .WithAnyArguments()
+ .Invokes(x => ((Func[])x.Arguments.First())[0].Invoke());
+
+ var broker = new EventBrokerAsync(handlerRunnerMock);
+ broker.Subscribe(handlerMock.HandleAsync, handlerMock.ShouldHandleAsync, null);
+
+ // Act
+ await broker.PublishAsync("event").ConfigureAwait(false);
+
+ // Assert
+ Assert.True(true);
+ }
+
+ [Fact]
+ public async Task SubscribedWithOnErrorDelegate_ExceptionThrownDuringOnError_ExceptionIsMuted()
+ {
+ // Arrange
+ var handlerMock = A.Fake>();
+
+ A.CallTo(() => handlerMock.ShouldHandleAsync(A.Ignored))
+ .Returns(true);
+
+ A.CallTo(() => handlerMock.HandleAsync(A.Ignored))
+ .Throws();
+
+ A.CallTo(() => handlerMock.OnErrorAsync(A.Ignored, A.Ignored))
+ .Throws();
+
+ var handlerRunnerMock = A.Fake();
+
+ A.CallTo(() => handlerRunnerMock.RunAsync(null))
+ .WithAnyArguments()
+ .Invokes(x => ((Func[])x.Arguments.First())[0].Invoke());
+
+ var broker = new EventBrokerAsync(handlerRunnerMock);
+ broker.Subscribe(handlerMock.HandleAsync, handlerMock.ShouldHandleAsync, handlerMock.OnErrorAsync);
+
+ // Act
+ await broker.PublishAsync("event").ConfigureAwait(false);
+
+ // Assert
+ Assert.True(true);
+ }
+
+ [Fact]
+ public async Task SubscribedWithHandlerInstancee_ExceptionThrownDuringHandling_OnExceptionIsCalled()
+ {
+ // Arrange
+ var handlerMock = A.Fake>();
+
+ A.CallTo(() => handlerMock.ShouldHandleAsync(A.Ignored))
+ .Returns(true);
+
+ A.CallTo(() => handlerMock.HandleAsync(A.Ignored))
+ .Throws();
+
+ var handlerRunnerMock = A.Fake();
+
+ A.CallTo(() => handlerRunnerMock.RunAsync(null))
+ .WithAnyArguments()
+ .Invokes(x => ((Func[])x.Arguments.First())[0].Invoke());
+
+ var broker = new EventBrokerAsync(handlerRunnerMock);
+ broker.Subscribe(handlerMock);
+
+ // Act
+ await broker.PublishAsync("event").ConfigureAwait(false);
+
+ // Assert
+ A.CallTo(() => handlerMock.OnErrorAsync(A.Ignored, "event"))
+ .MustHaveHappened(Repeated.Exactly.Once);
+ }
+
+ [Fact]
+ public async Task SubscribedWithHandlerInstance_ExceptionThrownDuringOnError_ExceptionIsMuted()
+ {
+ // Arrange
+ var handlerMock = A.Fake>();
+
+ A.CallTo(() => handlerMock.ShouldHandleAsync(A.Ignored))
+ .Returns(true);
+
+ A.CallTo(() => handlerMock.HandleAsync(A.Ignored))
+ .Throws();
+
+ A.CallTo(() => handlerMock.OnErrorAsync(A.Ignored, A.Ignored))
+ .Throws();
+
+ var handlerRunnerMock = A.Fake();
+
+ A.CallTo(() => handlerRunnerMock.RunAsync(null))
+ .WithAnyArguments()
+ .Invokes(x => ((Func[])x.Arguments.First())[0].Invoke());
+
+ var broker = new EventBrokerAsync(handlerRunnerMock);
+ broker.Subscribe(handlerMock);
+
+ // Act
+ await broker.PublishAsync("event").ConfigureAwait(false);
+
+ // Assert
+ Assert.True(true);
+ }
+
+ [Fact]
+ public async Task SubscribedWithHandlersFactory_ExceptionThrownDuringHandling_OnExceptionIsCalled()
+ {
+ // Arrange
+ var handlerMock = A.Fake>();
+
+ A.CallTo(() => handlerMock.ShouldHandleAsync(A.Ignored))
+ .Returns(true);
+
+ A.CallTo(() => handlerMock.HandleAsync(A.Ignored))
+ .Throws();
+
+ var eventHandlersFactory = new EventHandlersFactory();
+ eventHandlersFactory.Add(() => handlerMock);
+
+ var handlerRunnerMock = A.Fake();
+
+ A.CallTo(() => handlerRunnerMock.RunAsync(null))
+ .WithAnyArguments()
+ .Invokes(x => ((Func[])x.Arguments.First())[0].Invoke());
+
+ var broker = new EventBrokerAsync(handlerRunnerMock, eventHandlersFactory);
+
+ // Act
+ await broker.PublishAsync("event").ConfigureAwait(false);
+
+ // Assert
+ A.CallTo(() => handlerMock.OnErrorAsync(A.Ignored, "event"))
+ .MustHaveHappened(Repeated.Exactly.Once);
+ }
+
+ [Fact]
+ public async Task SubscribedWithHandlersFactory_ExceptionThrownDuringOnError_ExceptionIsMuted()
+ {
+ // Arrange
+ var handlerMock = A.Fake>();
+
+ A.CallTo(() => handlerMock.ShouldHandleAsync(A.Ignored))
+ .Returns(true);
+
+ A.CallTo(() => handlerMock.HandleAsync(A.Ignored))
+ .Throws();
+
+ A.CallTo(() => handlerMock.OnErrorAsync(A.Ignored, A.Ignored))
+ .Throws();
+
+ var eventHandlersFactory = new EventHandlersFactory();
+ eventHandlersFactory.Add(() => handlerMock);
+
+ var handlerRunnerMock = A.Fake();
+
+ A.CallTo(() => handlerRunnerMock.RunAsync(null))
+ .WithAnyArguments()
+ .Invokes(x => ((Func[])x.Arguments.First())[0].Invoke());
+
+ var broker = new EventBrokerAsync(handlerRunnerMock, eventHandlersFactory);
+
+ // Act
+ await broker.PublishAsync("event").ConfigureAwait(false);
+
+ // Assert
+ Assert.True(true);
+ }
+
+ public class EventHandlersFactory : IEventHandlerAsyncFactory
+ {
+ private readonly List<(Type type, object handler)> _handlers = new List<(Type, object)>();
+
+ public void Add(Func> handlerProvider)
+ {
+ _handlers.Add((typeof(T), handlerProvider));
+ }
+
+ public IEnumerable> AsyncHandlersFor()
+ {
+ return _handlers.Count > 0
+ ? _handlers.Where(x => x.type == typeof(T))
+ .Select(x => ((Func>)x.handler)())
+ .ToArray()
+ : null;
+ }
+ }
+ }
+}
diff --git a/test/M.EventBroker.Tests/Async/IActionAsync.cs b/test/M.EventBroker.Tests/Async/IActionAsync.cs
new file mode 100644
index 0000000..bdaedfc
--- /dev/null
+++ b/test/M.EventBroker.Tests/Async/IActionAsync.cs
@@ -0,0 +1,9 @@
+using System.Threading.Tasks;
+
+namespace M.EventBroker.Tests
+{
+ public interface IActionAsync
+ {
+ Task Action();
+ }
+}