Skip to content

Commit

Permalink
Merge b0b661a into 47afea6
Browse files Browse the repository at this point in the history
  • Loading branch information
dlidstrom authored Feb 28, 2022
2 parents 47afea6 + b0b661a commit 4949008
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 67 deletions.
9 changes: 8 additions & 1 deletion Snittlistan.Queue.WindowsServiceHost/App.config
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@
<!-- errorQueue attribute is optional and will default to {readQueue}.error -->
<!-- workerThreads is optional (defaults to 1), and limited to #cpus * 2 -->
<!-- autoCreateQueues is optional: defaults to true for convenience; false for setting up manually (avoid if possible) -->
<add name="TaskQueue" isEnabled="true" readQueue=".\Private$\taskqueue" workerThreads="1" autoCreateQueues="true" />
<!-- dropFailedMessages: use true to drop failed messages; false to move to error queue -->
<add
name="TaskQueue"
isEnabled="true"
readQueue=".\Private$\taskqueue"
workerThreads="1"
autoCreateQueues="true"
dropFailedMessages="true" />
</queueListeners>
</messaging>
<startup>
Expand Down
4 changes: 3 additions & 1 deletion Snittlistan.Queue/Config/MessagingConfigSection.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@

#nullable enable

using System.Configuration;

namespace Snittlistan.Queue.Config;

public class MessagingConfigSection : ConfigurationSection
{
[ConfigurationProperty("queueListeners", IsRequired = true)]
Expand Down
19 changes: 14 additions & 5 deletions Snittlistan.Queue/Config/QueueListenerElement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,22 @@ public bool AutoCreateQueues
set => this["autoCreateQueues"] = value;
}

[ConfigurationProperty("dropFailedMessages", DefaultValue = false)]
public bool DropFailedMessages
{
get => (bool)this["dropFailedMessages"];

set => this["dropFailedMessages"] = value;
}

