Skip to content

Commit

Permalink
Resets ActionBlock if faulted or cancelled. (#385)
Browse files Browse the repository at this point in the history
* Resets ActionBlock if faulted or cancelled.
* Log number of uploaded files.

Signed-off-by: Victor Chang <[email protected]>
  • Loading branch information
mocsharp authored Apr 20, 2023
1 parent 1e63847 commit a83ef62
Show file tree
Hide file tree
Showing 18 changed files with 328 additions and 140 deletions.
1 change: 1 addition & 0 deletions src/Api/Storage/Payload.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public TimeSpan Elapsed
public string? CalledAeTitle { get => Files.OfType<DicomFileStorageMetadata>().Select(p => p.CalledAeTitle).FirstOrDefault(); }

/// <summary>
/// Gets the number of files in <c>Files</c> whose <c>IsUploaded</c> flag is set.
/// </summary>
public int FilesUploaded => Files.Count(file => file.IsUploaded);

/// <summary>
/// Gets the number of files in <c>Files</c> whose <c>IsUploadFailed</c> flag is set.
/// </summary>
public int FilesFailedToUpload => Files.Count(file => file.IsUploadFailed);

public Payload(string key, string correlationId, uint timeout)
Expand Down
10 changes: 10 additions & 0 deletions src/Common/ExtensionMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,15 @@ public static async Task<bool> Post<TInput>(this ActionBlock<TInput> actionBlock
await Task.Delay(delay).ConfigureAwait(false);
return actionBlock.Post(input);
}

/// <summary>
/// Determines whether the given task has ended in the faulted or the canceled state.
/// </summary>
/// <param name="task">The task to inspect.</param>
/// <returns><c>true</c> when the task is faulted or canceled; otherwise <c>false</c>.</returns>
public static bool IsCanceledOrFaulted(this Task task) => task.IsFaulted || task.IsCanceled;
}
}
2 changes: 1 addition & 1 deletion src/Configuration/DicomWebConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class DicomWebConfiguration
/// Gets or sets the maximum number of simultaneous DICOMweb connections.
/// </summary>
[ConfigurationKeyName("maximumNumberOfConnections")]
public int MaximumNumberOfConnection { get; set; } = 2;
public ushort MaximumNumberOfConnection { get; set; } = 2;

/// <summary>
/// Gets or set the maximum allowed file size in bytes with default to 2GiB.
Expand Down
2 changes: 1 addition & 1 deletion src/Configuration/ScuConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class ScuConfiguration
/// Gets or sets the maximum number of simultaneous DICOM associations for the SCU service.
/// </summary>
[ConfigurationKeyName("maximumNumberOfAssociations")]
public int MaximumNumberOfAssociations { get; set; } = 8;
public ushort MaximumNumberOfAssociations { get; set; } = 8;

