Skip to content

Commit

Permalink
Add retry feature
Browse files Browse the repository at this point in the history
  • Loading branch information
petar-m committed Apr 30, 2024
1 parent 522f6a6 commit 6abda82
Show file tree
Hide file tree
Showing 29 changed files with 1,593 additions and 71 deletions.
124 changes: 124 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
root = true

# All files
[*]
indent_style = space

# Xml files
[*.xml]
indent_size = 2

[*.cs]
#### Naming styles ####

# Naming rules

dotnet_naming_rule.private_field_should_be_private_field_naming.severity = warning
dotnet_naming_rule.private_field_should_be_private_field_naming.symbols = private_field
dotnet_naming_rule.private_field_should_be_private_field_naming.style = private_field_naming

# Symbol specifications

dotnet_naming_symbols.private_field.applicable_kinds = field
dotnet_naming_symbols.private_field.applicable_accessibilities = private
dotnet_naming_symbols.private_field.required_modifiers =

# Naming styles

dotnet_naming_style.private_field_naming.required_prefix = _
dotnet_naming_style.private_field_naming.required_suffix =
dotnet_naming_style.private_field_naming.word_separator =
dotnet_naming_style.private_field_naming.capitalization = camel_case
csharp_indent_labels = one_less_than_current
csharp_using_directive_placement = outside_namespace:silent
csharp_prefer_simple_using_statement = true:suggestion
csharp_prefer_braces = true:silent
csharp_style_namespace_declarations = file_scoped:silent
csharp_style_prefer_method_group_conversion = true:silent
csharp_style_prefer_top_level_statements = true:silent
csharp_style_prefer_primary_constructors = true:suggestion
csharp_style_expression_bodied_methods = false:silent
csharp_style_expression_bodied_constructors = false:silent
csharp_style_expression_bodied_operators = false:silent
csharp_style_expression_bodied_properties = true:silent
csharp_style_expression_bodied_indexers = true:silent
csharp_style_expression_bodied_accessors = true:silent
csharp_style_expression_bodied_lambdas = true:silent
csharp_style_expression_bodied_local_functions = false:silent
csharp_space_around_binary_operators = before_and_after
csharp_style_inlined_variable_declaration = true:suggestion
csharp_style_allow_embedded_statements_on_same_line_experimental = true:silent
csharp_style_allow_blank_lines_between_consecutive_braces_experimental = true:silent
csharp_style_allow_blank_line_after_colon_in_constructor_initializer_experimental = true:silent
csharp_style_allow_blank_line_after_token_in_conditional_expression_experimental = true:silent
csharp_style_allow_blank_line_after_token_in_arrow_expression_clause_experimental = true:silent
csharp_space_after_keywords_in_control_flow_statements = false

[*.{cs,vb}]
#### Naming styles ####

# Naming rules

dotnet_naming_rule.interface_should_be_begins_with_i.severity = suggestion
dotnet_naming_rule.interface_should_be_begins_with_i.symbols = interface
dotnet_naming_rule.interface_should_be_begins_with_i.style = begins_with_i

dotnet_naming_rule.types_should_be_pascal_case.severity = suggestion
dotnet_naming_rule.types_should_be_pascal_case.symbols = types
dotnet_naming_rule.types_should_be_pascal_case.style = pascal_case

dotnet_naming_rule.non_field_members_should_be_pascal_case.severity = suggestion
dotnet_naming_rule.non_field_members_should_be_pascal_case.symbols = non_field_members
dotnet_naming_rule.non_field_members_should_be_pascal_case.style = pascal_case

# Symbol specifications

dotnet_naming_symbols.interface.applicable_kinds = interface
dotnet_naming_symbols.interface.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected
dotnet_naming_symbols.interface.required_modifiers =

dotnet_naming_symbols.types.applicable_kinds = class, struct, interface, enum
dotnet_naming_symbols.types.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected
dotnet_naming_symbols.types.required_modifiers =

