diff --git a/src/Orleans/AmqpProtocol/Configuration/ConfigureQueueOptions.cs b/src/Orleans/AmqpProtocol/Configuration/ConfigureQueueOptions.cs new file mode 100644 index 0000000..bd2fcd4 --- /dev/null +++ b/src/Orleans/AmqpProtocol/Configuration/ConfigureQueueOptions.cs @@ -0,0 +1,33 @@ +// Copyright (c) Escendit Ltd. All Rights Reserved. +// Licensed under the MIT. See LICENSE.txt file in the solution root for full license information. + +namespace Escendit.Orleans.Streaming.RabbitMQ.AmqpProtocol.Configuration; + +using global::Orleans.Streams; +using Handlers; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +/// +internal class ConfigureQueueOptions : IConfigureOptions +{ + private readonly ILoggerFactory _loggerFactory; + + /// + /// Initializes a new instance of the class. + /// + /// The logger. + public ConfigureQueueOptions(ILoggerFactory loggerFactory) + { + _loggerFactory = loggerFactory; + } + + /// + public void Configure(QueueOptions options) + { + ArgumentNullException.ThrowIfNull(options); + options.StreamFailureHandler = (queueId) => + Task.FromResult( + new RabbitMqStreamDeliveryFailureHandler(_loggerFactory.CreateLogger(), queueId, options.ShouldFaultSubscriptionOnError)); + } +} diff --git a/src/Orleans/AmqpProtocol/Handlers/RabbitMqStreamDeliveryFailureHandler.cs b/src/Orleans/AmqpProtocol/Handlers/RabbitMqStreamDeliveryFailureHandler.cs new file mode 100644 index 0000000..862f145 --- /dev/null +++ b/src/Orleans/AmqpProtocol/Handlers/RabbitMqStreamDeliveryFailureHandler.cs @@ -0,0 +1,69 @@ +// Copyright (c) Escendit Ltd. All Rights Reserved. +// Licensed under the MIT. See LICENSE.txt file in the solution root for full license information. + +namespace Escendit.Orleans.Streaming.RabbitMQ.AmqpProtocol.Handlers; + +using global::Orleans.Runtime; +using global::Orleans.Streams; +using Microsoft.Extensions.Logging; + +/// +/// RabbitMQ Stream Failure Handler. +/// +internal partial class RabbitMqStreamDeliveryFailureHandler : IStreamFailureHandler +{ + private readonly ILogger _logger; + private readonly QueueId _queueId; + + /// + /// Initializes a new instance of the class. + /// + /// The logger. + /// The queue id. + /// The value if it should fault on subscription error. + public RabbitMqStreamDeliveryFailureHandler(ILogger logger, QueueId queueId, bool shouldFaultSubscriptionOnError = false) + { + _logger = logger; + _queueId = queueId; + ShouldFaultSubsriptionOnError = shouldFaultSubscriptionOnError; + } + + /// + public bool ShouldFaultSubsriptionOnError { get; } + + /// + public Task OnDeliveryFailure( + GuidId subscriptionId, + string streamProviderName, + StreamId streamIdentity, + StreamSequenceToken sequenceToken) + { + LogDeliveryFailure(_queueId, subscriptionId.Guid, streamProviderName, streamIdentity, sequenceToken, ShouldFaultSubsriptionOnError); + return Task.CompletedTask; + } + + /// + public Task OnSubscriptionFailure( + GuidId subscriptionId, + string streamProviderName, + StreamId streamIdentity, + StreamSequenceToken sequenceToken) + { + LogSubscriptionFailure(_queueId, subscriptionId.Guid, streamProviderName, streamIdentity, sequenceToken, ShouldFaultSubsriptionOnError); + return Task.CompletedTask; + } + + [LoggerMessage( + EventId = 1000, + EventName = "Delivery Failure", + Level = LogLevel.Warning, + Message = "Delivery Failure with QueueId: {queueId}, Subscription: {subscriptionId}, Stream Provider: {streamProviderName}, Stream Id: {streamId}, Sequence Token: {streamSequenceToken} with {fault}")] + private partial void LogDeliveryFailure(QueueId queueId, Guid subscriptionId, string streamProviderName, StreamId streamId, StreamSequenceToken streamSequenceToken, bool fault); + + [LoggerMessage( + EventId = 1010, + EventName = "Subscription Failure", + Level = LogLevel.Warning, + Message = "Subscription Failure with QueueId: {queueId}, Subscription: {subscriptionId}, Stream Provider: {streamProviderName}, Stream Id: {streamId}, Sequence Token: {streamSequenceToken} with {fault}")] + private partial void LogSubscriptionFailure(QueueId queueId, Guid subscriptionId, string streamProviderName, StreamId streamId, StreamSequenceToken streamSequenceToken, bool fault); +} diff --git a/src/Orleans/AmqpProtocol/Hosting/ClientBuilderExtensions.cs b/src/Orleans/AmqpProtocol/Hosting/ClientBuilderExtensions.cs index c9bb4dd..70e001f 100644 --- a/src/Orleans/AmqpProtocol/Hosting/ClientBuilderExtensions.cs +++ b/src/Orleans/AmqpProtocol/Hosting/ClientBuilderExtensions.cs @@ -53,4 +53,18 @@ public static IRabbitMqClientOptionsBuilder ConfigureStreamPubSub(this IRabbitMq clientBuilder.Configurator.ConfigureStreamPubSub(streamPubSubType); return clientBuilder; } + + /// + /// Configure Default Stream Failure Handler. + /// + /// The initial client options builder. + /// The updated client options builder. + public static IRabbitMqClientOptionsBuilder ConfigureDefaultStreamDeliveryFailureHandler(this IRabbitMqClientOptionsBuilder clientBuilder) + { + clientBuilder.ConfigureServices(services => + { + services.ConfigureOptions(); + }); + return clientBuilder; + } } diff --git a/src/Orleans/AmqpProtocol/Hosting/SiloBuilderExtensions.cs b/src/Orleans/AmqpProtocol/Hosting/SiloBuilderExtensions.cs index 1ecc4c8..ec50746 100644 --- a/src/Orleans/AmqpProtocol/Hosting/SiloBuilderExtensions.cs +++ b/src/Orleans/AmqpProtocol/Hosting/SiloBuilderExtensions.cs @@ -54,4 +54,18 @@ public static IRabbitMqSiloOptionsBuilder ConfigureStreamPubSub(this IRabbitMqSi siloBuilder.Configurator.ConfigureStreamPubSub(streamPubSubType); return siloBuilder; } + + /// + /// Configure Default Stream Failure Handler. + /// + /// The initial silo options builder. + /// The updated silo options builder. + public static IRabbitMqSiloOptionsBuilder ConfigureDefaultStreamDeliveryFailureHandler(this IRabbitMqSiloOptionsBuilder siloOptionsBuilder) + { + siloOptionsBuilder.ConfigureServices(services => + { + services.ConfigureOptions(); + }); + return siloOptionsBuilder; + } } diff --git a/src/Orleans/RabbitMQ/Configuration/OptionsBase.cs b/src/Orleans/RabbitMQ/Configuration/OptionsBase.cs index f119956..cc8d2d0 100644 --- a/src/Orleans/RabbitMQ/Configuration/OptionsBase.cs +++ b/src/Orleans/RabbitMQ/Configuration/OptionsBase.cs @@ -9,13 +9,29 @@ namespace Escendit.Orleans.Streaming.RabbitMQ.Configuration; /// /// Options Base. /// -public abstract class OptionsBase +public class OptionsBase { /// - /// Gets the stream failure handler. + /// Initializes a new instance of the class. + /// + protected OptionsBase() + { + } + + /// + /// Gets or sets the stream failure handler. /// /// The stream failure handler. [Browsable(false)] - public Func> StreamFailureHandler { get; internal set; } = _ => + [EditorBrowsable(EditorBrowsableState.Advanced)] + public Func> StreamFailureHandler { get; set; } = _ => Task.FromResult(new NoOpStreamDeliveryFailureHandler()); + + /// + /// Gets or sets a value indicating whether it should fault on subscription error. + /// + /// The flag if subscription should fault. + [Browsable(false)] + [EditorBrowsable(EditorBrowsableState.Advanced)] + public bool ShouldFaultSubscriptionOnError { get; set; } }