Skip to content

Commit b01f281

Browse files
authored
Merge pull request #33 from peppy/fix-exception-handling
Fix exception handling when multiple items are involved
2 parents 5280840 + b4e8485 commit b01f281

File tree

4 files changed

+82
-11
lines changed

4 files changed

+82
-11
lines changed

osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs

+35
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,41 @@ public void SendThenErrorDoesRetry()
195195
Assert.Equal(obj, receivedObject);
196196
}
197197

198+
[Fact]
199+
public void MultipleErrorsAttachedToCorrectItems()
200+
{
201+
var cts = new CancellationTokenSource(10000);
202+
203+
var obj1 = FakeData.New();
204+
var obj2 = FakeData.New();
205+
206+
bool gotCorrectExceptionForItem1 = false;
207+
bool gotCorrectExceptionForItem2 = false;
208+
209+
processor.Error += (exception, item) =>
210+
{
211+
Assert.NotNull(exception);
212+
Assert.Equal(exception, item.Exception);
213+
214+
gotCorrectExceptionForItem1 |= Equals(item.Data, obj1.Data) && exception.Message == "1";
215+
gotCorrectExceptionForItem2 |= Equals(item.Data, obj2.Data) && exception.Message == "2";
216+
};
217+
218+
processor.PushToQueue(new[] { obj1, obj2 });
219+
220+
processor.Received += o =>
221+
{
222+
if (Equals(o.Data, obj1.Data)) throw new Exception("1");
223+
if (Equals(o.Data, obj2.Data)) throw new Exception("2");
224+
};
225+
226+
processor.Run(cts.Token);
227+
228+
Assert.Equal(0, processor.GetQueueSize());
229+
Assert.True(gotCorrectExceptionForItem1);
230+
Assert.True(gotCorrectExceptionForItem2);
231+
}
232+
198233
[Fact]
199234
public void SendThenErrorForeverDoesDrop()
200235
{

osu.Server.QueueProcessor.Tests/TestBatchProcessor.cs

+8-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,14 @@ protected override void ProcessResults(IEnumerable<FakeData> items)
2121
{
2222
foreach (var item in items)
2323
{
24-
Received?.Invoke(item);
24+
try
25+
{
26+
Received?.Invoke(item);
27+
}
28+
catch (Exception e)
29+
{
30+
item.Exception = e;
31+
}
2532
}
2633
}
2734

osu.Server.QueueProcessor/QueueItem.cs

+11-1
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,21 @@ namespace osu.Server.QueueProcessor
1212
[Serializable]
1313
public abstract class QueueItem
1414
{
15+
[IgnoreDataMember]
16+
private bool failed;
17+
1518
/// <summary>
1619
/// Set to <c>true</c> to mark this item is failed. This will cause it to be retried.
1720
/// </summary>
1821
[IgnoreDataMember]
19-
public bool Failed { get; set; }
22+
public bool Failed
23+
{
24+
get => failed || Exception != null;
25+
set => failed = value;
26+
}
27+
28+
[IgnoreDataMember]
29+
public Exception? Exception { get; set; }
2030

2131
/// <summary>
2232
/// The number of times processing this item has been retried. Handled internally by <see cref="QueueProcessor{T}"/>.

osu.Server.QueueProcessor/QueueProcessor.cs

+28-9
Original file line numberDiff line numberDiff line change
@@ -132,11 +132,11 @@ public void Run(CancellationToken cancellation = default)
132132

133133
// individual processing should not be cancelled as we have already grabbed from the queue.
134134
Task.Factory.StartNew(() => { ProcessResults(items); }, CancellationToken.None, TaskCreationOptions.HideScheduler, threadPool)
135-
.ContinueWith(t =>
135+
.ContinueWith(_ =>
136136
{
137137
foreach (var item in items)
138138
{
139-
if (t.Exception != null || item.Failed)
139+
if (item.Failed)
140140
{
141141
Interlocked.Increment(ref totalErrors);
142142

@@ -145,12 +145,18 @@ public void Run(CancellationToken cancellation = default)
145145

146146
Interlocked.Increment(ref consecutiveErrors);
147147

148-
Error?.Invoke(t.Exception, item);
148+
try
149+
{
150+
Error?.Invoke(item.Exception, item);
151+
}
152+
catch
153+
{
154+
}
149155

150-
if (t.Exception != null)
151-
SentrySdk.CaptureException(t.Exception);
156+
if (item.Exception != null)
157+
SentrySdk.CaptureException(item.Exception);
152158

153-
Console.WriteLine($"Error processing {item}: {t.Exception}");
159+
Console.WriteLine($"Error processing {item}: {item.Exception}");
154160
attemptRetry(item);
155161
}
156162
else
@@ -197,8 +203,6 @@ private void setupSentry(SentryOptions options)
197203

198204
private void attemptRetry(T item)
199205
{
200-
item.Failed = false;
201-
202206
if (item.TotalRetries++ < config.MaxRetries)
203207
{
204208
Console.WriteLine($"Re-queueing for attempt {item.TotalRetries} / {config.MaxRetries}");
@@ -274,11 +278,26 @@ protected virtual void ProcessResult(T item)
274278
/// <summary>
275279
/// Implement to process batches of items from the queue.
276280
/// </summary>
281+
/// <remarks>
282+
/// In most cases, you should only need to override and implement <see cref="ProcessResult"/>.
283+
/// Only override this if you need more efficient batch processing.
284+
///
285+
/// If overriding this method, you should try-catch for exceptions, and set any exception against
286+
/// the relevant <see cref="QueueItem"/>. If this is not done, failures will not be handled correctly.</remarks>
277287
/// <param name="items">The items to process.</param>
278288
protected virtual void ProcessResults(IEnumerable<T> items)
279289
{
280290
foreach (var item in items)
281-
ProcessResult(item);
291+
{
292+
try
293+
{
294+
ProcessResult(item);
295+
}
296+
catch (Exception e)
297+
{
298+
item.Exception = e;
299+
}
300+
}
282301
}
283302
}
284303
}

0 commit comments

Comments
 (0)