Skip to content

Commit

Permalink
Limit NewItemInQueueEvents keys to a specific storage
Browse files Browse the repository at this point in the history
  • Loading branch information
odinserj committed Jun 11, 2024
1 parent 42e608b commit 3ff91d8
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
10 changes: 5 additions & 5 deletions src/Hangfire.SqlServer/SqlServerJobQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ internal class SqlServerJobQueue : IPersistentJobQueue
// This is an optimization that helps to overcome the polling delay, when
// both client and server reside in the same process. Everything is working
// without these events, but it helps to reduce the delays in processing.
internal static readonly ConcurrentDictionary<string, AutoResetEvent> NewItemInQueueEvents = new();
internal static readonly ConcurrentDictionary<Tuple<SqlServerStorage, string>, AutoResetEvent> NewItemInQueueEvents = new();

private static readonly Func<Tuple<SqlServerStorage, string>, SemaphoreSlim> CreateSemaphoreFunc = CreateSemaphore;
private static readonly TimeSpan LongPollingThreshold = TimeSpan.FromSeconds(1);
Expand Down Expand Up @@ -123,7 +123,7 @@ private SqlServerTimeoutJob DequeueUsingSlidingInvisibilityTimeout(string[] queu
var queuesString = String.Join("_", queues.OrderBy(static x => x));
var resource = Tuple.Create(_storage, queuesString);

var waitArray = GetWaitArrayForQueueSignals(queues, cancellationToken);
var waitArray = GetWaitArrayForQueueSignals(_storage, queues, cancellationToken);

SemaphoreSlim semaphore = null;

Expand Down Expand Up @@ -210,7 +210,7 @@ private SqlServerTransactionJob DequeueUsingTransaction(string[] queues, Cancell
? _options.QueuePollInterval
: TimeSpan.FromSeconds(1);

var waitArray = GetWaitArrayForQueueSignals(queues, cancellationToken);
var waitArray = GetWaitArrayForQueueSignals(_storage, queues, cancellationToken);

while (!cancellationToken.IsCancellationRequested)
{
Expand Down Expand Up @@ -271,7 +271,7 @@ private SqlServerTransactionJob DequeueUsingTransaction(string[] queues, Cancell
return null;
}

private static WaitHandle[] GetWaitArrayForQueueSignals(string[] queues, CancellationToken cancellationToken)
private static WaitHandle[] GetWaitArrayForQueueSignals(SqlServerStorage storage, string[] queues, CancellationToken cancellationToken)
{
var waitList = new List<WaitHandle>(capacity: queues.Length + 1)
{
Expand All @@ -280,7 +280,7 @@ private static WaitHandle[] GetWaitArrayForQueueSignals(string[] queues, Cancell

foreach (var queue in queues)
{
waitList.Add(NewItemInQueueEvents.GetOrAdd(queue, static _ => new AutoResetEvent(initialState: false)));
waitList.Add(NewItemInQueueEvents.GetOrAdd(Tuple.Create(storage, queue), static _ => new AutoResetEvent(initialState: false)));
}

return waitList.ToArray();
Expand Down
2 changes: 1 addition & 1 deletion src/Hangfire.SqlServer/SqlServerWriteOnlyTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ private void TrySignalListeningWorkers()
{
foreach (var queue in _queuesToSignal)
{
if (SqlServerJobQueue.NewItemInQueueEvents.TryGetValue(queue, out var signal))
if (SqlServerJobQueue.NewItemInQueueEvents.TryGetValue(Tuple.Create(_storage, queue), out var signal))
{
signal.Set();
}
Expand Down

0 comments on commit 3ff91d8

Please sign in to comment.