diff --git a/docs/content/user-guide/en/cap/configuration.md b/docs/content/user-guide/en/cap/configuration.md
index 37b10398b..172788424 100644
--- a/docs/content/user-guide/en/cap/configuration.md
+++ b/docs/content/user-guide/en/cap/configuration.md
@@ -144,16 +144,33 @@ The expiration time (in seconds) of the failed message. When the message is sent
If `true` then all consumers within the same group pushes received messages to own dispatching pipeline channel. Each channel has set thread count to `ConsumerThreadCount` value.
-#### EnableConsumerPrefetch
+#### [Obsolete] EnableConsumerPrefetch
> Default: false, Before version 7.0 the default behavior is true
-By default, CAP will only read one message from the message queue, then execute the subscription method. After the execution is done, it will read the next message for execution.
-If set to true, the consumer will prefetch some messages to the memory queue, and then distribute them to the .NET thread pool for execution.
+Renamed to `EnableSubscriberParallelExecute` option, Please use the new option.
+
+### EnableSubscriberParallelExecute
+
+> Default: false
+
+If `true`, CAP will prefetch some message from the broker as buffered, then execute the subscriber method. After the execution is done, it will fetch the next batch for execution.
!!! note "Precautions"
Setting it to true may cause some problems. When the subscription method executes too slowly and takes too long, it will cause the retry thread to pick up messages that have not yet been executed. The retry thread picks up messages from 4 minutes (FallbackWindowLookbackSeconds) ago by default , that is to say, if the message backlog of more than 4 minutes (FallbackWindowLookbackSeconds) on the consumer side will be picked up again and executed again
+### SubscriberParallelExecuteThreadCount
+
+> Default: `Environment.ProcessorCount`
+
+With the `EnableSubscriberParallelExecute` option enabled, specify the number of parallel task execution threads.
+
+### SubscriberParallelExecuteBufferFactor
+
+> Default: 1
+
+With the `EnableSubscriberParallelExecute` option enabled, multiplier used to determine the buffered capacity size in subscriber parallel execution. The buffer capacity is computed by multiplying this factor with the value of `SubscriberParallelExecuteThreadCount`, which represents the number of threads allocated for parallel processing.
+
#### EnablePublishParallelSend
> Default: false, The (7.2 <= Version < 8.1) the default behavior is true
diff --git a/docs/content/user-guide/zh/cap/configuration.md b/docs/content/user-guide/zh/cap/configuration.md
index e51a58f0d..bad3f7858 100644
--- a/docs/content/user-guide/zh/cap/configuration.md
+++ b/docs/content/user-guide/zh/cap/configuration.md
@@ -148,16 +148,33 @@ services.AddCap(config =>
在同时配合使用 `EnableConsumerPrefetch` 时,请参考 issue [#1399](https://github.com/dotnetcore/CAP/issues/1399) 以清晰其预期行为。
-#### EnableConsumerPrefetch
+#### [已过时] EnableConsumerPrefetch
> 默认值: false, 在 7.0 版本之前默认行为 true
-默认情况下,CAP只会从消息队列读取一条,然后执行订阅方法,执行完成后才会读取下一条来执行.
-如果设置为 true, 消费端会将消息预取到内存队列,然后再放入.NET 线程池并行执行。
+ 该配置项已被重命名为 `EnableSubscriberParallelExecute`,请使用新选项。
+
+### EnableSubscriberParallelExecute
+
+> 默认值: false
+
+如果设置为 `true`,CAP将提前从Broker拉取一批消息置于内存缓冲区,然后执行订阅方法;当订阅方法执行完成后,拉取下一批消息至于缓冲区然后执行。
!!! note "注意事项"
设置为 true 可能会产生一些问题,当订阅方法执行过慢耗时太久时,会导致重试线程拾取到还未执行的的消息。重试线程默认拾取4分钟前(FallbackWindowLookbackSeconds 配置项)的消息,也就是说如果消费端积压了超过4分钟(FallbackWindowLookbackSeconds 配置项)的消息就会被重新拾取到再次执行
+### SubscriberParallelExecuteThreadCount
+
+> Default: `Environment.ProcessorCount`
+
+当启用 `EnableSubscriberParallelExecute` 时, 可通过此参数执行并行处理的线程数,默认值为处理器个数。
+
+### SubscriberParallelExecuteBufferFactor
+
+> Default: 1
+
+当启用 `EnableSubscriberParallelExecute` 时, 通过此参数设置缓冲区和线程数的因子系数,也就是缓冲区大小等于 `SubscriberParallelExecuteThreadCount` 乘 `SubscriberParallelExecuteBufferFactor`.
+
#### EnablePublishParallelSend
> 默认值: false
diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs
index 32fbb2a03..4d2b52005 100644
--- a/src/DotNetCore.CAP/CAP.Options.cs
+++ b/src/DotNetCore.CAP/CAP.Options.cs
@@ -12,7 +12,10 @@
namespace DotNetCore.CAP;
///
-/// Represents all the options you can use to configure the system.
+/// Provides options to customize various aspects of the message processing pipeline. This includes settings for message expiration,
+/// retry mechanisms, concurrency management, and serialization, among others. This class allows fine-tuning
+/// CAP's behavior to better align with specific application requirements, such as adjusting threading models for
+/// subscriber message processing, setting message expiry times, and customizing serialization settings.
///
public class CapOptions
{
@@ -24,7 +27,9 @@ public CapOptions()
FailedRetryCount = 50;
ConsumerThreadCount = 1;
EnablePublishParallelSend = false;
- EnableConsumerPrefetch = false;
+ EnableSubscriberParallelExecute = false;
+ SubscriberParallelExecuteThreadCount = Environment.ProcessorCount;
+ SubscriberParallelExecuteBufferFactor = 1;
Extensions = new List();
Version = "v1";
DefaultGroupName = "cap.queue." + Assembly.GetEntryAssembly()?.GetName().Name!.ToLower();
@@ -92,20 +97,41 @@ public CapOptions()
public int ConsumerThreadCount { get; set; }
///
- /// If true, the message will be prefetch to memory queue for parallel execute by .net thread pool.
- /// Default is false
+ /// If true, the message will be buffered to memory queue for parallel execute.
+ /// The option is obsolete, use EnableSubscriberParallelExecute instead!
///
- public bool EnableConsumerPrefetch { get; set; }
+ [Obsolete("Renamed to EnableSubscriberParallelExecute option.")]
+ public bool EnableConsumerPrefetch { get => EnableSubscriberParallelExecute; set => EnableSubscriberParallelExecute = value; }
+
+ ///
+ /// If true, the message will be buffered to memory queue for parallel execute;
+ /// Default is false.
+ /// Use to specify the number of parallel threads.
+ ///
+ public bool EnableSubscriberParallelExecute { get; set; }
+
+ ///
+ /// With the EnableSubscriberParallelExecute option enabled, specify the number of parallel task execution threads.
+ /// Default is .
+ ///
+ public int SubscriberParallelExecuteThreadCount { get; set; }
+
+ ///
+ /// With the EnableSubscriberParallelExecute option enabled, multiplier used to determine the buffered capacity size in subscriber parallel execution.
+ /// The buffer capacity is computed by multiplying this factor with the value of SubscriberParallelExecuteThreadCount,
+ /// which represents the number of threads allocated for parallel processing.
+ ///
+ public int SubscriberParallelExecuteBufferFactor { get; set; }
///
/// If true, the message send task will be parallel execute by .net thread pool.
- /// Default is false
+ /// Default is false.
///
public bool EnablePublishParallelSend { get; set; }
///
/// If true then each message group will have own independent dispatching pipeline. Each pipeline use as many threads as
- /// value is.
+ /// value is.
/// Default is false.
///
public bool UseDispatchingPerGroup { get; set; }
diff --git a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs
index 771ce9b8f..351360567 100644
--- a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs
+++ b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs
@@ -26,7 +26,7 @@ public class Dispatcher : IDispatcher
private readonly IMessageSender _sender;
private readonly IDataStorage _storage;
private readonly PriorityQueue _schedulerQueue;
- private readonly bool _enablePrefetch;
+ private readonly bool _enableParallelExecute;
private readonly bool _enableParallelSend;
private Channel _publishedChannel = default!;
@@ -45,7 +45,7 @@ public Dispatcher(ILogger logger,
_executor = executor;
_schedulerQueue = new PriorityQueue();
_storage = storage;
- _enablePrefetch = options.Value.EnableConsumerPrefetch;
+ _enableParallelExecute = options.Value.EnableSubscriberParallelExecute;
_enableParallelSend = options.Value.EnablePublishParallelSend;
}
@@ -66,20 +66,20 @@ public async Task Start(CancellationToken stoppingToken)
await Task.Run(Sending, _tasksCts.Token).ConfigureAwait(false); //here return valuetask
- if (_enablePrefetch)
+ if (_enableParallelExecute)
{
- var capacity = _options.ConsumerThreadCount * 300;
_receivedChannel = Channel.CreateBounded<(MediumMessage, ConsumerExecutorDescriptor?)>(
- new BoundedChannelOptions(capacity > 3000 ? 3000 : capacity)
+ new BoundedChannelOptions(_options.SubscriberParallelExecuteThreadCount * _options.SubscriberParallelExecuteBufferFactor)
{
AllowSynchronousContinuations = true,
- SingleReader = _options.ConsumerThreadCount == 1,
+ SingleReader = _options.SubscriberParallelExecuteThreadCount == 1,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait
});
- await Task.WhenAll(Enumerable.Range(0, _options.ConsumerThreadCount)
- .Select(_ => Task.Run(Processing, _tasksCts.Token)).ToArray()).ConfigureAwait(false);
+ await Task.WhenAll(Enumerable.Range(0, _options.SubscriberParallelExecuteThreadCount)
+ .Select(_ => Task.Run(Processing, _tasksCts.Token)).ToArray())
+ .ConfigureAwait(false);
}
_ = Task.Run(async () =>
{
@@ -172,7 +172,7 @@ public async ValueTask EnqueueToExecute(MediumMessage message, ConsumerExecutorD
{
if (_tasksCts!.IsCancellationRequested) return;
- if (_enablePrefetch)
+ if (_enableParallelExecute)
{
if (!_receivedChannel.Writer.TryWrite((message, descriptor)))
{
@@ -246,7 +246,7 @@ private async ValueTask Processing()
{
var item1 = message.Item1;
var item2 = message.Item2;
- _ = Task.Run(() => _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false));
+ await _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -254,7 +254,8 @@ private async ValueTask Processing()
}
catch (Exception e)
{
- _logger.LogError(e, "An exception occurred when invoking subscriber. MessageId:{MessageId}", message.Item1.DbId);
+ _logger.LogError(e,
+ $"An exception occurred when invoke subscriber. MessageId:{message.Item1.DbId}");
}
}
catch (OperationCanceledException)
diff --git a/src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs b/src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs
index 05be91aa7..e09135537 100644
--- a/src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs
+++ b/src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs
@@ -27,7 +27,7 @@ internal class DispatcherPerGroup : IDispatcher
private readonly IMessageSender _sender;
private readonly IDataStorage _storage;
private readonly PriorityQueue _schedulerQueue;
- private readonly bool _enablePrefetch;
+ private readonly bool _enableParallelExecute;
private readonly bool _enableParallelSend;
private Channel _publishedChannel = default!;
@@ -47,7 +47,7 @@ public DispatcherPerGroup(ILogger logger,
_executor = executor;
_schedulerQueue = new PriorityQueue();
_storage = storage;
- _enablePrefetch = options.Value.EnableConsumerPrefetch;
+ _enableParallelExecute = options.Value.EnableSubscriberParallelExecute;
_enableParallelSend = options.Value.EnablePublishParallelSend;
}
@@ -70,7 +70,7 @@ public async Task Start(CancellationToken stoppingToken)
_receivedChannels =
new ConcurrentDictionary>(
- _options.ConsumerThreadCount, _options.ConsumerThreadCount * 2);
+ _options.SubscriberParallelExecuteThreadCount, _options.SubscriberParallelExecuteThreadCount * 2);
GetOrCreateReceiverChannel(_options.DefaultGroupName);
@@ -191,19 +191,25 @@ public void Dispose()
"Creating receiver channel for group {ConsumerGroup} with thread count {ConsumerThreadCount}", group,
_options.ConsumerThreadCount);
- var capacity = _options.ConsumerThreadCount * 300;
var channel = Channel.CreateBounded<(MediumMessage, ConsumerExecutorDescriptor?)>(
- new BoundedChannelOptions(capacity > 3000 ? 3000 : capacity)
+ new BoundedChannelOptions(_options.SubscriberParallelExecuteThreadCount * _options.SubscriberParallelExecuteBufferFactor)
{
AllowSynchronousContinuations = true,
- SingleReader = _options.ConsumerThreadCount == 1,
+ SingleReader = _options.SubscriberParallelExecuteThreadCount == 1,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait
});
- Task.WhenAll(Enumerable.Range(0, _options.ConsumerThreadCount)
- .Select(_ => Task.Run(() => Processing(group, channel), _tasksCts!.Token)).ToArray());
-
+ if (_enableParallelExecute)
+ {
+ Task.WhenAll(Enumerable.Range(0, _options.SubscriberParallelExecuteThreadCount)
+ .Select(_ => Task.Run(() => Processing(group, channel), _tasksCts!.Token)).ToArray())
+ .ConfigureAwait(false);
+ }
+ else
+ {
+ _ = Task.Run(() => Processing(group, channel), _tasksCts!.Token).ConfigureAwait(false);
+ }
return channel;
});
}
@@ -253,15 +259,7 @@ private async ValueTask Processing(string group, Channel<(MediumMessage, Consume
var item1 = message.Item1;
var item2 = message.Item2;
-
- if (_enablePrefetch)
- {
- _ = Task.Run(() => _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false));
- }
- else
- {
- await _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false);
- }
+ await _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{