Skip to content

Commit

Permalink
Update README
Browse files Browse the repository at this point in the history
  • Loading branch information
petar-m committed Jun 28, 2024
1 parent cb4a738 commit b3c79a6
Showing 1 changed file with 139 additions and 46 deletions.
185 changes: 139 additions & 46 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ Features:
- in-memory, in-process
- publishing is *Fire and Forget* style
- events don't have to implement specific interface
- event handlers are runned on a `ThreadPool` threads
- event handlers are executed on a `ThreadPool` threads
- the number of concurrent handlers running can be limited
- built-in retry option
- tightly integrated with Microsoft.Extensions.DependencyInjection
- each handler is resolved and runned in a new DI container scope
- each handler is resolved and executed in a new DI container scope
- **NEW** event can be handled by registered delegate

# How does it work

Implement an event handler by implementing `IEventHadler<TEvent>` interface:
Implement an event handler by implementing `IEventHandler<TEvent>` interface:

```csharp
public record SomeEvent(string Message);
Expand All @@ -37,17 +38,28 @@ public class SomeEventHandler : IEventHandler<SomeEvent>

public async Task OnError(Exception exception, SomeEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken)
{
// called on unhandled exeption from Handle
// called on unhandled exception from Handle
// optionally use retryPolicy.RetryAfter(TimeSpan)
}
}
```
```

or create an instance of `DelegateHandlerRegistryBuilder` and register handlers:

```csharp
DelegateHandlerRegistryBuilder builder = new();
builder.RegisterHandler<SomeEvent>(
static async (SomeEvent someEvent, ISomeService service, CancellationToken cancellationToken) =>
{
await service.DoSomething(someEvent, cancellationToken);
});
```

Add event broker impelementation to DI container using `AddEventBroker` extension method and register handlers:
Add event broker implementation to DI container using `AddEventBroker` extension method and register handlers, optionally add delegate handler registries:

```csharp
serviceCollection.AddEventBroker(
x => x.AddTransient<SomeEvent, SomeEventHandler>());
serviceCollection.AddEventBroker(x => x.AddTransient<SomeEvent, SomeEventHandler>())
.AddSingleton(builder);
```

Inject `IEventBroker` and publish events:
Expand All @@ -72,69 +84,155 @@ class MyClass

# Design

`EventBroker` uses `System.Threading.Channels.Channel<T>` to decouple procucers from consumers.
`EventBroker` uses `System.Threading.Channels.Channel<T>` to decouple producers from consumers.

There are no limits for publishers. Publishing is as fast as writing an event to a channel.

Event handlers are resolved by event type in a new DI scope which is disposed after handler comletes. Each handler execution is scheduled on the `ThreadPool` without blocking the producer. No more than configured maximum handlers run concurrently.
Event handlers are resolved by event type in a new DI scope which is disposed after handler completes. Each handler execution is scheduled on the `ThreadPool` without blocking the producer. No more than configured maximum handlers run concurrently.

![](docs/event_broker.png)

# Details

## Events

Events can be of any type. A best pracice for event is to be immutable - may be processed by multiple handlers in different threads.
Events can be of any type. A best practice for event is to be immutable - may be processed by multiple handlers in different threads.

## Event Handlers

Event handlers have to implement `IEventHandler<TEvent>` interface and to be registered in the DI container.
For each event handler a new DI container scope is created and the event handler is resolved from it. This way it can safely use injected services.
Every event handler is scheduled for execution on the `ThreadPool` without blocking the producer.
Event handlers can be specified in two ways:
- By implementing `IEventHandler<TEvent>` interface and registering the implementation in the DI container.
- By registering a handler delegate using `DelegateHandlerRegistryBuilder` and adding the `DelegateHandlerRegistryBuilder` instance to the DI container.

Both approaches can be used side by side, even for the same event. No matter how handlers are specified, a new DI container scope is created for each event handler. Every event handler is scheduled for execution on the `ThreadPool` without blocking the producer.

### Event Handlers Implementing `IEventHandler<TEvent>`

When event of type `TEvent` is published, `EventBroker` will resolve each `IEventHandler<TEvent>` implementation from a dedicated scope. This means that additional dependencies can be injected via the handler constructor, also resolved from the same scope.

The parameters of `IEventHandler<TEvent>` methods are managed by `EventBroker`.
```csharp
Task Handle(TEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken);

