Configuration is the most labor-intensive part of using Brighter. Once you have configured Brighter, using its model of requests and handlers is straightforward.
This section covers using .NET Core Dependency Injection to configure Brighter. If you want to use an alternative DI container, see the section How Configuration Works.
We divide configuration into two sections, depending on your requirements:
- Configuring The Command Processor: This section covers configuring the Command Processor. Use this if you want to dispatch requests to handlers, or publish messages from your application on an external bus.
- Configuring The Service Activator: This section covers configuring the Service Activator. Use this if you want to read messages from a transport (and then dispatch to handlers).
Brighter's package:
- Paramore.Brighter.Extensions.DependencyInjection
provides extension methods for ServiceCollection that can be used to add Brighter to the .NET Core DI Framework.
By adding the package you can call the AddBrighter() extension method.
If you are using a Startup class's ConfigureServices method call the following:
public void ConfigureServices(IServiceCollection services)
{
services.AddBrighter(...)
}
If you are using .NET 6 you can make the call directly on your HostBuilder's Services property.
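As a minimal sketch of the .NET 6 style, assuming an ASP.NET Core project with implicit usings enabled (so that WebApplication is in scope), the same AddBrighter() call can be made on the builder's Services property. The empty options lambda is a placeholder, not a recommended configuration.

```csharp
using Paramore.Brighter.Extensions.DependencyInjection;

var builder = WebApplication.CreateBuilder(args);

// The same call you would make in Startup.ConfigureServices,
// made here on the builder's Services property.
builder.Services.AddBrighter(options =>
{
    // Set BrighterOptions here (placeholder: no options are set in this sketch).
});

var app = builder.Build();
app.Run();
```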
The AddBrighter() method takes an Action<BrighterOptions> delegate. The extension method supplies the delegate with a BrighterOptions object that allows you to configure how Brighter runs.
The AddBrighter() method returns an IBrighterBuilder interface. IBrighterBuilder is a fluent interface that you can use to configure additional Brighter properties (see Brighter Builder Fluent Interface).
Brighter uses Polly policies for both internal reliability, and to support adding a custom policy to a handler for reliability.
To use a Polly policy with Brighter you need to register it first with a Polly PolicyRegistry. In this example we register both Synchronous and Asynchronous Polly policies with the registry.
var retryPolicy = Policy.Handle<Exception>().WaitAndRetry(new[]
{
TimeSpan.FromMilliseconds(50),
TimeSpan.FromMilliseconds(100),
TimeSpan.FromMilliseconds(150) });
var circuitBreakerPolicy = Policy.Handle<Exception>().CircuitBreaker(1,
TimeSpan.FromMilliseconds(500));
var retryPolicyAsync = Policy.Handle<Exception>()
.WaitAndRetryAsync(new[] { TimeSpan.FromMilliseconds(50), TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(150) });
var circuitBreakerPolicyAsync = Policy.Handle<Exception>().CircuitBreakerAsync(1, TimeSpan.FromMilliseconds(500));
var policyRegistry = new PolicyRegistry()
{
{ "SyncRetryPolicy", retryPolicy },
{ "SyncCircuitBreakerPolicy", circuitBreakerPolicy },
{ "AsyncRetryPolicy", retryPolicyAsync },
{ "AsyncCircuitBreakerPolicy", circuitBreakerPolicyAsync }
};
And you can use them in your own handler like this:
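internal class MyQoSProtectedHandler : RequestHandler<MyCommand>
{
// Flag that calling code can inspect to see whether the handler ran
public static bool ReceivedCommand { get; private set; }
static MyQoSProtectedHandler()
{
ReceivedCommand = false;
}
// Wrap this handler in the policy registered under "SyncRetryPolicy"
[UsePolicy(policy: "SyncRetryPolicy", step: 1)]
public override MyCommand Handle(MyCommand command)
{
/*Do work that could throw error because of distributed computing reliability*/
ReceivedCommand = true;
// Pass the command on to the next handler in the pipeline
return base.Handle(command);
}
}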
See the section Policy Retry and Circuit Breaker for more on using Polly policies with handlers.
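To make the behaviour of the retry policy concrete, the following sketch runs it directly with Polly, outside Brighter. The RetryPolicyDemo class and the simulated fault are hypothetical and exist only for illustration; the policy and the registry key are the ones registered above.

```csharp
using System;
using Polly;
using Polly.Registry;

public static class RetryPolicyDemo
{
    // Runs an action that fails twice and returns the total number of attempts made.
    public static int Run()
    {
        var retryPolicy = Policy.Handle<Exception>().WaitAndRetry(new[]
        {
            TimeSpan.FromMilliseconds(50),
            TimeSpan.FromMilliseconds(100),
            TimeSpan.FromMilliseconds(150)
        });

        var policyRegistry = new PolicyRegistry { { "SyncRetryPolicy", retryPolicy } };

        var attempts = 0;

        // Look the policy up by key, as an attribute naming "SyncRetryPolicy" would need to.
        var policy = policyRegistry.Get<ISyncPolicy>("SyncRetryPolicy");
        policy.Execute(() =>
        {
            attempts++;
            // Simulate a transient fault on the first two attempts only.
            if (attempts < 3)
                throw new InvalidOperationException("Simulated transient fault");
        });

        return attempts;
    }
}
```

The action fails on the first and second attempts, so the policy waits 50ms and then 100ms before the third attempt succeeds. Had the action kept failing after the final 150ms wait, the exception would propagate to the caller.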
With the Polly Policy Registry filled, you need to tell Brighter where to find the Policy Registry:
public void ConfigureServices(IServiceCollection services)
{
services.AddBrighter(options =>
options.PolicyRegistry = policyRegistry
);
}
Brighter can register your Request Handlers and Message Mappers for you (see Brighter Builder Fluent Interface). When we register types for you with ServiceCollection, we need to register them with a given lifetime (see Dependency Injection Service Lifetimes).
We also allow you to set the lifetime for the CommandProcessor.
We recommend the following lifetimes:
- If you are using Scoped lifetimes, for example with EF Core, make your Request Handlers and your Command Processor Scoped as well.
- If you are not using Scoped lifetimes you can use Transient lifetimes for Request Handlers and a Singleton lifetime for the Command Processor.
- Your Message Mappers should not have state and can be Singletons.
(Be cautious about using Singleton lifetimes for Request Handlers. Even if your Request Handler is stateless today, and so does not risk carrying state across requests, a common bug is that state is added to an existing Request Handler which has previously been registered as a Singleton.)
You configure the lifetimes for the different types that Brighter can create at run-time as follows:
public void ConfigureServices(IServiceCollection services)
{
services.AddBrighter(options =>
{
options.HandlerLifetime = ServiceLifetime.Scoped;
options.CommandProcessorLifetime = ServiceLifetime.Scoped;
options.MapperLifetime = ServiceLifetime.Singleton;
});
}
The IBrighterBuilder fluent interface can scan your assemblies for your Request Handlers (inherit from IHandleRequests<> or IHandleRequestsAsync<>) and Message Mappers (inherit from IAmAMessageMapper<>) and register them with the ServiceCollection. This is the most common way to register your code.
public void ConfigureServices(IServiceCollection services)
{
services.AddBrighter(...)
.AutoFromAssemblies();
}
The code scans any loaded assemblies. If you need to register types from assemblies that are not yet loaded, you can provide a list of additional assemblies to scan as an argument to the call to AutoFromAssemblies().
public void ConfigureServices(IServiceCollection services)
{
services.AddBrighter(...)
.AutoFromAssemblies(typeof(MyRequestHandlerAsync).Assembly);
}
Instead of using AutoFromAssemblies you can exert more fine-grained control over the registration, by explicitly registering your Request Handlers and Message Mappers. We don't recommend this, but make it available for cases where the automatic registration does not meet your needs.
- MapperRegistryFromAssemblies(), HandlersFromAssemblies() and AsyncHandlersFromAssemblies() are the methods called by AutoFromAssemblies() and can be called explicitly.
- Handlers(), AsyncHandlers() and MapperRegistry() accept an Action<> delegate that respectively provide you with IAmASubscriberRegistry or IAmAnAsyncSubscriberRegistry to register your RequestHandlers explicitly or a ServiceCollectionMapperRegistry to register your mappers. This gives you explicit control over what you register.
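A sketch of explicit registration might look like the following. The request, handler and mapper types are hypothetical and assumed to be defined elsewhere in your application. The Register and RegisterAsync calls assume the generic registration methods on the registries passed to each delegate, so check them against the version of Brighter you are using.

```csharp
using Microsoft.Extensions.DependencyInjection;
using Paramore.Brighter;
using Paramore.Brighter.Extensions.DependencyInjection;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddBrighter(options => { })
            // Register a synchronous handler for one request type (hypothetical types).
            .Handlers(registry => registry.Register<MyCommand, MyCommandHandler>())
            // Register an asynchronous handler for a different request type (hypothetical types).
            .AsyncHandlers(registry => registry.RegisterAsync<MyOtherCommand, MyOtherCommandHandlerAsync>())
            // Register the mapper used when GreetingMade crosses an External Bus (hypothetical mapper).
            .MapperRegistry(registry => registry.Register<GreetingMade, GreetingMadeMessageMapper>());
    }
}
```

Only the types named here are registered; nothing is discovered by scanning, which is the point of choosing this route over AutoFromAssemblies().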
Using an External Bus allows you to send messages between processes using a message-oriented middleware transport (such as RabbitMQ or Kafka). (For symmetry, we refer to the usage of the Command Processor without an external bus as using an Internal Bus).
When raising a message on the Internal Bus, you use one of the following methods on the Command Processor:
- Send() and SendAsync() - Sends a Command to one Request Handler.
- Publish() and PublishAsync() - Broadcasts an Event to zero or more Request Handlers.
When raising a message on an External Bus, you use the following methods on the CommandProcessor:
- Post() and PostAsync() - Immediately posts a Command or Event to another process via the External Bus.
- DepositPost() and DepositPostAsync() - Puts one or many Command(s) or Event(s) in the Outbox for later delivery.
- ClearOutbox() and ClearOutboxAsync() - Clears the Outbox, posting un-dispatched messages to another process via the External Bus.
- ClearAsyncOutbox() - Implicitly clears the Outbox, similar to above however allows bulk dispatching of messages onto a Transport.
The major difference here is whether or not you wish to use an Outbox for Transactional Messaging. (See Outbox Pattern and Brighter Outbox Support for more on Brighter and the Outbox Pattern).
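The methods listed above can be sketched in use as follows. GreetingService, AddGreeting and GreetingMade are hypothetical types for illustration, and the constructors assume that the Command and Event base classes accept a Guid identifier, which may differ between Brighter versions. In practice you would call only the method that fits your scenario, not all of them in sequence.

```csharp
using System;
using Paramore.Brighter;

// Hypothetical command: handled by exactly one handler.
public class AddGreeting : Command
{
    public string Name { get; }
    public AddGreeting(string name) : base(Guid.NewGuid()) { Name = name; }
}

// Hypothetical event: handled by zero or more handlers.
public class GreetingMade : Event
{
    public string Greeting { get; }
    public GreetingMade(string greeting) : base(Guid.NewGuid()) { Greeting = greeting; }
}

public class GreetingService
{
    private readonly IAmACommandProcessor _commandProcessor;

    // The Command Processor is supplied by the DI container configured with AddBrighter().
    public GreetingService(IAmACommandProcessor commandProcessor)
    {
        _commandProcessor = commandProcessor;
    }

    public void Greet(string name)
    {
        // Internal Bus: route the command to its single Request Handler.
        _commandProcessor.Send(new AddGreeting(name));

        // Internal Bus: broadcast the event to any registered Request Handlers.
        _commandProcessor.Publish(new GreetingMade($"Hello {name}"));

        // External Bus: post to another process straight away.
        _commandProcessor.Post(new GreetingMade($"Hello {name}"));

        // External Bus with an Outbox: deposit first, then clear the deposited message.
        var messageId = _commandProcessor.DepositPost(new GreetingMade($"Hello {name}"));
        _commandProcessor.ClearOutbox(messageId);
    }
}
```

The External Bus calls only work once an External Bus and a Message Mapper for the request type have been configured, as described in the rest of this section.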
To use an External Bus, you need to supply Brighter with configuration information that tells Brighter what middleware you are using and how to find it. (You don't need to do anything to configure an Internal Bus, it is always available.)
The IBrighterBuilder interface returned from AddBrighter allows you to configure the properties of your external bus by calling the UseExternalBus extension method. The UseExternalBus extension method takes a lambda function whose only parameter is an ExternalBusConfiguration. The ExternalBusConfiguration lets you set properties such as the producer registry and the Outbox, which the following sections describe. The call has this shape:
private void ConfigureBrighter(IServiceCollection services)
{
services.AddBrighter(options =>
{
...
})
.UseExternalBus((configure) =>
{
})
.AutoFromAssemblies();
}
Transports are how Brighter supports specific Message-Oriented-Middleware (MoM). Transports are provided in separate NuGet packages so that you can take a dependency only on the transport that you need. Brighter supports a number of different transports.
We use the naming convention Paramore.Brighter.MessagingGateway.{TRANSPORT} for transports where {TRANSPORT} is the name of the middleware.
In this example we will use the transport for RabbitMQ, provided by the NuGet package:
- Paramore.Brighter.MessagingGateway.RMQ
See the documentation for each transport for details on how to configure it for use with Brighter. For now it is enough to know that you need to provide a Messaging Gateway, which tells us how to reach the middleware, and a Publication, which tells us how to configure the middleware.
A Publication configures a transport for sending a message to its associated MoM. So an RmqPublication configures how we publish a message to RabbitMQ. A number of properties are common to all publications.
- MakeChannels: Do you want Brighter to create the infrastructure? Brighter can create infrastructure that it needs, and is aware of: OnMissingChannel.Create. So a publication can create the topic to send messages to. Alternatively if you create the channel by another method, such as IaC, we can verify the infrastructure on startup: OnMissingChannel.Validate. Finally, you can avoid the performance cost of runtime checks by assuming your infrastructure exists: OnMissingChannel.Assume.
- MaxOutStandingMessages: How large can the number of messages in the Outbox grow before we stop allowing new messages to be published and raise an OutboxLimitReachedException?
- MaxOutStandingCheckIntervalMilliSeconds: How often do we check to see if the Outbox is full?
- Topic: A Topic is the key used within the MoM to route messages. Publishers publish to a topic and subscribers subscribe to it. We use a class RoutingKey to encapsulate the identifier used for a topic. The name the MoM uses for a topic may vary. Kafka and SNS use topic whilst RMQ uses routing key.
In order to provide Brighter with the means to send a message via the transport, we need to provide it with an IAmAProducerRegistry for the transport you intend to use for the External Bus.
A producer is the transport specific code that you need to send messages; it implements IAmAMessageProducer.
We register a producer with a producer registry; it needs to implement IAmAProducerRegistry but usually you will use the provided ProducerRegistry. At runtime, we look up the producer to use in the registry by routing key (aka topic).
Typically for a transport we implement a producer registry factory; it needs to implement IAmAProducerRegistryFactory. For example, for the RMQ transport, we provide RmqProducerRegistryFactory. A producer registry factory typically takes a connection to the broker and a collection of publications, and it iterates over the publications creating a producer for each one and registering it in a producer registry. It then returns a configured producer registry.
The following code shows an application using the RMQ transport support to create its producer registry.
var producerRegistry = new RmqProducerRegistryFactory(
new RmqMessagingGatewayConnection
{
AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@localhost:5672")),
Exchange = new Exchange("paramore.brighter.exchange"),
},
new RmqPublication[]
{
new RmqPublication
{
Topic = new RoutingKey("GreetingMade"),
MaxOutStandingMessages = 5,
MaxOutStandingCheckIntervalMilliSeconds = 500,
WaitForConfirmsTimeOutInMilliseconds = 1000,
MakeChannels = OnMissingChannel.Create
}
}
).Create();
Putting this together, an example configuration for an External Bus for a local RabbitMQ instance could look like this:
public void ConfigureServices(IServiceCollection services)
{
services.AddBrighter(...)
.UseExternalBus((configure) =>
{
configure.ProducerRegistry = new RmqProducerRegistryFactory(
new RmqMessagingGatewayConnection
{
AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@localhost:5672")),
Exchange = new Exchange("paramore.brighter.exchange"),
},
new RmqPublication[]{
new RmqPublication
{
Topic = new RoutingKey("GreetingMade"),
MaxOutStandingMessages = 5,
MaxOutStandingCheckIntervalMilliSeconds = 500,
WaitForConfirmsTimeOutInMilliseconds = 1000,
MakeChannels = OnMissingChannel.Create
}}
).Create();
})
.AutoFromAssemblies()
...
}
If you intend to use Brighter's Outbox support for Transactional Messaging then you need to provide us with details of your Outbox.
Brighter provides a number of Outbox implementations for common Dbs (and you can write your own for a Db that we do not support). For this discussion we will look at Brighter's support for working with EF Core. See the documentation for working with specific Outbox implementations.
EF Core supports a number of databases and you should pick the packages that match the Db you want to use with EF Core. In this case we will choose MySQL.
For this we will need the Outbox packages for the MySQL Outbox.
- Paramore.Brighter.MySql
- Paramore.Brighter.Outbox.MySql
For a given backing store the pattern should be Paramore.Brighter.{DATABASE} and Paramore.Brighter.Outbox.{DATABASE} where {DATABASE} is the name of the Db that you are using.
In addition for an ORM you will need to add the package that supports the ORM, in this case EF Core:
- Paramore.Brighter.MySql.EntityFrameworkCore
For a given ORM the pattern should be Paramore.Brighter.{ORM}.{DATABASE} where {ORM} is the ORM you are choosing and {DATABASE} is the Db you are using with the ORM.
To configure our Outbox we need to use the ExternalBusConfiguration.
An Outbox has three pieces:
- The Outbox, which implements IAmAnOutbox. Brighter provides implementations for a range of common Dbs.
- The Connection Provider which tells Brighter how to connect to the Outbox
- The Transaction Provider which allows Brighter to participate in the same transaction that you update an entity with.
In this example, we want to use EF Core with a MySQL Outbox. See the documentation for Outboxes for specific configuration options.
public void ConfigureServices(IServiceCollection services)
{
var outboxConfiguration = new RelationalDatabaseConfiguration(DbConnectionString());
services.AddSingleton<IAmARelationalDatabaseConfiguration>(outboxConfiguration);
services.AddBrighter(...)
.UseExternalBus((configure) =>
{
configure.Outbox = new MySqlOutbox(outboxConfiguration);
configure.TransactionProvider = typeof(MySqlEntityFrameworkConnectionProvider<GreetingsEntityGateway>);
configure.ConnectionProvider = typeof(MySqlConnectionProvider);
})
.AutoFromAssemblies();
...
}
Typically DbConnectionString would obtain the connection string for the Db from configuration.
Finally, if we want the Outbox to use a background thread to clear un-dispatched items from the Outbox, and we do in most circumstances, we need to run an Outbox Sweeper to do this work. (You can force an immediate clear within the code that produces the outgoing message using ClearOutbox, but you should still have a sweeper to guarantee it is sent if that call fails).
Typically you run one sweeper. Brighter does not have a distributed lock. As such, running a sweeper in every producer will cause issues as multiple sweepers may try to clear an outstanding message. The outbox documentation looks at your strategies for ensuring only one sweeper runs. For development purposes though, you may wish to add a sweeper to the instance that you are currently running.
To add the Outbox Sweeper you will need to take a dependency on another NuGet package:
- Paramore.Brighter.Extensions.Hosting
You can then add a sweeper by calling UseOutboxSweeper().
This results in:
public void ConfigureServices(IServiceCollection services)
{
services.AddBrighter(...)
.UseExternalBus(...)
.UseOutboxSweeper()
.AutoFromAssemblies();
...
}
(UseExternalBus() has optional parameters for use with Request-Reply support for some transports. We don't cover that here, instead see Direct Messaging for more).
Brighter defines a set of serialization options for use when it needs to serialize messages to JSON. Internally we use these options in our transports, when serializing messages to an external bus and deserializing from an external bus. You may wish to use these options in your own Message Mapper implementation.
By default our JsonSerialisationOptions are configured as follows:
static JsonSerialisationOptions()
{
var opts = new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true,
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
NumberHandling = JsonNumberHandling.AllowReadingFromString,
AllowTrailingCommas = true
};
opts.Converters.Add(new JsonStringConverter());
opts.Converters.Add(new DictionaryStringObjectJsonConverter());
opts.Converters.Add(new ObjectToInferredTypesConverter());
opts.Converters.Add(new JsonStringEnumConverter());
Options = opts;
}
You can use the IBrighterBuilder extension ConfigureJsonSerialisation to override these values. The method takes an Action<JsonSerializerOptions> lambda expression that allows you to override these defaults. For example:
.ConfigureJsonSerialisation((options) =>
{
options.PropertyNameCaseInsensitive = true;
})
If you want to use this configured set of JSON Serialization options in your own code, you can, by using the static property JsonSerialisationOptions.Options. For example:
public GreetingMade MapToRequest(Message message)
{
return JsonSerializer.Deserialize<GreetingMade>(message.Body.Value, JsonSerialisationOptions.Options);
}
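For context, a complete Message Mapper that uses these options in both directions might look like the sketch below. It assumes the IAmAMessageMapper<T> interface shape with MapToMessage(request) and MapToRequest(message), as implied by the MapToRequest example above, and a GreetingMade event defined elsewhere in your application. The mapper name and the topic string are illustrative, and the mapper interface may differ in other Brighter versions.

```csharp
using System.Text.Json;
using Paramore.Brighter;

public class GreetingMadeMessageMapper : IAmAMessageMapper<GreetingMade>
{
    // Outgoing: wrap the serialized event in a message addressed to the "GreetingMade" topic.
    public Message MapToMessage(GreetingMade request)
    {
        var header = new MessageHeader(
            messageId: request.Id,
            topic: "GreetingMade",
            messageType: MessageType.MT_EVENT);
        var body = new MessageBody(JsonSerializer.Serialize(request, JsonSerialisationOptions.Options));
        return new Message(header, body);
    }

    // Incoming: rebuild the event from the message body using the same options.
    public GreetingMade MapToRequest(Message message)
    {
        return JsonSerializer.Deserialize<GreetingMade>(message.Body.Value, JsonSerialisationOptions.Options);
    }
}
```

Using the same options for serialization and deserialization keeps both ends of the External Bus in agreement about casing and converters.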
When sending a request via the External Bus we use a Polly policy internally to control Retry and Circuit Breaker in case the External Bus is not available. These policies have defaults but you can configure the behavior using the policy keys:
- Paramore.RETRYPOLICY
- Paramore.CIRCUITBREAKER
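One way to override the defaults is to register your own policies under those keys before passing the registry to AddBrighter(). The sketch below assumes that the keys are exposed as the constants CommandProcessor.RETRYPOLICY and CommandProcessor.CIRCUITBREAKER; verify the constant names against your Brighter version. The retry intervals and breaker settings are arbitrary example values.

```csharp
using System;
using Paramore.Brighter;
using Polly;
using Polly.Registry;

public static class ExternalBusPolicies
{
    // Builds a registry whose entries replace Brighter's internal External Bus policies.
    public static PolicyRegistry Create()
    {
        var retryPolicy = Policy.Handle<Exception>().WaitAndRetry(new[]
        {
            TimeSpan.FromMilliseconds(50),
            TimeSpan.FromMilliseconds(100),
            TimeSpan.FromMilliseconds(150)
        });

        var circuitBreakerPolicy = Policy.Handle<Exception>()
            .CircuitBreaker(1, TimeSpan.FromMilliseconds(500));

        return new PolicyRegistry
        {
            // Assumed constants for the internal policy keys.
            { CommandProcessor.RETRYPOLICY, retryPolicy },
            { CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy }
        };
    }
}
```

The resulting registry is supplied through options.PolicyRegistry in the same way as the handler policies shown earlier.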
Putting all this together, a typical configuration might look as follows:
public void ConfigureServices(IServiceCollection services)
{
var outboxConfiguration = new RelationalDatabaseConfiguration(DbConnectionString());
services.AddSingleton<IAmARelationalDatabaseConfiguration>(outboxConfiguration);
services.AddBrighter(options =>
{
options.HandlerLifetime = ServiceLifetime.Scoped;
options.MapperLifetime = ServiceLifetime.Singleton;
options.PolicyRegistry = policyRegistry;
})
.ConfigureJsonSerialisation((options) =>
{
options.PropertyNameCaseInsensitive = true;
})
.UseExternalBus((configure) =>
{
configure.ProducerRegistry = new RmqProducerRegistryFactory(
new RmqMessagingGatewayConnection
{
AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@localhost:5672")),
Exchange = new Exchange("paramore.brighter.exchange"),
},
new RmqPublication[]{
new RmqPublication
{
Topic = new RoutingKey("GreetingMade"),
MaxOutStandingMessages = 5,
MaxOutStandingCheckIntervalMilliSeconds = 500,
WaitForConfirmsTimeOutInMilliseconds = 1000,
MakeChannels = OnMissingChannel.Create
}}
).Create();
configure.Outbox = new MySqlOutbox(outboxConfiguration);
configure.TransactionProvider = typeof(MySqlEntityFrameworkConnectionProvider<GreetingsEntityGateway>);
configure.ConnectionProvider = typeof(MySqlConnectionProvider);
})
.UseOutboxSweeper()
.AutoFromAssemblies();
}
A consumer reads messages from Message-Oriented Middleware (MoM), and a producer puts messages onto the MoM for the consumer to read.
A consumer waits for messages to appear on the queue, reads them, and then calls your Request Handler code to react. Because the consumer runs your code in response to an external request, a message being placed on the External Bus, we call the component that listens for messages and dispatches them a Service Activator.
To use Brighter's Service Activator you will need to take a dependency on the NuGet package:
- Paramore.Brighter.ServiceActivator
We provide support for configuring .NET Core's HostBuilder as a ServiceActivator for use with MoM. We use Brighter's Command Processor to dispatch the messages read by a Dispatcher. If you are not using HostBuilder then you will need to configure the Dispatcher yourself. See How Configuring the Dispatcher Works for more.
To use Brighter's Service Activator with HostBuilder you will need to take a dependency on the following NuGet packages:
- Paramore.Brighter.ServiceActivator.Extensions.Hosting
- Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection
These provide an extension method AddServiceActivator() that can be used to add Brighter to the .NET Core DI Framework.
By adding the package you can call the AddServiceActivator() extension method.
If you are using a HostBuilder class's ConfigureServices method call the following:
private static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddServiceActivator(...)
});
If you are using .NET 6 you can make the call directly on your HostBuilder's Services property.
The AddServiceActivator() method takes an Action<ServiceActivatorOptions> delegate. The extension method supplies the delegate with a ServiceActivatorOptions object that allows you to configure how Brighter runs.
The AddServiceActivator() method returns an IBrighterBuilder interface. IBrighterBuilder is a fluent interface that you can use to configure Brighter Command Processor properties. It is discussed above (see Brighter Builder Fluent Interface) and the same options apply. We discuss one additional option that becomes important when receiving requests, the Inbox, in Additional Brighter Builder Options.
When configuring your application's Service Activator, your Subscriptions configure how your application will receive messages from the associated MoM queues or streams.
All Subscriptions let you configure the following common properties.
- Buffer Size: The number of messages to hold in memory. Where the buffer is not shared, a single thread or Performer can access these; where the buffer is shared, multiple threads can access the same buffer of work. Work in a buffer is locked on queue based middleware, and thus not available to other consumers (threads or process depending if the buffer is shared or not) until Acknowledged or Rejected.
- Channel Factory: Creates or finds the necessary infrastructure for messaging on the MoM and wraps it in an object.
- Channel Name: If queues are primitives in the MoM this names the queue, otherwise just used for diagnostics.
- Channel Failure Delay: How long should we delay if a channel fails before trying again, to give problems time to clear.
- Data Type: We use a Datatype Channel. What is the type of this channel?
- Empty Channel Delay: If there are no messages in the queue or stream when we read, how long should we pause before reading again?
- MakeChannels: Do you want Brighter to create the infrastructure? Brighter can create infrastructure that it needs, and is aware of: OnMissingChannel.Create. So a subscription can create the topic to send messages to, and any subscription to that topic required by the MoM, including a queue (which uses the Channel Name). Alternatively if you create the channel by another method, such as IaC, we can verify the infrastructure on startup: OnMissingChannel.Validate. Finally, you can avoid the performance cost of runtime checks by assuming your infrastructure exists: OnMissingChannel.Assume.
- Name: What do we call this subscription for diagnostic purposes.
- NoOfPerformers: Effectively, how many threads do we use to read messages from the queue. As Brighter uses a Single-Threaded Apartment model, each thread has its own message pump and is thus an in-process implementation of the Competing Consumers pattern.
- RequeueCount: How many times can you retry a message before we declare it a poison pill message?
- RequeueDelayInMilliseconds: When we requeue a message how long should we delay it by?
- RoutingKey: The identifier used to route messages to subscribers on MoM. You publish to this, and subscribe to this. This has different names; in Kafka or SNS this is a Topic, in RMQ this is the routing key.
- RunAsync: Is this an async pipeline? Your pipeline must be sync or async. An async pipeline can increase throughput where a handler is I/O bound by allowing the message pump to read another message whilst we await I/O completion. The cost of this is that strict ordering of messages will now be lost as processing of I/O bound requests may complete out-of-sequence. Brighter provides its own synchronization context for async operations. We recommend scaling via increasing the number of performers, unless you know that I/O is your bottleneck.
- TimeoutInMilliseconds: How long does a read 'wait' before assuming there are no pending messages.
- UnacceptableMessageLimit: Brighter will ack a message that throws an unhandled exception, thus removing it from a queue. This limit sets how many such messages a consumer tolerates before it terminates.
For a more detailed discussion of using Requeue with Delay for Handler failure (RequeueCount and RequeueDelayInMilliseconds), along with termination of a consumer due to message failure (UnacceptableMessageLimit), see Handler Failure.
In addition, individual transports subclass Subscription to provide properties unique to the chosen middleware. We discuss those under the section for each transport.
For RabbitMQ for example, this would look like this:
private static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
ConfigureBrighter(hostContext, services);
});
private static void ConfigureBrighter(HostBuilderContext hostContext, IServiceCollection services)
{
var subscriptions = new Subscription[]
{
new RmqSubscription<GreetingMade>(
new SubscriptionName("paramore.sample.salutationanalytics"),
new ChannelName("SalutationAnalytics"),
new RoutingKey("GreetingMade"),
runAsync: true,
timeoutInMilliseconds: 200,
isDurable: true,
makeChannels: OnMissingChannel.Create), //change to OnMissingChannel.Validate if you have infrastructure declared elsewhere
};
services.AddServiceActivator(options =>
{
options.Subscriptions = subscriptions;
});
}
...
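Because the subscription above sets runAsync: true, the handler for GreetingMade needs to be asynchronous. A minimal sketch of such a handler follows, assuming the RequestHandlerAsync<T> base class with an overridable HandleAsync method and a GreetingMade event defined elsewhere in your application. The handler name and the delay that stands in for I/O-bound work are hypothetical.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Paramore.Brighter;

public class GreetingMadeHandlerAsync : RequestHandlerAsync<GreetingMade>
{
    public override async Task<GreetingMade> HandleAsync(
        GreetingMade @event,
        CancellationToken cancellationToken = default)
    {
        // Placeholder for I/O-bound work such as a database write or an HTTP call.
        await Task.Delay(10, cancellationToken);

        // Pass the event on to the next handler in the pipeline.
        return await base.HandleAsync(@event, cancellationToken);
    }
}
```

With runAsync: false the equivalent handler would derive from RequestHandler<GreetingMade> and override Handle instead.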
A Gateway Connection tells Brighter how to connect to MoM for a particular transport. Your transport package provides a Gateway Connection; you need to supply it with the information required to connect to your middleware (URIs, ports, credentials etc.).
A Channel Factory connects Brighter to MoM. Depending on the configuration settings for your Subscription it may create the required primitives (topics/routing keys, queues, streams) on MoM or simply attach to ones that you have created via Infrastructure as Code (IaC). Your transport provides a Channel Factory and you need to pass it a Gateway Connection.
For RabbitMQ, this would look like:
private static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
ConfigureBrighter(hostContext, services);
});
private static void ConfigureBrighter(HostBuilderContext hostContext, IServiceCollection services)
{
var rmqConnection = new RmqMessagingGatewayConnection
{
AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@localhost:5672")),
Exchange = new Exchange("paramore.brighter.exchange")
};
var rmqMessageConsumerFactory = new RmqMessageConsumerFactory(rmqConnection);
services.AddServiceActivator(options =>
{
options.ChannelFactory = new ChannelFactory(rmqMessageConsumerFactory);
});
}
...
Under the hood your Service Activator uses a Command Processor and you will need to configure lifetimes as discussed above.
An additional requirement is configuring the lifetime of the Command Processor itself. Within the context of an ASP.NET application, configuring the lifetime of the Command Processor relies on ASP.NET creating an instance of the Command Processor in a request pipeline. When you are using Service Activator there is no ASP.NET pipeline, instead Brighter's Dispatcher manages the lifetime of the Command Processor that we pass a request to. By setting the ServiceActivatorOptions.UseScoped field to true, you instruct Brighter to use a new Command Processor instance for each request. This is important if you take the Command Processor as a dependency in any of your Request Handlers with a Scoped lifetime. If in doubt, just set ServiceActivatorOptions.UseScoped field to true.
private static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
ConfigureBrighter(hostContext, services);
});
private static void ConfigureBrighter(HostBuilderContext hostContext, IServiceCollection services)
{
services.AddServiceActivator(options =>
{
options.UseScoped = true;
options.HandlerLifetime = ServiceLifetime.Scoped;
options.MapperLifetime = ServiceLifetime.Singleton;
options.CommandProcessorLifetime = ServiceLifetime.Scoped;
});
}
...
The call to AddServiceActivator() returns an IBrighterBuilder fluent interface. This means that you can use any of the options described in Brighter Builder Fluent Interface to configure the associated Command Processor, such as scanning assemblies for Request Handlers and adding an External Bus and Outbox.
An option intended for the context of a Service Activator is described below.
As described in the Outbox Pattern an Outbox offers Guaranteed, At Least Once delivery. It explicitly may result in you sending duplicate messages. In addition, MoM tends to offer "At Least Once" guarantees only, further creating the risk that you will receive a duplicate message.
If the request is not idempotent, you can use an Inbox to de-duplicate it. See Inbox Support for more.
Configuring an Inbox has two elements. The first is the type of Inbox; the second is the configuration for the Inbox behavior.
Brighter provides a number of Inbox implementations for common Dbs (and you can write your own for a Db that we do not support). For this discussion we will look at Brighter's support for working with MySQL. See the documentation for working with specific Inbox implementations.
For this we will need the Inbox packages for the MySQL Inbox.
- Paramore.Brighter.Inbox.MySql
For a given backing store the pattern should be Paramore.Brighter.Inbox.{DATABASE} where {DATABASE} is the name of the Db that you are using.
To configure our Inbox we then need to use the UseExternalInbox method call and pass in an instance of a class that implements IAmAnInbox, taken from our package, and an instance of InboxConfiguration that tells Brighter how we want to use the Inbox.
For Inbox Configuration you set the following properties:
- ActionOnExists: What do we do if the request has been handled? The default, OnceOnlyAction.Throw, is to throw a OnceOnlyException. If you take no other action this will cause the message to be rejected and sent to a DLQ if one is configured (see Handler Failure). The alternative, OnceOnlyAction.Warn, simply logs that the request is a duplicate, but takes no other action.
- OnceOnly: This defaults to true and will check for a duplicate and take the action indicated by ActionOnExists. If false the Inbox will record the request, but will take no further action. (This tends to be set to false if you are using the Inbox to record what requests caused current state only and not de-duplicate).
- Scope: This indicates the type of request (Command or Event) to store in the Inbox. By default this is set to InboxScope.All and captures everything but you can be explicit and just capture InboxScope.Commands or InboxScope.Events. (This tends to be set to InboxScope.Commands when only commands cause changes to state that are not idempotent).
- Context: Used to uniquely identify receipt of this request via this handler. If you are recording Events and have multiple handlers, then the first event handler to receive the message will block the others from doing so, unless you disambiguate the handler identity by supplying a context method.
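To make the interaction of these properties concrete, here is a minimal in-memory model of the de-duplication behavior described above. It is a sketch, not Brighter's implementation: InMemoryInboxModel, DuplicateAction and DuplicateRequestException are hypothetical stand-ins for the real Inbox, OnceOnlyAction and OnceOnlyException, and the model assumes that a duplicate under the "warn" action is logged and skipped rather than handled again.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for illustration only; these are NOT Brighter's types.
public enum DuplicateAction { Throw, Warn }

public sealed class DuplicateRequestException : Exception
{
    public DuplicateRequestException(string message) : base(message) { }
}

public sealed class InMemoryInboxModel
{
    // A receipt is identified by the request id plus the context (handler identity).
    private readonly HashSet<(Guid Id, string Context)> _seen =
        new HashSet<(Guid Id, string Context)>();
    private readonly bool _onceOnly;
    private readonly DuplicateAction _actionOnExists;

    public InMemoryInboxModel(bool onceOnly = true,
                              DuplicateAction actionOnExists = DuplicateAction.Throw)
    {
        _onceOnly = onceOnly;
        _actionOnExists = actionOnExists;
    }

    // Stands in for a log; one entry per duplicate seen under the Warn action.
    public List<string> Warnings { get; } = new List<string>();

    // Records the receipt and returns true if the handler should run.
    public bool ShouldHandle(Guid requestId, string context)
    {
        bool isNew = _seen.Add((requestId, context));

        // First receipt, or record-only mode (onceOnly == false): always handle.
        if (isNew || !_onceOnly) return true;

        if (_actionOnExists == DuplicateAction.Throw)
            throw new DuplicateRequestException(
                $"Request {requestId} was already handled in context '{context}'.");

        // Assumption: under Warn the duplicate is logged and not handled again.
        Warnings.Add($"Duplicate request {requestId} in context '{context}'.");
        return false;
    }
}
```

With onceOnly set to true and the Warn action, a first call to ShouldHandle(id, "HandlerA") returns true, a second call with the same arguments returns false and adds a warning, and a call with the same id but the context "HandlerB" returns true. That last case is the disambiguation that Context provides when several event handlers receive the same message.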
A typical Inbox configuration for MySQL would be:
private static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
ConfigureBrighter(hostContext, services);
});
private static void ConfigureBrighter(HostBuilderContext hostContext, IServiceCollection services)
{
services.AddServiceActivator(options =>
{
...
options.InboxConfiguration = new InboxConfiguration(
inbox: new MySqlInbox(new RelationalDatabaseConfiguration(DbConnectionString())),
scope: InboxScope.Commands,
onceOnly: true,
actionOnExists: OnceOnlyAction.Throw
);
});
}
...
Typically DbConnectionString would obtain the connection string for the Db from configuration.
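As one possible shape for such a helper, the sketch below reads the connection string from an environment variable and fails fast if it is missing. The class name InboxConnection and the variable name BRIGHTER_INBOX_CONNECTION are assumptions made for illustration; in a real application you might equally read the value from your host's configuration.

```csharp
using System;

public static class InboxConnection
{
    // Hypothetical variable name, chosen for this example only.
    public const string VariableName = "BRIGHTER_INBOX_CONNECTION";

    // Returns the connection string, or throws if it has not been configured,
    // so a misconfigured service fails at startup rather than on first message.
    public static string DbConnectionString()
    {
        var value = Environment.GetEnvironmentVariable(VariableName);
        if (string.IsNullOrWhiteSpace(value))
            throw new InvalidOperationException(
                $"Set the {VariableName} environment variable to the Inbox connection string.");
        return value;
    }
}
```

Failing at startup is a deliberate choice here: a missing connection string is a deployment error, and it is easier to diagnose before the message pumps start than after messages begin to be rejected.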
To run Service Activator we add it as a Hosted Service.
We provide the class ServiceActivatorHostedService for this in the NuGet package:
- Paramore.Brighter.ServiceActivator.Extensions.Hosting
The ServiceActivatorHostedService calls the Dispatcher.Receive method, which starts message pumps for the configured Subscriptions.
private static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
ConfigureBrighter(hostContext, services);
});
private static void ConfigureBrighter(HostBuilderContext hostContext, IServiceCollection services)
{
...
services.AddHostedService<ServiceActivatorHostedService>();
}
...
On shutdown Brighter will allow the current Request Handler to complete, then end the message pump loop and exit. If you have long-running handlers it is possible that they will not complete in the default 5s for graceful shutdown of the MS Generic Host. In this case, you need to increase the timeout of the host shutdown.
private static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.Configure<HostOptions>(options =>
{
options.ShutdownTimeout = TimeSpan.FromSeconds(20);
});
ConfigureBrighter(hostContext, services);
});
private static void ConfigureBrighter(HostBuilderContext hostContext, IServiceCollection services)
{
...
services.AddHostedService<ServiceActivatorHostedService>();
}
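To see why the shutdown timeout matters for long-running handlers, the following sketch models a pump that only checks for a stop request between messages, so the handler that is already running always completes. It is a simplified illustration using only the .NET base class library; PumpShutdownModel and its methods are hypothetical and are not Brighter's message pump or the Generic Host's shutdown logic.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical model for illustration; not Brighter's message pump.
public static class PumpShutdownModel
{
    // Runs the handler repeatedly. The stop request is only observed between
    // messages, so an in-flight handler is never interrupted.
    // Returns the number of messages handled before the pump stopped.
    public static async Task<int> RunPumpAsync(Func<Task> handler,
                                               CancellationToken stopRequested)
    {
        int handled = 0;
        while (!stopRequested.IsCancellationRequested)
        {
            await handler();
            handled++;
        }
        return handled;
    }

    // Models the host waiting for the pump during shutdown.
    // Returns true if the pump finished within the timeout.
    public static async Task<bool> StopWithinAsync(Task pump, TimeSpan shutdownTimeout)
    {
        Task finished = await Task.WhenAny(pump, Task.Delay(shutdownTimeout));
        return finished == pump;
    }
}
```

For example, if the handler is () => Task.Delay(300) and the stop is requested immediately after the pump starts, StopWithinAsync with a 50 ms timeout returns false because the handler is still running, while a 5 s timeout returns true once the single in-flight message has been handled. A shutdown timeout shorter than your slowest handler therefore risks the host giving up before the handler finishes.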
When all of the relevant configuration sections are added together, your code will look something like this, with variations for your transport and stores.
private static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.Configure<HostOptions>(options =>
{
options.ShutdownTimeout = TimeSpan.FromSeconds(20);
});
ConfigureBrighter(hostContext, services);
});
private static void ConfigureBrighter(HostBuilderContext hostContext, IServiceCollection services)
{
var subscriptions = new Subscription[]
{
new RmqSubscription<GreetingMade>(
new SubscriptionName("paramore.sample.salutationanalytics"),
new ChannelName("SalutationAnalytics"),
new RoutingKey("GreetingMade"),
runAsync: true,
timeoutInMilliseconds: 200,
isDurable: true,
makeChannels: OnMissingChannel.Create), //change to OnMissingChannel.Validate if you have infrastructure declared elsewhere
};
var rmqConnection = new RmqMessagingGatewayConnection
{
AmpqUri = new AmqpUriSpecification(new Uri($"amqp://guest:guest@localhost:5672")),
Exchange = new Exchange("paramore.brighter.exchange")
};
var rmqMessageConsumerFactory = new RmqMessageConsumerFactory(rmqConnection);
services.AddServiceActivator(options =>
{
options.Subscriptions = subscriptions;
options.ChannelFactory = new ChannelFactory(rmqMessageConsumerFactory);
options.UseScoped = true;
options.HandlerLifetime = ServiceLifetime.Scoped;
options.MapperLifetime = ServiceLifetime.Singleton;
options.CommandProcessorLifetime = ServiceLifetime.Scoped;
options.PolicyRegistry = new SalutationPolicy();
options.InboxConfiguration = new InboxConfiguration(
inbox: new MySqlInbox(new RelationalDatabaseConfiguration(DbConnectionString())),
scope: InboxScope.Commands,
onceOnly: true,
actionOnExists: OnceOnlyAction.Throw
);
})
.UseExternalBus((configure) =>
{
configure.ProducerRegistry = producerRegistry;
configure.Outbox = outbox;
configure.TransactionProvider = transactionProvider;
configure.ConnectionProvider = connectionProvider;
})
.AutoFromAssemblies();
services.AddHostedService<ServiceActivatorHostedService>();
}
Brighter includes a comprehensive set of Samples in its main repo that you can review for clarity on how Brighter works and should be configured.