public MessageQueueProcessorSettings CreateSettings()
{
string readQueue = ReadQueue;
string errorQueue = ErrorQueue;
int workerThreads = WorkerThreads;
bool autoCreateQueues = AutoCreateQueues;
MessageQueueProcessorSettings settings = new(readQueue, errorQueue, workerThreads, autoCreateQueues);
MessageQueueProcessorSettings settings = new(
ReadQueue,
ErrorQueue,
WorkerThreads,
AutoCreateQueues,
DropFailedMessages);
return settings;
}
}
54 changes: 24 additions & 30 deletions Snittlistan.Queue/MessageQueueListenerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,14 @@ namespace Snittlistan.Queue;
public abstract class MessageQueueListenerBase : IDisposable
{
private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
private readonly string _importQueue;
private readonly MessageQueue[] _importMessageQueues;
private readonly MessageQueue _errorMessageQueue;
private readonly Counter _counter = new();
private volatile bool _isClosing;
private readonly MessageQueue[] importMessageQueues;
private readonly MessageQueue errorMessageQueue;
private readonly Counter counter = new();
private readonly MessageQueueProcessorSettings settings;
private volatile bool isClosing;

protected MessageQueueListenerBase(MessageQueueProcessorSettings settings)
{
if (settings == null)
{
throw new ArgumentNullException(nameof(settings));
}

_importQueue = settings.ReadQueue;

// create import queue automatically, if it doesn't exist
if (settings.AutoCreateQueues)
{
Expand All @@ -41,7 +34,7 @@ protected MessageQueueListenerBase(MessageQueueProcessorSettings settings)
"Using {numberOfThreads} worker threads for {readQueue}",
numberOfThreads,
settings.ReadQueue);
_importMessageQueues = Enumerable.Range(0, numberOfThreads).Select(
importMessageQueues = Enumerable.Range(0, numberOfThreads).Select(
x =>
{
MessageQueue queue = new(settings.ReadQueue, QueueAccessMode.SendAndReceive);
Expand All @@ -50,18 +43,19 @@ protected MessageQueueListenerBase(MessageQueueProcessorSettings settings)
}).ToArray();

// create error queue automatically, if it doesn't exist
if (settings.AutoCreateQueues)
if (settings.AutoCreateQueues && settings.DropFailedMessages == false)
{
CreateQueue(settings.ErrorQueue);
}

_errorMessageQueue = new MessageQueue(settings.ErrorQueue, QueueAccessMode.Send);
errorMessageQueue = new MessageQueue(settings.ErrorQueue, QueueAccessMode.Send);
this.settings = settings;
}

public void Start()
{
Logger.Info("Starting msmq listener");
foreach (MessageQueue importMessageQueue in _importMessageQueues)
foreach (MessageQueue importMessageQueue in importMessageQueues)
{
importMessageQueue.PeekCompleted += Queue_PeekCompleted;
_ = importMessageQueue.BeginPeek();
Expand All @@ -70,43 +64,43 @@ public void Start()

public void Stop()
{
_isClosing = true;
isClosing = true;
Logger.Info("Stopping msmq listener");
foreach (MessageQueue importMessageQueue in _importMessageQueues)
foreach (MessageQueue importMessageQueue in importMessageQueues)
{
importMessageQueue.PeekCompleted -= Queue_PeekCompleted;
importMessageQueue.Close();
}

_errorMessageQueue.Close();
errorMessageQueue.Close();

// wait (at most 10 seconds) for processor threads to finish working
Stopwatch sw = Stopwatch.StartNew();
while (_counter.Value > 0 && sw.ElapsedMilliseconds < 10000)
while (counter.Value > 0 && sw.ElapsedMilliseconds < 10000)
{
Thread.Sleep(100);
}

int value = _counter.Value;
int value = counter.Value;
if (value > 0)
{
Logger.Error(
"Failed to stop workers for {importQueue} ({count} remaining)",
_importQueue,
settings.ReadQueue,
value);
}

_isClosing = false;
isClosing = false;
}

public void Dispose()
{
foreach (MessageQueue importMessageQueue in _importMessageQueues)
foreach (MessageQueue importMessageQueue in importMessageQueues)
{
importMessageQueue.Dispose();
}

_errorMessageQueue.Dispose();
errorMessageQueue.Dispose();
}

protected abstract Task DoHandle(string contents);
Expand Down Expand Up @@ -169,7 +163,7 @@ private async void Queue_PeekCompleted(object sender, PeekCompletedEventArgs e)
transaction?.Dispose();

// start new peek operation
if (_isClosing == false)
if (isClosing == false)
{
_ = queue.BeginPeek();
}
Expand All @@ -183,7 +177,7 @@ private async Task ProcessMessage(
{
try
{
_counter.Increment();
counter.Increment();
IDisposable disposable = NestedDiagnosticsLogicalContext.Push(message.Id);
try
{
Expand All @@ -198,7 +192,7 @@ private async Task ProcessMessage(
}
finally
{
_counter.Decrement();
counter.Decrement();
}
}

Expand All @@ -219,7 +213,7 @@ private async Task TryProcessMessage(Message message, MessageQueue sourceQueue,
Logger.Warn(ex);
message.AppSpecific++;
Logger.Warn("Message retry #{count}", message.AppSpecific);
if (message.AppSpecific >= MaximumTries)
if (message.AppSpecific >= MaximumTries && settings.DropFailedMessages == false)
{
MoveToErrorQueue(message, ex, messageQueueTransaction, MaximumTries);
}
Expand Down Expand Up @@ -249,6 +243,6 @@ private void MoveToErrorQueue(Message message, Exception ex, MessageQueueTransac
message.Label = exceptionMessage.Substring(0, Math.Min(exceptionMessage.Length, 249));

// send message to error queue
_errorMessageQueue.Send(message, messageQueueTransaction);
errorMessageQueue.Send(message, messageQueueTransaction);
}
}
37 changes: 8 additions & 29 deletions Snittlistan.Queue/MessageQueueProcessorSettings.cs
Original file line number Diff line number Diff line change
@@ -1,31 +1,10 @@
namespace Snittlistan.Queue;
public class MessageQueueProcessorSettings
{
/// <summary>
/// Initializes a new instance of the MessageQueueProcessorSettings class.
/// </summary>
/// <param name="readQueue">Message queue to read from.</param>
/// <param name="errorQueue">Message queue to use for error handling.</param>
/// <param name="workerThreadCount">Number of processing threads.</param>
/// <param name="autoCreateQueues">Automatically create queues.</param>
public MessageQueueProcessorSettings(string readQueue, string errorQueue, int workerThreadCount, bool autoCreateQueues)
{
ReadQueue = readQueue ?? throw new ArgumentNullException(nameof(readQueue));
ErrorQueue = errorQueue ?? throw new ArgumentNullException(nameof(errorQueue));
WorkerThreadCount = workerThreadCount;
AutoCreateQueues = autoCreateQueues;
}
#nullable enable

public string ReadQueue { get; }
namespace Snittlistan.Queue;

public string ErrorQueue { get; }

public int WorkerThreadCount { get; }

public bool AutoCreateQueues { get; }

public override string ToString()
{
return $"Read={ReadQueue}, Error={ErrorQueue}, Workers={WorkerThreadCount}";
}
}
public record MessageQueueProcessorSettings(
string ReadQueue,
string ErrorQueue,
int WorkerThreadCount,
bool AutoCreateQueues,
bool DropFailedMessages);
7 changes: 6 additions & 1 deletion Snittlistan.Web/Infrastructure/Database/PublishedTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ private PublishedTask(
string createdBy)
{
Task = task;
BusinessKey = task.BusinessKey;
BusinessKeyColumn = task.BusinessKey.ToJson();
TaskType = Enumerable.Aggregate(
BusinessKeyColumn,
(ushort)5381, (l, r) => (ushort)((33 * l) ^ r));
TenantId = tenantId;
CorrelationId = correlationId;
CausationId = causationId;
Expand Down Expand Up @@ -95,6 +98,8 @@ public TaskBase Task
[Column("data")]
public string DataColumn { get; private set; } = null!;

public int TaskType { get; private set; }

/// <summary>
/// When to publish (if delayed, otherwise use DateTime.Now)
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions Sql/010_published_task_task_type.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- only allow one unhandled published_task for each task_type at a time
ALTER TABLE snittlistan.published_task ADD task_type INT NULL;
CREATE UNIQUE INDEX published_task_task_type_idx ON
snittlistan.published_task(task_type)
WHERE
handled_date IS NULL;

0 comments on commit 4949008

Please sign in to comment.