public ScuConfiguration()
{
Expand Down
50 changes: 50 additions & 0 deletions src/InformaticsGateway/Common/PostPayloadException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Runtime.Serialization;
using Monai.Deploy.InformaticsGateway.Api.Storage;

namespace Monai.Deploy.InformaticsGateway.Common
{
    /// <summary>
    /// Thrown when a payload could not be posted to one of the payload processing queues.
    /// Carries the payload and the state identifying the queue that rejected it so the
    /// catcher can retry the post.
    /// </summary>
    internal class PostPayloadException : Exception
    {
        /// <summary>
        /// Gets the payload state identifying the queue the post was aimed at.
        /// Left at its default value by the constructors that do not take a target state.
        /// </summary>
        public Payload.PayloadState TargetQueue { get; }

        /// <summary>
        /// Gets the payload that could not be posted.
        /// Not assigned by the constructors that do not take a payload.
        /// </summary>
        public Payload Payload { get; }

        /// <summary>
        /// Initializes a new instance without a target queue, payload or message.
        /// </summary>
        public PostPayloadException()
        {
        }

        /// <summary>
        /// Initializes a new instance for a payload that could not be posted to the queue
        /// identified by <paramref name="targetState"/>.
        /// </summary>
        /// <param name="targetState">State identifying the queue that rejected the payload.</param>
        /// <param name="payload">The payload that could not be posted.</param>
        public PostPayloadException(Api.Storage.Payload.PayloadState targetState, Payload payload)
            // Supply a descriptive message so logs do not fall back to the generic
            // "Exception of type ... was thrown." text; null-safe on the payload.
            : base($"Failed to post payload '{payload?.PayloadId}' to the '{targetState}' queue.")
        {
            TargetQueue = targetState;
            Payload = payload;
        }

        /// <summary>
        /// Initializes a new instance with the specified error message.
        /// </summary>
        /// <param name="message">The message describing the error.</param>
        public PostPayloadException(string message) : base(message)
        {
        }

        /// <summary>
        /// Initializes a new instance with the specified error message and inner exception.
        /// </summary>
        /// <param name="message">The message describing the error.</param>
        /// <param name="innerException">The exception that caused this exception.</param>
        public PostPayloadException(string message, Exception innerException) : base(message, innerException)
        {
        }

        /// <summary>
        /// Initializes a new instance from serialized data.
        /// </summary>
        /// <param name="info">The serialized object data.</param>
        /// <param name="context">The streaming context.</param>
        protected PostPayloadException(SerializationInfo info, StreamingContext context) : base(info, context)
        {
        }
    }
}
6 changes: 3 additions & 3 deletions src/InformaticsGateway/Logging/Log.3000.PayloadAssembler.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 MONAI Consortium
* Copyright 2022-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,8 +33,8 @@ public static partial class Log
[LoggerMessage(EventId = 3004, Level = LogLevel.Trace, Message = "Number of incomplete payloads waiting for processing: {count}.")]
public static partial void BucketsActive(this ILogger logger, int count);

[LoggerMessage(EventId = 3005, Level = LogLevel.Trace, Message = "Checking elapsed time for bucket: {key} with timeout set to {timeout}s. Elapsed {elapsed}s with {failedFiles} failures out of {totalNumberOfFiles}.")]
public static partial void BucketElapsedTime(this ILogger logger, string key, uint timeout, double elapsed, int totalNumberOfFiles, int failedFiles);
// Trace entry written while a bucket's elapsed time is checked against its timeout;
// reports how many of the bucket's files have been uploaded and how many failed.
[LoggerMessage(EventId = 3005, Level = LogLevel.Trace, Message = "Checking elapsed time for bucket: {key} with timeout set to {timeout}s. Elapsed {elapsed}s with {succeededFiles} uploaded and {failedFiles} failures out of {totalNumberOfFiles}.")]
public static partial void BucketElapsedTime(this ILogger logger, string key, uint timeout, double elapsed, int totalNumberOfFiles, int succeededFiles, int failedFiles);

[LoggerMessage(EventId = 3007, Level = LogLevel.Information, Message = "Bucket {key} sent to processing queue with {count} files.")]
public static partial void BucketReady(this ILogger logger, string key, int count);
Expand Down
6 changes: 6 additions & 0 deletions src/InformaticsGateway/Logging/Log.500.ExportService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,11 @@ public static partial class Log

[LoggerMessage(EventId = 533, Level = LogLevel.Error, Message = "Recovering messaging service connection due to {reason}.")]
public static partial void MessagingServiceErrorRecover(this ILogger logger, string reason);

[LoggerMessage(EventId = 534, Level = LogLevel.Error, Message = "Error posting export job for processing correlation ID {correlationId}, export task ID {exportTaskId}.")]
public static partial void ErrorPostingExportJobToQueue(this ILogger logger, string correlationId, string exportTaskId);

// Warning written when a service has more workers than its configured maximum.
// NOTE(review): the method name misspells "Maximum"; it is left unchanged because callers reference it.
[LoggerMessage(EventId = 535, Level = LogLevel.Warning, Message = "Exceeded maximum number of workers in {serviceName}: {count}.")]
public static partial void ExceededMaxmimumNumberOfWorkers(this ILogger logger, string serviceName, ulong count);
}
}
12 changes: 12 additions & 0 deletions src/InformaticsGateway/Logging/Log.700.PayloadService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,5 +136,17 @@ public static partial class Log

[LoggerMessage(EventId = 743, Level = LogLevel.Error, Message = "Exception moving payload.")]
public static partial void PayloadMoveException(this ILogger logger, Exception ex);