dotnet_naming_symbols.non_field_members.applicable_kinds = property, event, method
dotnet_naming_symbols.non_field_members.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected
dotnet_naming_symbols.non_field_members.required_modifiers =

# Naming styles

dotnet_naming_style.begins_with_i.required_prefix = I
dotnet_naming_style.begins_with_i.required_suffix =
dotnet_naming_style.begins_with_i.word_separator =
dotnet_naming_style.begins_with_i.capitalization = pascal_case

dotnet_naming_style.pascal_case.required_prefix =
dotnet_naming_style.pascal_case.required_suffix =
dotnet_naming_style.pascal_case.word_separator =
dotnet_naming_style.pascal_case.capitalization = pascal_case

dotnet_naming_style.pascal_case.required_prefix =
dotnet_naming_style.pascal_case.required_suffix =
dotnet_naming_style.pascal_case.word_separator =
dotnet_naming_style.pascal_case.capitalization = pascal_case
dotnet_style_operator_placement_when_wrapping = beginning_of_line
tab_width = 4
indent_size = 4
end_of_line = crlf
dotnet_style_coalesce_expression = true:suggestion
dotnet_style_null_propagation = true:suggestion
dotnet_style_prefer_is_null_check_over_reference_equality_method = true:suggestion
dotnet_style_prefer_auto_properties = true:silent
dotnet_style_object_initializer = true:suggestion
dotnet_style_collection_initializer = true:suggestion
dotnet_style_prefer_simplified_boolean_expressions = true:suggestion
dotnet_style_prefer_conditional_expression_over_assignment = true:silent
dotnet_style_prefer_conditional_expression_over_return = true:silent
dotnet_style_explicit_tuple_names = true:suggestion
dotnet_style_prefer_inferred_tuple_names = true:suggestion
dotnet_style_prefer_inferred_anonymous_type_member_names = true:suggestion
dotnet_style_prefer_compound_assignment = true:suggestion
dotnet_style_namespace_match_folder = true:suggestion
dotnet_style_allow_multiple_blank_lines_experimental = true:silent
dotnet_style_allow_statement_immediately_after_block_experimental = true:silent
insert_final_newline = true
1 change: 1 addition & 0 deletions EventBrokerSlim.sln
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{D8DD8B48-A
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{749C73FB-A73E-4794-8019-138CF561888B}"
ProjectSection(SolutionItems) = preProject
.editorconfig = .editorconfig
package-icon.png = package-icon.png
EndProjectSection
EndProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ internal void CreateEventHandlerDescriptor<TEvent, THandler>(Guid eventHandlerKe
Key: eventHandlerKey,
EventType: typeof(TEvent),
InterfaceType: typeof(IEventHandler<TEvent>),
Handle: async (handler, @event, ct) => await ((THandler)handler).Handle((TEvent)@event, ct),
OnError: async (handler, @event, exception, ct) => await ((THandler)handler).OnError(exception, (TEvent)@event, ct));
Handle: async (handler, @event, retryPolicy, ct) => await ((THandler)handler).Handle((TEvent)@event, retryPolicy, ct),
OnError: async (handler, @event, exception, retryPolicy, ct) => await ((THandler)handler).OnError(exception, (TEvent)@event, retryPolicy, ct));

