Skip to content

Commit

Permalink
async runners implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
petar-m committed Sep 10, 2020
1 parent ffd750e commit ed28aa6
Show file tree
Hide file tree
Showing 11 changed files with 369 additions and 10 deletions.
4 changes: 2 additions & 2 deletions src/M.EventBroker/Async/DelegateEventHandlerAsync.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System;
using System.Threading.Tasks;

namespace M.EventBroker
namespace M.EventBroker.Async
{
/// <summary>
/// An adapter used to represent delegate as IEventHandler.
Expand Down Expand Up @@ -46,7 +46,7 @@ public DelegateEventHandlerAsync(Func<TEvent, Task> handler, Func<TEvent, Task<b
/// <returns>A value indicating whether the event handler should be executed.</returns>
public Task<bool> ShouldHandleAsync(TEvent @event)
{
if(_filter is null)
if (_filter is null)
{
return Task.FromResult(true);
}
Expand Down
2 changes: 1 addition & 1 deletion src/M.EventBroker/Async/EventBrokerAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using System.Linq;
using System.Threading.Tasks;

namespace M.EventBroker
namespace M.EventBroker.Async
{
/// <summary>
/// Manages event subscriptions and invoking of event handlers.
Expand Down
2 changes: 1 addition & 1 deletion src/M.EventBroker/Async/EventHandlerAsyncWrapper.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System;
using System.Threading.Tasks;

namespace M.EventBroker
namespace M.EventBroker.Async
{
internal class EventHandlerAsyncWrapper<TEvent> : IEventHandlerAsync<TEvent>
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.Threading.Tasks;

namespace M.EventBroker.EvenHandlerRunners
{
/// <summary>
/// Runs event handlers on the thread as the caller, blocking it until all handlers are runned.
/// </summary>
public class ContinueAfterHandlersRunnerAsync : IEventHandlerRunnerAsync
{
/// <summary>
/// Runs event handlers on the thread as the caller, blocking it until all handlers are runned.
/// </summary>
/// <param name="handlers">The event handlers to run.</param>
public async Task RunAsync(params Func<Task>[] handlers)
{
foreach (Func<Task> handler in handlers)
{
await handler().ConfigureAwait(false);
}
}

/// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
public void Dispose()
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace M.EventBroker.Async.EvenHandlerRunners
{
/// <summary>
/// Runs event handlers on a ThreadPool threads, restricting the running handlers to a given number.
/// </summary>
public class RestrictedThreadPoolRunnerAsync : IEventHandlerRunnerAsync
{
private readonly object _locker = new object();
private readonly TimeSpan _timeout = TimeSpan.FromSeconds(1);
private readonly BlockingCollection<Func<Task>> _handlerActions = new BlockingCollection<Func<Task>>();
private int _currentlyRunning = 0;

private bool _isRunning;
private readonly int _maxConcurrentHandlers;

/// <summary>
/// Creates a new instance of the RestrictedThreadPoolRunner class.
/// </summary>
/// <param name="maxConcurrentHandlers">Specifies the maximum number of event handlers running concurrently.</param>
public RestrictedThreadPoolRunnerAsync(int maxConcurrentHandlers)
{
_maxConcurrentHandlers = maxConcurrentHandlers > 0 ? maxConcurrentHandlers : throw new ArgumentOutOfRangeException($"Parameter {nameof(maxConcurrentHandlers)} should be positive integer (value was: {maxConcurrentHandlers})");
_isRunning = true;
Thread thread = new Thread(Worker);
thread.Start();
}

/// <summary>
/// Runs event handlers on a ThreadPool threads.
/// </summary>
/// <param name="handlers">The event handlers to run.</param>
public async Task RunAsync(params Func<Task>[] handlers)
{
foreach (Func<Task> handler in handlers)
{
var handler1 = handler;
_handlerActions.Add(handler1);
}

await Task.CompletedTask.ConfigureAwait(false);
}

/// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
public void Dispose()
{
if (_isRunning)
{
_isRunning = false;
Thread.Sleep(_timeout);
}

_handlerActions.Dispose();
}

private void Worker()
{
while (_isRunning)
{
if (!_handlerActions.TryTake(out Func<Task> action, _timeout))
{
continue;
}

while (_isRunning)
{
lock (_locker)
{
if (_currentlyRunning < _maxConcurrentHandlers)
{
_currentlyRunning++;
_ = Task.Run(async () => await RunHandlerAsync(action).ConfigureAwait(false));
break;
}
}

Thread.Sleep(TimeSpan.FromMilliseconds(50));
}
}
}

private async Task RunHandlerAsync(Func<Task> handler)
{
await handler().ConfigureAwait(false);

lock (_locker)
{
_currentlyRunning--;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;

namespace M.EventBroker.Async.EvenHandlerRunners
{
/// <summary>
/// Runs event handlers on a ThreadPool threads.
/// </summary>
public class UnrestrictedThreadPoolRunnerAsync : IEventHandlerRunnerAsync
{
/// <summary>
/// Runs event handlers on a ThreadPool threads.
/// </summary>
/// <param name="handlers">The event handlers to run.</param>
public async Task RunAsync(params Func<Task>[] handlers)
{
foreach (Func<Task> handler in handlers)
{
var handler1 = handler;
_ = Task.Run(async () => await handler1().ConfigureAwait(false));
}

await Task.CompletedTask.ConfigureAwait(false);
}

/// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
public void Dispose()
{
}
}
}
10 changes: 5 additions & 5 deletions src/M.EventBroker/M.EventBroker.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
<Authors>Petar Marinov</Authors>
<Company>Petar Marinov</Company>
<Description>A multi-threaded in-memory implementation of publish–subscribe pattern.</Description>
<Copyright>Copyright (c) 2018 Petar Marinov</Copyright>
<PackageLicenseUrl>https://github.com/petar-m/EventBroker/blob/master/LICENSE</PackageLicenseUrl>
<Copyright>Copyright (c) 2010 Petar Marinov</Copyright>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<PackageProjectUrl>https://github.com/petar-m/EventBroker</PackageProjectUrl>
<RepositoryUrl>https://github.com/petar-m/EventBroker</RepositoryUrl>
<PackageTags>event broker in-memory publish subscribe</PackageTags>
<PackageReleaseNotes></PackageReleaseNotes>
<AssemblyVersion>2.0.0.0</AssemblyVersion>
<FileVersion>2.0.0.0</FileVersion>
<Version>2.0.0</Version>
<AssemblyVersion>2.1.0.0</AssemblyVersion>
<FileVersion>2.1.0.0</FileVersion>
<Version>2.1.0</Version>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)'=='Debug'">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using M.EventBroker.EvenHandlerRunners;
using System;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace M.EventBroker.Async.EvenHandlerRunners.Tests
{
public class ContinueAfterHandlersRunnerAsyncTests
{
[Fact]
public async Task Run_WithMultipleActions_ContinuesAfterHandlers()
{
// Arrange
int? thread1 = null;
Func<Task> action1 = async () => { thread1 = Thread.CurrentThread.ManagedThreadId; await Task.Delay(100).ConfigureAwait(false); };

int? thread2 = null;
Func<Task> action2 = async () => { thread2 = Thread.CurrentThread.ManagedThreadId; await Task.Delay(100).ConfigureAwait(false); };

var runner = new ContinueAfterHandlersRunnerAsync();

// Act
await runner.RunAsync(action1, action2).ConfigureAwait(false);

// Assert
Assert.NotNull(thread1);
Assert.NotNull(thread2);
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
using FakeItEasy;
using M.EventBroker.Async.Tests;
using System;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace M.EventBroker.Async.EvenHandlerRunners.Tests
{
public class RestrictedThreadPoolRunnerAsyncTests
{
[Fact]
public async Task Run_WithRestrictionOfTwoAndTwoActions_ActionsAreRunnedOnDifferentThreads()
{
// Arrange
int? thread1 = null;
Func<Task> action1 = async () => { thread1 = Thread.CurrentThread.ManagedThreadId; await Task.Delay(30).ConfigureAwait(false); };

int? thread2 = null;
Func<Task> action2 = async () => { thread2 = Thread.CurrentThread.ManagedThreadId; await Task.Delay(30).ConfigureAwait(false); };

var runner = new RestrictedThreadPoolRunnerAsync(2);

// Act
await runner.RunAsync(action1, action2).ConfigureAwait(false); ;

// Assert
Thread.Sleep(100);

Assert.NotNull(thread1);
Assert.NotEqual(Thread.CurrentThread.ManagedThreadId, thread1);

Assert.NotNull(thread2);
Assert.NotEqual(Thread.CurrentThread.ManagedThreadId, thread2);

Assert.NotEqual(thread1, thread2);
}

[Fact]
public async Task Run_WithRestrictionOfOneAndMultipleAcrions_AllActionsAreRunnedAsync()
{
// Arrange
var action1 = A.Fake<IActionAsync>();
A.CallTo(() => action1.Action()).Invokes(async () => await Task.Delay(50).ConfigureAwait(false));

var action2 = A.Fake<IActionAsync>();
A.CallTo(() => action2.Action()).Invokes(async () => await Task.Delay(50).ConfigureAwait(false));

var action3 = A.Fake<IActionAsync>();
A.CallTo(() => action3.Action()).Invokes(async () => await Task.Delay(50).ConfigureAwait(false));

var runner = new RestrictedThreadPoolRunnerAsync(1);

// Act
await runner.RunAsync(action1.Action, action2.Action, action3.Action).ConfigureAwait(false);

// Assert
Thread.Sleep(1000);

A.CallTo(() => action1.Action())
.MustHaveHappened(Repeated.Exactly.Once);

A.CallTo(() => action2.Action())
.MustHaveHappened(Repeated.Exactly.Once);

A.CallTo(() => action3.Action())
.MustHaveHappened(Repeated.Exactly.Once);
}

[Fact]
public void Constructor_WithNegativeMaxConcurrentHandlers_ThrowsException()
{
Action constructor = () => new RestrictedThreadPoolRunnerAsync(-3);
Assert.Throws<ArgumentOutOfRangeException>(constructor);
}

[Fact]
public void Constructor_WithZeroMaxConcurrentHandlers_ThrowsException()
{
Action constructor = () => new RestrictedThreadPoolRunnerAsync(0);
Assert.Throws<ArgumentOutOfRangeException>(constructor);
}

[Fact]
public async Task Run_DisposeWhileRunning_ExitsGracefullyAsync()
{
// Arrange
var action1 = A.Fake<IActionAsync>();
A.CallTo(() => action1.Action()).Invokes(async () => await Task.Delay(300).ConfigureAwait(false));

var action2 = A.Fake<IActionAsync>();
A.CallTo(() => action2.Action()).Invokes(async () => await Task.Delay(200).ConfigureAwait(false));

// Act
using (var runner = new RestrictedThreadPoolRunnerAsync(1))
{
await runner.RunAsync(action1.Action, action2.Action).ConfigureAwait(false);
await Task.Delay(10).ConfigureAwait(false);
runner.Dispose();
}

// Assert
Thread.Sleep(1000);

A.CallTo(() => action1.Action())
.MustHaveHappened(Repeated.Exactly.Once);

A.CallTo(() => action2.Action())
.MustNotHaveHappened();
}

[Fact]
public async Task Run_DisposeTwiceWhileRunning_ExitsGracefullyAsync()
{
// Arrange
var action1 = A.Fake<IActionAsync>();
A.CallTo(() => action1.Action()).Invokes(async () => await Task.Delay(1000).ConfigureAwait(false));

// Act
using (var runner = new RestrictedThreadPoolRunnerAsync(1))
{
await runner.RunAsync(action1.Action).ConfigureAwait(false);
await Task.Delay(100).ConfigureAwait(false);
runner.Dispose();
}

// Assert
await Task.Delay(100).ConfigureAwait(false);

A.CallTo(() => action1.Action())
.MustHaveHappenedOnceExactly();
}
}
}
Loading

0 comments on commit ed28aa6

Please sign in to comment.