Skip to content

Commit

Permalink
Introduce interface IRetryPolicy for RetryPolicy
Browse files Browse the repository at this point in the history
  • Loading branch information
petar-m committed May 2, 2024
1 parent 7bf0892 commit 895632d
Show file tree
Hide file tree
Showing 19 changed files with 106 additions and 89 deletions.
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ public class SomeEventHandler : IEventHandler<SomeEvent>
{
}

public async Task Handle(SomeEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public async Task Handle(SomeEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
// process the event
}

public async Task OnError(Exception exception, SomeEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public async Task OnError(Exception exception, SomeEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
// called on unhandled exeption from Handle
// optionally use retryPolicy.RetryAfter(TimeSpan)
Expand Down Expand Up @@ -166,14 +166,14 @@ Retrying within event hadler can become a bottleneck. Imagine `EventBroker` is r

Another option will be to use `IEventBroker.PublishDeferred`. This will eliminate the bottleneck but will itroduce different problems. The same event will be handled again by all handlers, meaning specaial care should be taken to make all handlers idempotent. Any additional information (e.g. number of retries) needs to be known, it should be carried with the event, introducing accidential complexity.

To avoid these problems, both `IEventBroker` `Handle` and `OnError` methods have `RetryPolicy` parameter.
To avoid these problems, both `IEventBroker` `Handle` and `OnError` methods have `IRetryPolicy` parameter.

`RetryPolicy.RetryAfter()` will schedule a retry only for the handler it is called from, without blocking. After the given time interval an instance of the handler will be resolved from the DI container (from a new scope) and executed with the same event instance.
`IRetryPolicy.RetryAfter()` will schedule a retry only for the handler it is called from, without blocking. After the given time interval an instance of the handler will be resolved from the DI container (from a new scope) and executed with the same event instance.

`RetryPolicy.Attempt` is the current retry attempt for a given handler and event.
`RetryPolicy.LastDelay` is the time interval before the retry.
`IRetryPolicy.Attempt` is the current retry attempt for a given handler and event.
`IRetryPolicy.LastDelay` is the time interval before the retry.

`RetryPolicy.RetryRequested` is used to coordinate retry request between `Handle` and `OnError`. `RetryPolicy` is passed to both methods to enable error handling and retry request entirely in `Handle` method. `OnError` can check `RetryPolicy.RetryRequested` to know whether `Hanlde` had called `RetryPolicy.RetryAfter()`.
`IRetryPolicy.RetryRequested` is used to coordinate retry request between `Handle` and `OnError`. `IRetryPolicy` is passed to both methods to enable error handling and retry request entirely in `Handle` method. `OnError` can check `IRetryPolicy.RetryRequested` to know whether `Hanlde` had called `IRetryPolicy.RetryAfter()`.

**Caution:** the retry will not be exactly after the specified time interval in `RetryPolicy.RetryAfter()`. Take into account a tolerance of around 50 milliseconds. Additionally, retry executions respect maximum concurrent handlers setting, meaning a high load can cause additional delay.
**Caution:** the retry will not be exactly after the specified time interval in `IRetryPolicy.RetryAfter()`. Take into account a tolerance of around 50 milliseconds. Additionally, retry executions respect maximum concurrent handlers setting, meaning a high load can cause additional delay.

4 changes: 2 additions & 2 deletions package-readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ public class SomeEventHandler : IEventHandler<SomeEvent>
{
}

public async Task Handle(SomeEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public async Task Handle(SomeEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
// process the event
}

public async Task OnError(Exception exception, SomeEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public async Task OnError(Exception exception, SomeEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
// called on unhandled exeption from Handle
// optionally use retryPolicy.RetryAfter(TimeSpan)
Expand Down
5 changes: 3 additions & 2 deletions src/M.EventBrokerSlim/IEventHandler.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using M.EventBrokerSlim.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

Expand All @@ -18,7 +19,7 @@ public interface IEventHandler<TEvent>
/// <param name="event">An instance of <typeparamref name="TEvent"/> representing the event.</param>
/// <param name="retryPolicy">Provides ability to request a retry for the same event by the handler. Do not keep a reference to this instance, it may be pooled and reused.</param>
/// <param name="cancellationToken">A cancellation token that should be used to cancel the work</param>
Task Handle(TEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken);
Task Handle(TEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken);

/// <summary>
/// Called when an unhadled exception is caught during execution.
Expand All @@ -29,5 +30,5 @@ public interface IEventHandler<TEvent>
/// <param name="event">The event instance which handling caused the exception.</param>
/// <param name="retryPolicy">Provides ability to request a retry for the same event by the handler. Do not keep a reference to this instance, it may be pooled and reused.</param>
/// <param name="cancellationToken">A cancellation token that should be used to cancel the work</param>
Task OnError(Exception exception, TEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken);
Task OnError(Exception exception, TEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,61 +3,34 @@
namespace M.EventBrokerSlim;

/// <summary>
/// Describes a retry request for re-processing an event.
/// Represents a retry request for re-processing an event.
/// </summary>
public class RetryPolicy
public interface IRetryPolicy
{
private TimeSpan _delay;

internal RetryPolicy()
{
}

/// <summary>
/// Requests invoking of the same handler with the same event after given time interval.
/// Current attempt for the same handler and event.
/// </summary>
/// <param name="delay">The time interval to wait before re-processing.</param>
public void RetryAfter(TimeSpan delay)
{
_delay = delay;
RetryRequested = true;
}
uint Attempt { get; }

/// <summary>
/// Requests invoking of the same handler with the same event after given time interval.
/// The time interval delay used for the last re-processing.
/// </summary>
/// <param name="delay">A func taking the attempt number for the same handler and event and the last retry interval and returning the new wait interval before re-processing.</param>
public void RetryAfter(Func<uint, TimeSpan, TimeSpan> delay)
{
_delay = delay(Attempt, _delay);
RetryRequested = true;
}
TimeSpan LastDelay { get; }

/// <summary>
/// Current attempt for the same handler and event.
/// Indicates whether a re-processing has been requested for the handler and event.
/// </summary>
public uint Attempt { get; private set; }
bool RetryRequested { get; }

/// <summary>
/// The time interval delay used for the last re-processing.
/// Requests invoking of the same handler with the same event after given time interval.
/// </summary>
public TimeSpan LastDelay => _delay;
/// <param name="delay">A func taking the attempt number for the same handler and event and the last retry interval and returning the new wait interval before re-processing.</param>
void RetryAfter(Func<uint, TimeSpan, TimeSpan> delay);

/// <summary>
/// Indicates whether a re-processing has been requested for the handler and event.
/// Requests invoking of the same handler with the same event after given time interval.
/// </summary>
public bool RetryRequested { get; private set; }

internal void NextAttempt()
{
Attempt++;
RetryRequested = false;
}

internal void Clear()
{
Attempt = 0;
_delay = TimeSpan.Zero;
RetryRequested = false;
}
/// <param name="delay">The time interval to wait before re-processing.</param>
void RetryAfter(TimeSpan delay);
}
43 changes: 43 additions & 0 deletions src/M.EventBrokerSlim/Internal/RetryPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using System;

namespace M.EventBrokerSlim.Internal;

internal class RetryPolicy : IRetryPolicy
{
private TimeSpan _delay;

internal RetryPolicy()
{
}

public void RetryAfter(TimeSpan delay)
{
_delay = delay;
RetryRequested = true;
}

public void RetryAfter(Func<uint, TimeSpan, TimeSpan> delay)
{
_delay = delay(Attempt, _delay);
RetryRequested = true;
}

public uint Attempt { get; private set; }

public TimeSpan LastDelay => _delay;

public bool RetryRequested { get; private set; }

internal void NextAttempt()
{
Attempt++;
RetryRequested = false;
}

internal void Clear()
{
Attempt = 0;
_delay = TimeSpan.Zero;
RetryRequested = false;
}
}
4 changes: 2 additions & 2 deletions test/M.EventBrokerSlim.Tests/EventBrokerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ public TestEventHandler(EventsRecorder<int> eventsRecorder, Timestamp? timestamp
_timestamp = timestamp;
}

public async Task Handle(TestEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public async Task Handle(TestEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
_eventsRecoder.Notify(@event);

Expand All @@ -412,7 +412,7 @@ public async Task Handle(TestEvent @event, RetryPolicy retryPolicy, Cancellation
}
}

public async Task OnError(Exception exception, TestEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public async Task OnError(Exception exception, TestEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
_eventsRecoder.Notify(exception, @event);

Expand Down
8 changes: 4 additions & 4 deletions test/M.EventBrokerSlim.Tests/ExceptionHandlingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public TestEventHandler(EventsRecorder<int> eventBroker)
_eventsRecorder = eventBroker;
}

public Task Handle(TestEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public Task Handle(TestEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
_eventsRecorder.Notify(@event);
if(@event.ThrowFromHandle)
Expand All @@ -159,7 +159,7 @@ public Task Handle(TestEvent @event, RetryPolicy retryPolicy, CancellationToken
return Task.CompletedTask;
}

public Task OnError(Exception exception, TestEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public Task OnError(Exception exception, TestEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
_eventsRecorder.Notify(exception, @event);
if(@event.ThrowFromOnError)
Expand All @@ -179,8 +179,8 @@ public TestEventHandler1(string input)
_input = input;
}

public Task Handle(TestEvent @even, RetryPolicy retryPolicyt, CancellationToken cancellationToken) => throw new NotImplementedException();
public Task Handle(TestEvent @even, IRetryPolicy retryPolicyt, CancellationToken cancellationToken) => throw new NotImplementedException();

public Task OnError(Exception exception, TestEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken) => throw new NotImplementedException();
public Task OnError(Exception exception, TestEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken) => throw new NotImplementedException();
}
}
4 changes: 2 additions & 2 deletions test/M.EventBrokerSlim.Tests/HandlerExecutionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public TestEventHandler(EventsRecorder<int> eventsRecorder)
_eventsRecoder = eventsRecorder;
}

public async Task Handle(TestEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public async Task Handle(TestEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
if(@event.TimeToRun != default)
{
Expand All @@ -166,7 +166,7 @@ public async Task Handle(TestEvent @event, RetryPolicy retryPolicy, Cancellation
_eventsRecoder.Notify(@event);
}

public Task OnError(Exception exception, TestEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public Task OnError(Exception exception, TestEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
_eventsRecoder.Notify(exception, @event);
return Task.CompletedTask;
Expand Down
4 changes: 2 additions & 2 deletions test/M.EventBrokerSlim.Tests/HandlerRegistrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,13 @@ public TestEventHandler(EventsRecorder<string> eventsRecorder)
_eventsRecorder = eventsRecorder;
}

public Task Handle(TestEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public Task Handle(TestEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
_eventsRecorder.Notify($"{@event.CorrelationId}_{GetType().Name}");
return Task.CompletedTask;
}

public Task OnError(Exception exception, TestEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public Task OnError(Exception exception, TestEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
_eventsRecorder.Notify(exception, @event);
return Task.CompletedTask;
Expand Down
4 changes: 2 additions & 2 deletions test/M.EventBrokerSlim.Tests/HandlerScopeAndInstanceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,14 @@ public TestEventHandler(EventsRecorder<int> eventsRecorder, IServiceProvider sco
_scope = scope;
}

public Task Handle(TestEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public Task Handle(TestEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
_eventsRecorder.Notify(@event);
_eventsRecorder.Notify(@event, handlerInstance: GetHashCode(), scopeInstance: _scope.GetHashCode());
return Task.CompletedTask;
}

public Task OnError(Exception exception, TestEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public Task OnError(Exception exception, TestEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
_eventsRecorder.Notify(exception, @event);
return Task.CompletedTask;
Expand Down
12 changes: 6 additions & 6 deletions test/M.EventBrokerSlim.Tests/LoadTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public TestEventHandler1(HandlerSettings settings, EventsTracker tracker)
_tracker = tracker;
}

public Task Handle(T @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public Task Handle(T @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
_tracker.Track(@event);
if(@event.Number % 250 == 0)
Expand All @@ -91,7 +91,7 @@ public Task Handle(T @event, RetryPolicy retryPolicy, CancellationToken cancella
return Task.CompletedTask;
}

public Task OnError(Exception exception, T @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public Task OnError(Exception exception, T @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
if(@event.Number % 250 == 0 && retryPolicy.Attempt < _settings.RetryAttempts)
{
Expand All @@ -112,7 +112,7 @@ public TestEventHandler2(HandlerSettings settings, EventsTracker tracker)
_tracker = tracker;
}

public Task Handle(T @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public Task Handle(T @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
_tracker.Track(@event);
if(@event.Number % 500 == 0 && retryPolicy.Attempt < _settings.RetryAttempts)
Expand All @@ -122,7 +122,7 @@ public Task Handle(T @event, RetryPolicy retryPolicy, CancellationToken cancella
return Task.CompletedTask;
}

public Task OnError(Exception exception, T @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public Task OnError(Exception exception, T @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
Expand All @@ -137,13 +137,13 @@ public TestEventHandler3(EventsTracker tracker)
_tracker = tracker;
}

public Task Handle(T @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public Task Handle(T @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
_tracker.Track(@event!);
return Task.CompletedTask;
}

public Task OnError(Exception exception, T @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public Task OnError(Exception exception, T @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
Expand Down
4 changes: 2 additions & 2 deletions test/M.EventBrokerSlim.Tests/MultipleHandlersTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ public TestEventHandler(EventsRecorder<string> eventsRecorder)
_eventsRecorder = eventsRecorder;
}

public Task Handle(TestEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public Task Handle(TestEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
_eventsRecorder.Notify($"{@event.CorrelationId}_{GetType().Name}");
return Task.CompletedTask;
}

public Task OnError(Exception exception, TestEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public Task OnError(Exception exception, TestEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
_eventsRecorder.Notify(exception, @event);
return Task.CompletedTask;
Expand Down
8 changes: 4 additions & 4 deletions test/M.EventBrokerSlim.Tests/OrderOfRetriesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public TestEventHandler1(EventsTracker tracker)
_tracker = tracker;
}

public Task Handle(TestEvent1 @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public Task Handle(TestEvent1 @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
_tracker.Track(@event);
if(retryPolicy.Attempt < 1)
Expand All @@ -71,7 +71,7 @@ public Task Handle(TestEvent1 @event, RetryPolicy retryPolicy, CancellationToken
throw new NotImplementedException();
}

public Task OnError(Exception exception, TestEvent1 @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public Task OnError(Exception exception, TestEvent1 @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
Expand All @@ -86,7 +86,7 @@ public TestEventHandler2(EventsTracker tracker)
_tracker = tracker;
}

public Task Handle(TestEvent2 @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public Task Handle(TestEvent2 @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
_tracker.Track(@event);
if(retryPolicy.Attempt < 2)
Expand All @@ -96,7 +96,7 @@ public Task Handle(TestEvent2 @event, RetryPolicy retryPolicy, CancellationToken
throw new NotImplementedException();
}

public Task OnError(Exception exception, TestEvent2 @event, RetryPolicy retryPolicy, CancellationToken cancellationToken)
public Task OnError(Exception exception, TestEvent2 @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
Expand Down
Loading

0 comments on commit 895632d

Please sign in to comment.