_eventsHandlersDescriptors.Add(descriptor);
}
Expand All @@ -82,14 +82,14 @@ internal static EventHandlerRegistry Build(IEnumerable<EventHandlerRegistryBuild
{
EventBrokerBuilder? eventBrokerBuilder = null;
List<EventHandlerDescriptor> descriptors = new();
foreach (var builder in builders)
foreach(var builder in builders)
{
if (builder is EventBrokerBuilder)
if(builder is EventBrokerBuilder)
{
eventBrokerBuilder = (EventBrokerBuilder)builder;
}

foreach (var descriptor in builder._eventsHandlersDescriptors)
foreach(var descriptor in builder._eventsHandlersDescriptors)
{
descriptors.Add(descriptor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static IServiceCollection AddEventBroker(

serviceCollection.AddSingleton(
x => new ThreadPoolEventHandlerRunner(
x.GetRequiredKeyedService<Channel<object>>(eventBrokerKey).Reader,
x.GetRequiredKeyedService<Channel<object>>(eventBrokerKey),
x.GetRequiredService<IServiceScopeFactory>(),
x.GetRequiredService<EventHandlerRegistry>(),
x.GetRequiredKeyedService<CancellationTokenSource>(eventBrokerKey),
Expand Down
6 changes: 4 additions & 2 deletions src/M.EventBrokerSlim/IEventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ public interface IEventHandler<TEvent>
/// Handles the event.
/// </summary>
/// <param name="event">An instance of <typeparamref name="TEvent"/> representing the event.</param>
/// <param name="retryPolicy">Provides ability to request a retry for the same event by the handler. Do not keep a reference to this instance, it may be pooled and reused.</param>
/// <param name="cancellationToken">A cancellation token that should be used to cancel the work</param>
Task Handle(TEvent @event, CancellationToken cancellationToken);
Task Handle(TEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken);

/// <summary>
/// Called when an unhadled exception is caught during execution.
Expand All @@ -26,6 +27,7 @@ public interface IEventHandler<TEvent>
/// </summary>
/// <param name="exception">The exception caught.</param>
/// <param name="event">The event instance which handling caused the exception.</param>
/// <param name="retryPolicy">Provides ability to request a retry for the same event by the handler. Do not keep a reference to this instance, it may be pooled and reused.</param>
/// <param name="cancellationToken">A cancellation token that should be used to cancel the work</param>
Task OnError(Exception exception, TEvent @event, CancellationToken cancellationToken);
Task OnError(Exception exception, TEvent @event, RetryPolicy retryPolicy, CancellationToken cancellationToken);
}
4 changes: 2 additions & 2 deletions src/M.EventBrokerSlim/Internal/EventHandlerDescriptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ internal sealed record EventHandlerDescriptor(
Guid Key,
Type EventType,
Type InterfaceType,
Func<object, object, CancellationToken, Task> Handle,
Func<object, object, Exception, CancellationToken, Task> OnError);
Func<object, object, RetryPolicy, CancellationToken, Task> Handle,
Func<object, object, Exception, RetryPolicy, CancellationToken, Task> OnError);
93 changes: 93 additions & 0 deletions src/M.EventBrokerSlim/Internal/HandlerExecutionContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;

namespace M.EventBrokerSlim.Internal;

internal class HandlerExecutionContext
{
private readonly DefaultObjectPool<RetryPolicy> _retryPolicyPool;
private readonly SemaphoreSlim _semaphore;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly ILogger<ThreadPoolEventHandlerRunner>? _logger;
private readonly DefaultObjectPool<HandlerExecutionContext> _contextObjectPool;
private readonly RetryQueue _retryQueue;

public HandlerExecutionContext(DefaultObjectPool<RetryPolicy> retryPolicyPool, SemaphoreSlim semaphore, IServiceScopeFactory serviceScopeFactory, ILogger<ThreadPoolEventHandlerRunner>? logger, DefaultObjectPool<HandlerExecutionContext> contextObjectPool, RetryQueue retryQueue)
{
_retryPolicyPool = retryPolicyPool;
_semaphore = semaphore;
_serviceScopeFactory = serviceScopeFactory;
_logger = logger;
_contextObjectPool = contextObjectPool;
_retryQueue = retryQueue;
}

public HandlerExecutionContext Initialize(object @event, EventHandlerDescriptor eventHandlerDescriptor, RetryDescriptor? retryDescriptor, CancellationToken cancellationToken)
{
Event = @event;
EventHandlerDescriptor = eventHandlerDescriptor;
RetryDescriptor = retryDescriptor;
CancellationToken = cancellationToken;

RetryPolicy = RetryDescriptor is null ? _retryPolicyPool.Get() : RetryDescriptor.RetryPolicy;
return this;
}

public void Clear()
{
Event = null;
EventHandlerDescriptor = null;
RetryDescriptor = null;
CancellationToken = default;

RetryPolicy = null;
}

public object? Event { get; private set; }

public EventHandlerDescriptor? EventHandlerDescriptor { get; private set; }

public RetryDescriptor? RetryDescriptor { get; private set; }

public CancellationToken CancellationToken { get; private set; }

public RetryPolicy? RetryPolicy { get; private set; }

public async Task CompleteAsync()
{
if(RetryPolicy!.RetryRequested)
{
RetryPolicy.NextAttempt();
// first retry
if(RetryDescriptor is null)
{
RetryDescriptor = new RetryDescriptor(Event!, EventHandlerDescriptor!, RetryPolicy);
}

await _retryQueue.Enqueue(RetryDescriptor).ConfigureAwait(false);
}
else
{
_retryPolicyPool.Return(RetryPolicy);
}

_contextObjectPool.Return(this);
_semaphore.Release();
}

public object GetService(IServiceProvider serviceProvider)
=> serviceProvider.GetRequiredKeyedService(EventHandlerDescriptor!.InterfaceType, EventHandlerDescriptor.Key);

public IServiceScope CreateScope()
=> _serviceScopeFactory.CreateScope();

public void LogEventHandlerResolvingError(Exception exception)
=> _logger?.LogEventHandlerResolvingError(Event!.GetType(), exception);

public void LogUnhandledExceptionFromOnError(Type serviceType, Exception exception)
=> _logger?.LogUnhandledExceptionFromOnError(serviceType, exception);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System.Threading;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;

namespace M.EventBrokerSlim.Internal;

internal class HandlerExecutionContextPooledObjectPolicy : IPooledObjectPolicy<HandlerExecutionContext>
{
private readonly DefaultObjectPool<RetryPolicy> _retryPolicyPool;
private readonly SemaphoreSlim _semaphore;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly ILogger<ThreadPoolEventHandlerRunner>? _logger;
private readonly RetryQueue _retryQueue;

internal HandlerExecutionContextPooledObjectPolicy(
DefaultObjectPool<RetryPolicy> retryPolicyPool,
SemaphoreSlim semaphore,
IServiceScopeFactory serviceScopeFactory,
ILogger<ThreadPoolEventHandlerRunner>? logger,
RetryQueue retryQueue)
{
_retryPolicyPool = retryPolicyPool;
_semaphore = semaphore;
_serviceScopeFactory = serviceScopeFactory;
_logger = logger;
_retryQueue = retryQueue;
}

internal DefaultObjectPool<HandlerExecutionContext>? ContextObjectPool { get; set; }

public HandlerExecutionContext Create()
=> new HandlerExecutionContext(_retryPolicyPool, _semaphore, _serviceScopeFactory, _logger, ContextObjectPool!, _retryQueue);

public bool Return(HandlerExecutionContext obj)
{
obj.Clear();
return true;
}
}
3 changes: 3 additions & 0 deletions src/M.EventBrokerSlim/Internal/RetryDescriptor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace M.EventBrokerSlim.Internal;

internal record RetryDescriptor(object Event, EventHandlerDescriptor EventHandlerDescriptor, RetryPolicy RetryPolicy);
17 changes: 17 additions & 0 deletions src/M.EventBrokerSlim/Internal/RetryPolicyPooledObjectPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using Microsoft.Extensions.ObjectPool;

namespace M.EventBrokerSlim.Internal;

internal class RetryPolicyPooledObjectPolicy : IPooledObjectPolicy<RetryPolicy>
{
public RetryPolicy Create()
{
return new RetryPolicy();
}

public bool Return(RetryPolicy obj)
{
obj.Clear();
return true;
}
}
Loading

0 comments on commit 6abda82

Please sign in to comment.