Skip to content

Commit

Permalink
The options EnableConsumerPrefetch and UseDispatchingPerGroup will wo…
Browse files Browse the repository at this point in the history
…rk together without interference. (#1399)
  • Loading branch information
yang-xiaodong committed Sep 8, 2023
1 parent ff76dea commit 7f84af5
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 4 deletions.
3 changes: 0 additions & 3 deletions src/DotNetCore.CAP/CAP.Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,15 @@ public CapOptions()

/// <summary>
/// If true, the message will be pre fetch to memory queue for parallel execute by thread pool.
/// <para>Not available when <see cref="UseDispatchingPerGroup"/> true.</para>
/// Default is false
/// </summary>
public bool EnableConsumerPrefetch { get; set; }

/// <summary>
/// If true then each message group will have own independent dispatching pipeline. Each pipeline use as many threads as
/// <see cref="ConsumerThreadCount" /> value is.
/// <para>If true, the <see cref="EnableConsumerPrefetch"/> is not available.</para>
/// Default is false.
/// </summary>
[Obsolete("Use EnableConsumerPrefetch instead. Setting it to true means that each consumer is now executed concurrently by thread pool, regardless of whether they are in different groups.")]
public bool UseDispatchingPerGroup { get; set; }

/// <summary>
Expand Down
2 changes: 2 additions & 0 deletions src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public void Execute()
ICollection<string> topics;
try
{
// ReSharper disable once ConvertToUsingDeclaration
using (var client = _consumerClientFactory.Create(matchGroup.Key))
{
client.OnLogCallback = WriteLog;
Expand All @@ -139,6 +140,7 @@ public void Execute()
{
try
{
// ReSharper disable once ConvertToUsingDeclaration
using (var client = _consumerClientFactory.Create(matchGroup.Key))
{
_serverAddress = client.BrokerAddress;
Expand Down
12 changes: 11 additions & 1 deletion src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ internal class DispatcherPerGroup : IDispatcher
private readonly IMessageSender _sender;
private readonly IDataStorage _storage;
private readonly PriorityQueue<MediumMessage, DateTime> _schedulerQueue;
private readonly bool _enablePrefetch;

private Channel<MediumMessage> _publishedChannel = default!;
private ConcurrentDictionary<string, Channel<(MediumMessage, ConsumerExecutorDescriptor?)>> _receivedChannels = default!;
Expand All @@ -45,6 +46,7 @@ public DispatcherPerGroup(ILogger<Dispatcher> logger,
_executor = executor;
_schedulerQueue = new PriorityQueue<MediumMessage, DateTime>();
_storage = storage;
_enablePrefetch = options.Value.EnableConsumerPrefetch;
}

public async Task Start(CancellationToken stoppingToken)
Expand Down Expand Up @@ -242,7 +244,15 @@ private async ValueTask Processing(string group, Channel<(MediumMessage, Consume

var item1 = message.Item1;
var item2 = message.Item2;
_ = Task.Run(() => _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false));

if (_enablePrefetch)
{
_ = Task.Run(() => _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false));
}
else
{
await _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
{
Expand Down

0 comments on commit 7f84af5

Please sign in to comment.