diff --git a/src/Orleans/AmqpProtocol/Configuration/ConfigureQueueOptions.cs b/src/Orleans/AmqpProtocol/Configuration/ConfigureQueueOptions.cs
new file mode 100644
index 0000000..bd2fcd4
--- /dev/null
+++ b/src/Orleans/AmqpProtocol/Configuration/ConfigureQueueOptions.cs
@@ -0,0 +1,33 @@
+// Copyright (c) Escendit Ltd. All Rights Reserved.
+// Licensed under the MIT. See LICENSE.txt file in the solution root for full license information.
+
+namespace Escendit.Orleans.Streaming.RabbitMQ.AmqpProtocol.Configuration;
+
+using global::Orleans.Streams;
+using Handlers;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+
+///
+internal class ConfigureQueueOptions : IConfigureOptions
+{
+ private readonly ILoggerFactory _loggerFactory;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The logger.
+ public ConfigureQueueOptions(ILoggerFactory loggerFactory)
+ {
+ _loggerFactory = loggerFactory;
+ }
+
+ ///
+ public void Configure(QueueOptions options)
+ {
+ ArgumentNullException.ThrowIfNull(options);
+ options.StreamFailureHandler = (queueId) =>
+ Task.FromResult(
+ new RabbitMqStreamDeliveryFailureHandler(_loggerFactory.CreateLogger(), queueId, options.ShouldFaultSubscriptionOnError));
+ }
+}
diff --git a/src/Orleans/AmqpProtocol/Handlers/RabbitMqStreamDeliveryFailureHandler.cs b/src/Orleans/AmqpProtocol/Handlers/RabbitMqStreamDeliveryFailureHandler.cs
new file mode 100644
index 0000000..862f145
--- /dev/null
+++ b/src/Orleans/AmqpProtocol/Handlers/RabbitMqStreamDeliveryFailureHandler.cs
@@ -0,0 +1,69 @@
+// Copyright (c) Escendit Ltd. All Rights Reserved.
+// Licensed under the MIT. See LICENSE.txt file in the solution root for full license information.
+
+namespace Escendit.Orleans.Streaming.RabbitMQ.AmqpProtocol.Handlers;
+
+using global::Orleans.Runtime;
+using global::Orleans.Streams;
+using Microsoft.Extensions.Logging;
+
+///
+/// RabbitMQ Stream Failure Handler.
+///
+internal partial class RabbitMqStreamDeliveryFailureHandler : IStreamFailureHandler
+{
+ private readonly ILogger _logger;
+ private readonly QueueId _queueId;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The logger.
+ /// The queue id.
+ /// The value if it should fault on subscription error.
+ public RabbitMqStreamDeliveryFailureHandler(ILogger logger, QueueId queueId, bool shouldFaultSubscriptionOnError = false)
+ {
+ _logger = logger;
+ _queueId = queueId;
+ ShouldFaultSubsriptionOnError = shouldFaultSubscriptionOnError;
+ }
+
+ ///
+ public bool ShouldFaultSubsriptionOnError { get; }
+
+ ///
+ public Task OnDeliveryFailure(
+ GuidId subscriptionId,
+ string streamProviderName,
+ StreamId streamIdentity,
+ StreamSequenceToken sequenceToken)
+ {
+ LogDeliveryFailure(_queueId, subscriptionId.Guid, streamProviderName, streamIdentity, sequenceToken, ShouldFaultSubsriptionOnError);
+ return Task.CompletedTask;
+ }
+
+ ///
+ public Task OnSubscriptionFailure(
+ GuidId subscriptionId,
+ string streamProviderName,
+ StreamId streamIdentity,
+ StreamSequenceToken sequenceToken)
+ {
+ LogSubscriptionFailure(_queueId, subscriptionId.Guid, streamProviderName, streamIdentity, sequenceToken, ShouldFaultSubsriptionOnError);
+ return Task.CompletedTask;
+ }
+
+ [LoggerMessage(
+ EventId = 1000,
+ EventName = "Delivery Failure",
+ Level = LogLevel.Warning,
+ Message = "Delivery Failure with QueueId: {queueId}, Subscription: {subscriptionId}, Stream Provider: {streamProviderName}, Stream Id: {streamId}, Sequence Token: {streamSequenceToken} with {fault}")]
+ private partial void LogDeliveryFailure(QueueId queueId, Guid subscriptionId, string streamProviderName, StreamId streamId, StreamSequenceToken streamSequenceToken, bool fault);
+
+ [LoggerMessage(
+ EventId = 1010,
+ EventName = "Subscription Failure",
+ Level = LogLevel.Warning,
+ Message = "Subscription Failure with QueueId: {queueId}, Subscription: {subscriptionId}, Stream Provider: {streamProviderName}, Stream Id: {streamId}, Sequence Token: {streamSequenceToken} with {fault}")]
+ private partial void LogSubscriptionFailure(QueueId queueId, Guid subscriptionId, string streamProviderName, StreamId streamId, StreamSequenceToken streamSequenceToken, bool fault);
+}
diff --git a/src/Orleans/AmqpProtocol/Hosting/ClientBuilderExtensions.cs b/src/Orleans/AmqpProtocol/Hosting/ClientBuilderExtensions.cs
index c9bb4dd..70e001f 100644
--- a/src/Orleans/AmqpProtocol/Hosting/ClientBuilderExtensions.cs
+++ b/src/Orleans/AmqpProtocol/Hosting/ClientBuilderExtensions.cs
@@ -53,4 +53,18 @@ public static IRabbitMqClientOptionsBuilder ConfigureStreamPubSub(this IRabbitMq
clientBuilder.Configurator.ConfigureStreamPubSub(streamPubSubType);
return clientBuilder;
}
+
+ ///
+ /// Configure Default Stream Failure Handler.
+ ///
+ /// The initial client options builder.
+ /// The updated client options builder.
+ public static IRabbitMqClientOptionsBuilder ConfigureDefaultStreamDeliveryFailureHandler(this IRabbitMqClientOptionsBuilder clientBuilder)
+ {
+ clientBuilder.ConfigureServices(services =>
+ {
+ services.ConfigureOptions();
+ });
+ return clientBuilder;
+ }
}
diff --git a/src/Orleans/AmqpProtocol/Hosting/SiloBuilderExtensions.cs b/src/Orleans/AmqpProtocol/Hosting/SiloBuilderExtensions.cs
index 1ecc4c8..ec50746 100644
--- a/src/Orleans/AmqpProtocol/Hosting/SiloBuilderExtensions.cs
+++ b/src/Orleans/AmqpProtocol/Hosting/SiloBuilderExtensions.cs
@@ -54,4 +54,18 @@ public static IRabbitMqSiloOptionsBuilder ConfigureStreamPubSub(this IRabbitMqSi
siloBuilder.Configurator.ConfigureStreamPubSub(streamPubSubType);
return siloBuilder;
}
+
+ ///
+ /// Configure Default Stream Failure Handler.
+ ///
+ /// The initial silo options builder.
+ /// The updated silo options builder.
+ public static IRabbitMqSiloOptionsBuilder ConfigureDefaultStreamDeliveryFailureHandler(this IRabbitMqSiloOptionsBuilder siloOptionsBuilder)
+ {
+ siloOptionsBuilder.ConfigureServices(services =>
+ {
+ services.ConfigureOptions();
+ });
+ return siloOptionsBuilder;
+ }
}
diff --git a/src/Orleans/RabbitMQ/Configuration/OptionsBase.cs b/src/Orleans/RabbitMQ/Configuration/OptionsBase.cs
index f119956..cc8d2d0 100644
--- a/src/Orleans/RabbitMQ/Configuration/OptionsBase.cs
+++ b/src/Orleans/RabbitMQ/Configuration/OptionsBase.cs
@@ -9,13 +9,29 @@ namespace Escendit.Orleans.Streaming.RabbitMQ.Configuration;
///
/// Options Base.
///
-public abstract class OptionsBase
+public class OptionsBase
{
///
- /// Gets the stream failure handler.
+ /// Initializes a new instance of the class.
+ ///
+ protected OptionsBase()
+ {
+ }
+
+ ///
+ /// Gets or sets the stream failure handler.
///
/// The stream failure handler.
[Browsable(false)]
- public Func> StreamFailureHandler { get; internal set; } = _ =>
+ [EditorBrowsable(EditorBrowsableState.Advanced)]
+ public Func> StreamFailureHandler { get; set; } = _ =>
Task.FromResult(new NoOpStreamDeliveryFailureHandler());
+
+ ///
+ /// Gets or sets a value indicating whether it should fault on subscription error.
+ ///
+ /// The flag if subscription should fault.
+ [Browsable(false)]
+ [EditorBrowsable(EditorBrowsableState.Advanced)]
+ public bool ShouldFaultSubscriptionOnError { get; set; }
}