Task OnError(Exception exception, TEvent @event, IRetryPolicy retryPolicy, CancellationToken cancellationToken);
```
- `TEvent` - the instance of the published event.
- `IRetryPolicy` - the instance of the retry policy for the handler (see [Retries](#retries) section).
- `CancellationToken` - the `EventBroker` cancellation token.
- `Exception` - exception thrown from `Handle`.

Since event handlers are executed on the `ThreadPool`, there is nowhere to propagate unhandled exceptions.
An exception thrown from `Handle` method is caught and passed to `OnError` method of the same handler instance (may be on another thread however).
An exception thrown from `OnError` is handled and swallowed and potentially logged (see [Logging](#logging) section).

### Delegate Event Handlers

Delegate handlers are registered using `DelegateHandlerRegistryBuilder` against event type.

```csharp
DelegateHandlerRegistryBuilder builder = new();
builder.RegisterHandler<SomeEvent>(
static async (SomeEvent someEvent, ISomeService someService) =>
{
await someService.DoSomething();
});
```

Registered delegate must return a `Task`. Delegate can have 0 to 16 parameters. All of them will be resolved from dedicated container scope and passed when the delegate is invoked.
There are few special cases of optional parameters managed by `EventBroker` (without being registered in DI container):
- `TEvent` - an instance of the event being handled - this should match the type of the event the delegate was registered for.
- `IRetryPolicy` - the instance of the retry policy for the handler (see [Retries](#retries) section).
- `CancellationToken` - the `EventBroker` cancellation token.
- `INextHandler` - used to call the next wrapper in the chain or the handler if no more wrappers available (see below).

Delegate handlers do not provide special exception handling. Exception caused by resolving services or unhandled exception during execution will be handled and swallowed and potentially logged (see [Logging](#logging) section).

Delegate handlers registration has a decorator-like feature allowing to pipeline multiple delegates. The `INextHandler` instance is used to call the next in the pipeline.

```csharp
builder.RegisterHandler<SomeEvent>(
static async (SomeEvent someEvent, ISomeService someService) => await someService.DoSomething())
.WrapWith(
static async (INextHandler next, ILogger logger)
{
try
{
await next.Execute();
}
catch(Exception ex)
{
logger.LogError(ex);
}
})
.WrapWith(
static async (SomeEvent someEvent, ILogger logger)
{
Stopwatch timer = new();
await next.Execute();
timer.Stop();
logger.LogInformation("{event} handling duration {elapsed}", someEvent, timer.Elapsed);
});
```

Delegate wrappers are executed from the last registered moving "inwards" toward the handler.

## Configuration

`EventBroker` is depending on `Microsoft.Extentions.DependencyInjection` container for resolving event handlers.
It guarantees that each handler is resolved in a new scope which is disposed after the handler completes.
`EventBroker` is depending on `Microsoft.Extensions.DependencyInjection` container for resolving event handlers and their dependencies. It guarantees that each handler is resolved in a new scope which is disposed after the handler completes. There can be multiple handlers for the same event.

`EventBroker` is configured with `AddEventBroker` and `AddEventHandlers` extension methods of `IServiceCollection` using a confiuration delegate.
Event handlers are registered by the event type and a corresponding `IEventHandler` implementation as transient, scoped, or singleton.
`EventBroker` is configured with `AddEventBroker` extension method of `IServiceCollection` using a configuration delegate.

*Example:*
```csharp
services.AddEventBroker(
x => x.WithMaxConcurrentHandlers(3)
.DisableMissingHandlerWarningLog()
.AddTransient<Event1, EventHandler1>()
.AddScoped<Event2, EventHandler2>()
.AddSingleton<Event3, EventHandler3>())
services.AddEventBroker(x => x.WithMaxConcurrentHandlers(3)
.DisableMissingHandlerWarningLog());
```

`WithMaxConcurrentHandlers` defines how many handlers can run at the same time. Default is 2.

`DisableMissingHandlerWarningLog` suppresses logging warning when there is no handler found for event.

`EventBroker` behavior and event handlers can be configured with separate extension methods. The order of calls to `AddEventBroker` and `AddEventHandlers` does not matter.
### Handlers Implementing `IEventHandler<TEvent>`

Event handlers are registered by the event type and a corresponding `IEventHandler` implementation as transient, scoped, or singleton. `AddEventHandlers` extension method of `IServiceCollection` provides a configuration delegate.
*Example:*
```csharp
services.AddEventBroker(
x => x.WithMaxConcurrentHandlers(3)
.DisableMissingHandlerWarningLog());

services.AddEventHandlers(
x => x.AddTransient<Event1, EventHandler1>()
.AddScoped<Event2, EventHandler2>()
.AddSingleton<Event3, EventHandler3>())
```

There can be multiple handlers for the same event.

Handler implementations can also be registered in the `AddEventBroker` method.
*Example:*
```csharp
services.AddEventBroker(x => x.WithMaxConcurrentHandlers(3)
.DisableMissingHandlerWarningLog()
.AddTransient<Event1, EventHandler1>()
.AddScoped<Event2, EventHandler2>()
.AddSingleton<Event3, EventHandler3>());
```
The order of calls to `AddEventBroker` and `AddEventHandlers` does not matter. `AddEventHandlers` can be called multiple times.
Note that handlers **not** registered using `AddEventBroker` or `AddEventHandlers` methods will be **ignored** by `EventBroker`.

## Publishing Events
### Delegate Handlers

Delegate event handlers are registered using `DelegateHandlerRegistryBuilder` instance.
*Example:*
```csharp
DelegateHandlerRegistryBuilder builder = new();
builder.RegisterHandler<SomeEvent>(
static async (SomeEvent someEvent, ISomeService someService) =>
{
await someService.DoSomething();
});

