Skip to content

Commit 6e34616

Browse files
committed
Move messages to failed queue when plugin isn't registered
1 parent fc4c48e commit 6e34616

File tree

8 files changed

+100
-34
lines changed

8 files changed

+100
-34
lines changed
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using System;
2+
3+
namespace InEngine.Core.Exceptions
4+
{
5+
public class CommandNotExtractableFromEnvelopeException : Exception
6+
{
7+
public CommandNotExtractableFromEnvelopeException(string command, Exception exception)
8+
: base(
9+
$"The plugin is (probably) not registered - check the message in the failure queue for details. Command name: {command}",
10+
exception
11+
)
12+
{}
13+
}
14+
}

src/InEngine.Core/Queuing/Clients/FileClient.cs

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -54,28 +54,40 @@ public string FailedQueuePath
5454

5555
public void Publish(AbstractCommand command)
5656
{
57-
if (!Directory.Exists(PendingQueuePath))
58-
Directory.CreateDirectory(PendingQueuePath);
59-
var serializedMessage = new CommandEnvelope() {
57+
PublishToQueue(new CommandEnvelope() {
6058
IsCompressed = UseCompression,
6159
CommandClassName = command.GetType().FullName,
6260
PluginName = command.GetType().Assembly.GetName().Name,
6361
SerializedCommand = command.SerializeToJson(UseCompression)
64-
}.SerializeToJson();
65-
using (var streamWriter = File.CreateText(Path.Combine(PendingQueuePath, Guid.NewGuid().ToString())))
66-
{
67-
streamWriter.Write(serializedMessage);
68-
}
62+
}, PendingQueuePath);
63+
}
64+
65+
void PublishToQueue(CommandEnvelope commandEnvelope, string queuePath)
66+
{
67+
if (!Directory.Exists(queuePath))
68+
Directory.CreateDirectory(queuePath);
69+
70+
File.WriteAllText(
71+
Path.Combine(queuePath, Guid.NewGuid().ToString()),
72+
commandEnvelope.SerializeToJson()
73+
);
6974
}
7075

