Skip to content

Commit

Permalink
#63 fixing semaphore bug
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeff Repanich committed Nov 4, 2018
1 parent 6b0717f commit 20441e3
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions src/Quidjibo/Servers/QuidjiboServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,28 +127,34 @@ private async Task WorkLoopAsync(string queue)
var workProvider = await _workProviderFactory.CreateAsync(queue, _cts.Token);
while (!_cts.IsCancellationRequested)
{
var items = new List<WorkItem>(0);
try
{
_logger.LogDebug("Throttle Count : {ThrottleCount}", _throttle.CurrentCount);
_logger.LogTrace("Throttle Count : {ThrottleCount}", _throttle.CurrentCount);

// throttle is important when there is more than one listener
await _throttle.WaitAsync(_cts.Token);
var items = await workProvider.ReceiveAsync(Worker, _cts.Token);
if (items.Any())
items = await workProvider.ReceiveAsync(Worker, _cts.Token);
if (items.Count > 0)
{
_logger.LogDebug("Received {WorkItemCount} items.", items.Count);
var tasks = items.Select(item => InvokePipelineAsync(workProvider, item));
await Task.WhenAll(tasks);
_throttle.Release();
continue;
}

_throttle.Release();
await Task.Delay(pollingInterval, _cts.Token);
}
catch (Exception exception)
{
_logger.LogWarning(0, exception, exception.Message);
}
finally
{
_throttle.Release();
}

if (items.Count <= 0)
{
await Task.Delay(pollingInterval, _cts.Token);
}
}
}

Expand Down

0 comments on commit 20441e3

Please sign in to comment.