services.AddSingleton(builder);
```

Multiple `DelegateHandlerRegistryBuilder` instances can be registered. They can be used to register delegate handlers after the service collection has been built. However registrations should be completed before `IEventBroker` instance is resolved.

`IEventBroker` and its implementation are registered in the DI container by the `AddEventBroker` method.
## Publishing Events

Events are published using `IEventBroker.Publish` method.

Expand All @@ -143,37 +241,32 @@ Events can be published after given time interval with `IEventBroker.PublishDefe
**Caution**: `PublishDeferred` may not be accurate and may perform badly if large amount of deferred messages are scheduled. It runs a new task that in turn uses `Task.Delay` and then publishes the event.
A lot of `Task.Delay` means a lot of timers waiting in a queue.

## Exception Handling

Since event handlers are executed on the `ThreadPool`, there is nowhere to propagate unhandled ecxeptions.

An exception thrown from `Handle` method is caught and passed to `OnError` method of the same handler instance (may be on another thread however).

An exception thrown from `OnError` is handled and swallowed and potentially logged.

## Logging

If there is logging configured in the DI container, `EventBroker` will use it to log when:
If there is `ILogger` configured in the DI container, `EventBroker` will use it to log when:
- There is no event handler found for published event (warning). Can be disabled with `DisableMissingHandlerWarningLog()` during configuration.
- Exception is thrown during event handler resolving (error).
- Exception is thrown from handlers `OnError()` method (error).
- Exception is thrown from delegate handler (error).

If there is no logger configured, these exceptions will be handled and swallowed.

## Retries

Retrying within event hadler can become a bottleneck. Imagine `EventBroker` is restricted to one concurrent handler. An exception is caught in `Handle` and retry is attempted after given time interval. Since `Handle` is not completed, there is no available "slot" to run other handlers while `Handle` is waiting.
Retrying within event handler can become a bottleneck. Imagine `EventBroker` is restricted to one concurrent handler. An exception is caught in `Handle` and retry is attempted after given time interval. Since `Handle` is not completed, there is no available "slot" to run other handlers while `Handle` is waiting.

Another option will be to use `IEventBroker.PublishDeferred`. This will eliminate the bottleneck but will itroduce different problems. The same event will be handled again by all handlers, meaning specaial care should be taken to make all handlers idempotent. Any additional information (e.g. number of retries) needs to be known, it should be carried with the event, introducing accidential complexity.
Another option will be to use `IEventBroker.PublishDeferred`. This will eliminate the bottleneck but will introduce different problems. The same event will be handled again by all handlers, meaning special care should be taken to make all handlers idempotent. Any additional information (e.g. number of retries) needs to be known, it should be carried with the event, introducing accidental complexity.

To avoid these problems, both `IEventBroker` `Handle` and `OnError` methods have `IRetryPolicy` parameter.
To avoid these problems, both `IEventBroker` `Handle` and `OnError` methods have `IRetryPolicy` parameter. It is also available for delegate handlers.

`IRetryPolicy.RetryAfter()` will schedule a retry only for the handler it is called from, without blocking. After the given time interval an instance of the handler will be resolved from the DI container (from a new scope) and executed with the same event instance.

`IRetryPolicy.Attempt` is the current retry attempt for a given handler and event.
`IRetryPolicy.LastDelay` is the time interval before the retry.

`IRetryPolicy.RetryRequested` is used to coordinate retry request between `Handle` and `OnError`. `IRetryPolicy` is passed to both methods to enable error handling and retry request entirely in `Handle` method. `OnError` can check `IRetryPolicy.RetryRequested` to know whether `Hanlde` had called `IRetryPolicy.RetryAfter()`.
`IRetryPolicy.RetryRequested` is used to coordinate retry request between `Handle` and `OnError`. `IRetryPolicy` is passed to both methods to enable error handling and retry request entirely in `Handle` method. `OnError` can check `IRetryPolicy.RetryRequested` to know whether `Handle` had called `IRetryPolicy.RetryAfter()`.

If added as a parameter, the `IRetryPolicy` will be passed to delegate wrappers and handler.

**Caution:** the retry will not be exactly after the specified time interval in `IRetryPolicy.RetryAfter()`. Take into account a tolerance of around 50 milliseconds. Additionally, retry executions respect maximum concurrent handlers setting, meaning a high load can cause additional delay.

0 comments on commit b3c79a6

Please sign in to comment.