7176
public void Consume(CancellationToken cancellationToken)
7277
{
7378
try
7479
{
75-
while(true)
80+
while(true)
7681
{
77-
if (Consume() == null)
78-
Thread.Sleep(5000);
82+
try
83+
{
84+
if (Consume() == null)
85+
Thread.Sleep(5000);
86+
}
87+
catch (Exception exception)
88+
{
89+
Log.Error(exception);
90+
}
7991
cancellationToken.ThrowIfCancellationRequested();
8092
}
8193
}
@@ -109,11 +121,12 @@ public ICommandEnvelope Consume()
109121
return null;
110122
}
111123
consumeLock.ReleaseMutex();
112-
113-
var commandEnvelope = File.ReadAllText(inProgressFilePath).DeserializeFromJson<CommandEnvelope>() as ICommandEnvelope;
114-
var command = commandEnvelope.GetCommandInstance();
115-
command.CommandLifeCycle.IncrementRetry();
116-
commandEnvelope.SerializedCommand = command.SerializeToJson(UseCompression);
124+
125+
var commandEnvelope = File.ReadAllText(inProgressFilePath).DeserializeFromJson<CommandEnvelope>();
126+
var command = commandEnvelope.GetCommandInstanceAndIncrementRetry(() => {
127+
File.Move(inProgressFilePath, Path.Combine(FailedQueuePath, fileInfo.Name));
128+
});
129+
117130
try
118131
{
119132
command.WriteSummaryToConsole();
@@ -122,8 +135,10 @@ public ICommandEnvelope Consume()
122135
catch (Exception exception)
123136
{
124137
Log.Error(exception);
125-
if (command.CommandLifeCycle.ShouldRetry())
126-
File.Move(inProgressFilePath, Path.Combine(PendingQueuePath, fileInfo.Name));
138+
if (command.CommandLifeCycle.ShouldRetry()) {
139+
PublishToQueue(commandEnvelope, PendingQueuePath);
140+
File.Delete(Path.Combine(PendingQueuePath, fileInfo.Name));
141+
}
127142
else
128143
File.Move(inProgressFilePath, Path.Combine(FailedQueuePath, fileInfo.Name));
129144
throw new CommandFailedException("Failed to consume command.", exception);

src/InEngine.Core/Queuing/Clients/RabbitMQClient.cs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,10 @@ public void Consume(CancellationToken cancellationToken)
100100
if (commandEnvelope == null)
101101
throw new CommandFailedException("Could not deserialize the command.");
102102

103-
var command = commandEnvelope.GetCommandInstance();
104-
command.CommandLifeCycle.IncrementRetry();
105-
commandEnvelope.SerializedCommand = command.SerializeToJson(UseCompression);
103+
var command = commandEnvelope.GetCommandInstanceAndIncrementRetry(() => {
104+
eventingConsumer.Model.BasicNack(result.DeliveryTag, false, false);
105+
});
106+
106107
try
107108
{
108109
command.WriteSummaryToConsole();
@@ -137,9 +138,10 @@ public ICommandEnvelope Consume()
137138
if (commandEnvelope == null)
138139
throw new CommandFailedException("Could not deserialize the command.");
139140

140-
var command = commandEnvelope.GetCommandInstance();
141-
command.CommandLifeCycle.IncrementRetry();
142-
commandEnvelope.SerializedCommand = command.SerializeToJson(UseCompression);
141+
var command = commandEnvelope.GetCommandInstanceAndIncrementRetry(() => {
142+
Channel.BasicNack(result.DeliveryTag, false, false);
143+
});
144+
143145
try
144146
{
145147
command.WriteSummaryToConsole();

src/InEngine.Core/Queuing/Clients/RedisClient.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,10 @@ public ICommandEnvelope Consume()
102102
if (commandEnvelope == null)
103103
throw new CommandFailedException("Could not deserialize the command.");
104104

105-
var command = commandEnvelope.GetCommandInstance();
106-
command.CommandLifeCycle.IncrementRetry();
107-
commandEnvelope.SerializedCommand = command.SerializeToJson(UseCompression);
105+
var command = commandEnvelope.GetCommandInstanceAndIncrementRetry(() => {
106+
Redis.ListLeftPush(FailedQueueName, commandEnvelope.SerializeToJson());
107+
});
108+
108109
try
109110
{
110111
command.WriteSummaryToConsole();

src/InEngine.Core/Queuing/Commands/Peek.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public void PrintMessages(List<ICommandEnvelope> messages, string queueName)
9191
if (JsonFormat)
9292
Line(commandEnvelope.SerializeToJson());
9393
else
94-
konsoleForm.Write(commandEnvelope.GetCommandInstance());
94+
konsoleForm.Write(commandEnvelope.GetCommandInstanceAndIncrementRetry());
9595
});
9696
}
9797
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
using System.Linq;
2+
3+
namespace InEngine.Core.Queuing.Commands
4+
{
5+
public class Length : AbstractCommand, IHasQueueSettings
6+
{
7+
public QueueSettings QueueSettings { get; set; }
8+
9+
public override void Run()
10+
{
11+
PrintQueueLengths(QueueAdapter.Make(false, QueueSettings, MailSettings));
12+
PrintQueueLengths(QueueAdapter.Make(true, QueueSettings, MailSettings));
13+
}
14+
15+
public void PrintQueueLengths(QueueAdapter queue)
16+
{
17+
Warning($"{queue.QueueName} Queue:");
18+
queue.GetQueueLengths().ToList().ForEach(x => {
19+
InfoText(x.Key.PadLeft(15));
20+
Line(x.Value.ToString().PadLeft(10));
21+
});
22+
Newline();
23+
}
24+
}
25+
}

src/InEngine.Core/Queuing/Message/CommandEnvelope.cs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,21 @@ public class CommandEnvelope : ICommandEnvelope
1212
public DateTime QueuedAt { get; set; } = DateTime.UtcNow;
1313
public bool IsCompressed { get; set; }
1414

15-
public AbstractCommand GetCommandInstance()
15+
public AbstractCommand GetCommandInstanceAndIncrementRetry(Action actionOnFail = null)
1616
{
17-
var commandType = PluginAssembly.LoadFrom(PluginName).GetCommandType(CommandClassName);
18-
if (commandType == null)
19-
throw new CommandFailedException($"Could not locate command {CommandClassName}. Is the {PluginName} plugin registered in the settings file?");
20-
return SerializedCommand.DeserializeFromJson<AbstractCommand>(IsCompressed);
17+
try
18+
{
19+
PluginAssembly.LoadFrom(PluginName).GetCommandType(CommandClassName);
20+
var command = SerializedCommand.DeserializeFromJson<AbstractCommand>(IsCompressed);
21+
command.CommandLifeCycle.IncrementRetry();
22+
SerializedCommand = command.SerializeToJson(IsCompressed);
23+
return command;
24+
}
25+
catch (Exception exception)
26+
{
27+
actionOnFail.Invoke();
28+
throw new CommandNotExtractableFromEnvelopeException(CommandClassName, exception);
29+
}
2130
}
2231
}
2332
}

src/InEngine.Core/Queuing/Message/ICommandEnvelope.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,6 @@ public interface ICommandEnvelope
1010
string SerializedCommand { get; set; }
1111
DateTime QueuedAt { get; set; }
1212
bool IsCompressed { get; set; }
13-
AbstractCommand GetCommandInstance();
13+
AbstractCommand GetCommandInstanceAndIncrementRetry(Action actionOnFail = null);
1414
}
1515
}

0 commit comments

Comments
 (0)