// Warning that reports the faulted/cancelled status of the move payload queue.
// NOTE(review): "isFauled" looks like a misspelling of "isFaulted"; the placeholder name is also
// the structured-log property name, so confirm log consumers before renaming.
[LoggerMessage(EventId = 744, Level = LogLevel.Warning, Message = "PayloadNotification move payload queue: faulted: {isFauled}, cancelled: {isCancelled}.")]
public static partial void MoveQueueFaulted(this ILogger logger, bool isFauled, bool isCancelled);

// Warning that reports the faulted/cancelled status of the publish payload queue.
// NOTE(review): same "isFauled" spelling as the move-queue entry; confirm before renaming.
[LoggerMessage(EventId = 745, Level = LogLevel.Warning, Message = "PayloadNotification publishing payload queue: faulted: {isFauled}, cancelled: {isCancelled}.")]
public static partial void PublishQueueFaulted(this ILogger logger, bool isFauled, bool isCancelled);

// Error written when a payload could not be posted to the move queue.
[LoggerMessage(EventId = 746, Level = LogLevel.Error, Message = "Error posting payload to move queue.")]
public static partial void ErrorPostingJobToMovePayloadsQueue(this ILogger logger);

// Error written when a payload could not be posted to the publish queue.
[LoggerMessage(EventId = 747, Level = LogLevel.Error, Message = "Error posting payload to publish queue.")]
public static partial void ErrorPostingJobToPublishPayloadsQueue(this ILogger logger);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private async void OnTimedEvent(Object source, System.Timers.ElapsedEventArgs e)
var payload = await _payloads[key].Task.ConfigureAwait(false);
using var loggerScope = _logger.BeginScope(new LoggingDataDictionary<string, object> { { "CorrelationId", payload.CorrelationId } });

_logger.BucketElapsedTime(key, payload.Timeout, payload.ElapsedTime().TotalSeconds, payload.Files.Count, payload.FilesFailedToUpload);
_logger.BucketElapsedTime(key, payload.Timeout, payload.ElapsedTime().TotalSeconds, payload.Files.Count, payload.FilesUploaded, payload.FilesFailedToUpload);
// Wait for timer window closes before sending payload for processing
if (payload.HasTimedOut)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ public async Task MoveFilesAsync(Payload payload, ActionBlock<Payload> moveQueue
var action = await UpdatePayloadState(payload, ex, cancellationToken).ConfigureAwait(false);
if (action == PayloadAction.Updated)
{
await moveQueue.Post(payload, _options.Value.Storage.Retries.RetryDelays.ElementAt(payload.RetryCount - 1)).ConfigureAwait(false);
if (!await moveQueue.Post(payload, _options.Value.Storage.Retries.RetryDelays.ElementAt(payload.RetryCount - 1)).ConfigureAwait(false))
{
throw new PostPayloadException(Payload.PayloadState.Move, payload);
}
}
}
finally
Expand All @@ -111,7 +114,11 @@ private async Task NotifyIfCompleted(Payload payload, ActionBlock<Payload> notif
await repository.UpdateAsync(payload, cancellationToken).ConfigureAwait(false);
_logger.PayloadSaved(payload.PayloadId);

notificationQueue.Post(payload);
if (!notificationQueue.Post(payload))
{
throw new PostPayloadException(Payload.PayloadState.Notify, payload);
}

_logger.PayloadReadyToBePublished(payload.PayloadId);
}
else // we should never hit this else block.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ public async Task NotifyAsync(Payload payload, ActionBlock<Payload> notification
var action = await UpdatePayloadState(payload, cancellationToken).ConfigureAwait(false);
if (action == PayloadAction.Updated)
{
await notificationQueue.Post(payload, _options.Value.Messaging.Retries.RetryDelays.ElementAt(payload.RetryCount - 1)).ConfigureAwait(false);
_logger.FailedToPublishWorkflowRequest(payload.PayloadId, ex);
if (!await notificationQueue.Post(payload, _options.Value.Messaging.Retries.RetryDelays.ElementAt(payload.RetryCount - 1)).ConfigureAwait(false))
{
throw new PostPayloadException(Payload.PayloadState.Notify, payload);
}
}
}
}
Expand Down
127 changes: 103 additions & 24 deletions src/InformaticsGateway/Services/Connectors/PayloadNotificationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,38 @@ public PayloadNotificationService(IServiceScopeFactory serviceScopeFactory,
_cancellationTokenSource = new CancellationTokenSource();
}

public async Task StartAsync(CancellationToken cancellationToken)
public Task StartAsync(CancellationToken cancellationToken)
{
_moveFileQueue = new ActionBlock<Payload>(
MoveActionHandler,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = _options.Value.Storage.PayloadProcessThreads,
MaxMessagesPerTask = 1,
CancellationToken = cancellationToken
});
SetupQueues(cancellationToken);

var task = Task.Run(async () =>
{
await RestoreFromDatabaseAsync(cancellationToken).ConfigureAwait(false);
BackgroundProcessing(cancellationToken);
}, CancellationToken.None);

Status = ServiceStatus.Running;
_logger.ServiceStarted(ServiceName);

if (task.IsCompleted)
return task;

return Task.CompletedTask;
}

/// <summary>
/// Sets up both payload processing queues by resetting the move queue and then the publish queue.
/// </summary>
/// <param name="cancellationToken">Token passed to both queue reset methods.</param>
private void SetupQueues(CancellationToken cancellationToken)
{
ResetMoveQueue(cancellationToken);
ResetPublishQueue(cancellationToken);
}

private void ResetPublishQueue(CancellationToken cancellationToken)
{
if (_publishQueue is not null)
{
_logger.PublishQueueFaulted(_publishQueue.Completion.IsFaulted, _publishQueue.Completion.IsCanceled);
_publishQueue.Complete();
}

_publishQueue = new ActionBlock<Payload>(
NotificationHandler,
Expand All @@ -107,21 +129,24 @@ public async Task StartAsync(CancellationToken cancellationToken)
MaxMessagesPerTask = 1,
CancellationToken = cancellationToken
});
}

await RestoreFromDatabaseAsync(cancellationToken).ConfigureAwait(false);

var task = Task.Run(() =>
private void ResetMoveQueue(CancellationToken cancellationToken)
{
if (_moveFileQueue is not null)
{
BackgroundProcessing(cancellationToken);
}, CancellationToken.None);

Status = ServiceStatus.Running;
_logger.ServiceStarted(ServiceName);

if (task.IsCompleted)
await task.ConfigureAwait(false);
_logger.MoveQueueFaulted(_moveFileQueue.Completion.IsFaulted, _moveFileQueue.Completion.IsCanceled);
_moveFileQueue.Complete();
}

await Task.CompletedTask.ConfigureAwait(false);
_moveFileQueue = new ActionBlock<Payload>(
MoveActionHandler,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = _options.Value.Storage.PayloadProcessThreads,
MaxMessagesPerTask = 1,
CancellationToken = cancellationToken
});
}

private async Task NotificationHandler(Payload payload)
Expand All @@ -134,6 +159,10 @@ private async Task NotificationHandler(Payload payload)
{
await _payloadNotificationActionHandler.NotifyAsync(payload, _publishQueue, _cancellationTokenSource.Token).ConfigureAwait(false);
}
catch (PostPayloadException ex)
{
HandlePostPayloadException(ex);
}
catch (Exception ex)
{
if (ex is PayloadNotifyException payloadMoveException &&
Expand All @@ -158,6 +187,10 @@ private async Task MoveActionHandler(Payload payload)
{
await _payloadMoveActionHandler.MoveFilesAsync(payload, _moveFileQueue, _publishQueue, _cancellationTokenSource.Token).ConfigureAwait(false);
}
catch (PostPayloadException ex)
{
HandlePostPayloadException(ex);
}
catch (Exception ex)
{
if (ex is PayloadNotifyException payloadMoveException &&
Expand All @@ -172,17 +205,45 @@ private async Task MoveActionHandler(Payload payload)
}
}

/// <summary>
/// Retries posting the payload carried by a <see cref="PostPayloadException"/> to the queue it
/// was aimed at. The queue is replaced first if it has faulted or been cancelled; an error is
/// logged if the retried post is rejected as well.
/// </summary>
/// <param name="ex">The exception describing the failed post.</param>
private void HandlePostPayloadException(PostPayloadException ex)
{
    Guard.Against.Null(ex);

    // NOTE(review): a queue rebuilt from here receives CancellationToken.None rather than the
    // token the service was started with - confirm this is intended.
    switch (ex.TargetQueue)
    {
        case Payload.PayloadState.Move:
            ResetIfFaultedOrCancelled(_moveFileQueue, ResetMoveQueue, CancellationToken.None);
            var acceptedByMoveQueue = _moveFileQueue.Post(ex.Payload);
            if (!acceptedByMoveQueue)
            {
                _logger.ErrorPostingJobToMovePayloadsQueue();
            }
            break;

        case Payload.PayloadState.Notify:
            ResetIfFaultedOrCancelled(_publishQueue, ResetPublishQueue, CancellationToken.None);
            var acceptedByPublishQueue = _publishQueue.Post(ex.Payload);
            if (!acceptedByPublishQueue)
            {
                _logger.ErrorPostingJobToPublishPayloadsQueue();
            }
            break;
    }
}

private void BackgroundProcessing(CancellationToken cancellationToken)
{
_logger.ServiceRunning(ServiceName);

while (!cancellationToken.IsCancellationRequested)
{
ResetIfFaultedOrCancelled(_moveFileQueue, ResetMoveQueue, cancellationToken);
ResetIfFaultedOrCancelled(_publishQueue, ResetPublishQueue, cancellationToken);

Payload payload = null;
try
{
payload = _payloadAssembler.Dequeue(cancellationToken);
_moveFileQueue.Post(payload);
while (!_moveFileQueue.Post(payload))
{
ResetIfFaultedOrCancelled(_moveFileQueue, ResetMoveQueue, cancellationToken);
}
_logger.PayloadQueuedForProcessing(payload.PayloadId, ServiceName);
}
catch (OperationCanceledException ex)
Expand All @@ -202,6 +263,18 @@ private void BackgroundProcessing(CancellationToken cancellationToken)
_logger.ServiceCancelled(ServiceName);
}

/// <summary>
/// Invokes <paramref name="resetFunction"/> when the completion task of <paramref name="queue"/>
/// is canceled or faulted; does nothing otherwise.
/// </summary>
/// <param name="queue">The queue whose completion task is inspected.</param>
/// <param name="resetFunction">Callback that resets the queue.</param>
/// <param name="cancellationToken">Token forwarded to <paramref name="resetFunction"/>.</param>
private static void ResetIfFaultedOrCancelled(ActionBlock<Payload> queue, Action<CancellationToken> resetFunction, CancellationToken cancellationToken)
{
    Guard.Against.Null(queue);
    Guard.Against.Null(resetFunction);

    var completion = queue.Completion;
    if (!completion.IsCanceledOrFaulted())
    {
        return;
    }

    resetFunction.Invoke(cancellationToken);
}


private async Task RestoreFromDatabaseAsync(CancellationToken cancellationToken)
{
_logger.StartupRestoreFromDatabase();
Expand All @@ -214,11 +287,17 @@ private async Task RestoreFromDatabaseAsync(CancellationToken cancellationToken)
{
if (payload.State == Payload.PayloadState.Move)
{
_moveFileQueue.Post(payload);
if (!_moveFileQueue.Post(payload))
{
_logger.ErrorPostingJobToMovePayloadsQueue();
}
}
else if (payload.State == Payload.PayloadState.Notify)
{
_publishQueue.Post(payload);
if (!_publishQueue.Post(payload))
{
_logger.ErrorPostingJobToPublishPayloadsQueue();
}
}
}
_logger.RestoredFromDatabase(payloads.Count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ internal class DicomWebExportService : ExportServiceBase
private readonly IOptions<InformaticsGatewayConfiguration> _configuration;
private readonly IDicomToolkit _dicomToolkit;

protected override int Concurrency { get; }
protected override ushort Concurrency { get; }
public override string RoutingKey { get; }
public override string ServiceName => "DICOMweb Export Service";

Expand Down
Loading

0 comments on commit a83ef62

Please sign in to comment.