Skip to content

Commit

Permalink
Release/0.1.24 (#209) (#213)
Browse files Browse the repository at this point in the history
+semver: patch

* adding new fields to messages for external app executions
* added forgot licence header
* added export request to export cmplete message event
* Update ExportRequestEvent and update dependencies (#210)
* Update dependencies
* Remove ExportRequestType enum and replace with a list of plug-in assembly names
* Update dependencies decisions
* Include taskId in WorkflowRequestEvent to support multiple external app executions


---------

Signed-off-by: Neil South <[email protected]>
Signed-off-by: Victor Chang <[email protected]>
  • Loading branch information
mocsharp authored Aug 12, 2023
1 parent 31afb2f commit 82cbb73
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 49 deletions.
2 changes: 1 addition & 1 deletion doc/dependency_decisions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -789,4 +789,4 @@
:why: MIT ( https://licenses.nuget.org/MIT)
:versions:
- 2.5.0
:when: 2022-08-16 21:40:32.294717110 Z
:when: 2022-08-16 21:40:32.294717110 Z
24 changes: 0 additions & 24 deletions src/Messaging/API/IMessageBrokerSubscriberService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,6 @@ public interface IMessageBrokerSubscriberService : IDisposable
/// </summary>
string Name { get; }

/// <summary>
/// Subscribe to a message topic & queue and executes <c>messageReceivedCallback</c> for every message that is received.
/// Either provide a topic, a queue or both.
/// A queue is generated if the name of the queue is not provided.
/// </summary>
/// <param name="topic">Topic/routing key to bind to</param>
/// <param name="queue">Name of the queue to consume</param>
/// <param name="messageReceivedCallback">Action to be performed when message is received</param>
/// <param name="prefetchCount">Number of unacknowledged messages to receive at once. Defaults to 0.</param>
[Obsolete("This method is obsolete, use SubscribeAsync instead")]
void Subscribe(string topic, string queue, Action<MessageReceivedEventArgs> messageReceivedCallback, ushort prefetchCount = 0);

/// <summary>
/// Subscribe to a message topic & queue and executes <c>messageReceivedCallback</c> for every message that is received.
/// Either provide a topic, a queue or both.
/// A queue is generated if the name of the queue is not provided.
/// </summary>
/// <param name="topics">Topics/routing keys to bind to</param>
/// <param name="queue">Name of the queue to consume</param>
/// <param name="messageReceivedCallback">Action to be performed when message is received</param>
/// <param name="prefetchCount">Number of unacknowledged messages to receive at once. Defaults to 0.</param>
[Obsolete("This method is obsolete, use SubscribeAsync instead")]
void Subscribe(string[] topics, string queue, Action<MessageReceivedEventArgs> messageReceivedCallback, ushort prefetchCount = 0);

/// <summary>
/// Subscribe to a message topic & queue and executes <c>messageReceivedCallback</c> asynchronously for every message that is received.
/// Either provide a topic, a queue or both.
Expand Down
39 changes: 39 additions & 0 deletions src/Messaging/Common/ServiceException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System.Runtime.Serialization;

namespace Monai.Deploy.Messaging.Common
{
public class ServiceException : Exception
{
public ServiceException()
{
}

public ServiceException(string? message) : base(message)
{
}

public ServiceException(string? message, Exception? innerException) : base(message, innerException)
{
}

protected ServiceException(SerializationInfo info, StreamingContext context) : base(info, context)
{
}
}
}
12 changes: 6 additions & 6 deletions src/Messaging/Events/ExportRequestEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,17 @@ public class ExportRequestEvent : EventBase
public List<string> ErrorMessages { get; private set; }

/// <summary>
/// Gets or set the ExportRequest type.
/// For standard exports this will be ExportRequestType.None
/// but for exports to external apps this will be ExportRequestType.ExternalProcessing
/// A list of data output plug-in type names to be executed by the export services.
/// Each string must be a fully-qualified type name.
/// E.g. <code>MyCompnay.MyProject.MyNamepsace.MyPlugin, MyCompnay.MyProject.MyNamepsace</code> where
/// <code>MyCompnay.MyProject.MyNamepsace</code> is the name of the assembly (DLL).
/// </summary>
[JsonProperty(PropertyName = "export_request")]
[Required]
public ExportRequestType ExportRequest { get; set; } = default!;
public List<string> PluginAssemblies { get; private set; }

public ExportRequestEvent()
{
ErrorMessages = new List<string>();
PluginAssemblies = new List<string>();
}

public void AddErrorMessages(IList<string> errorMessages)
Expand Down
8 changes: 8 additions & 0 deletions src/Messaging/Events/WorkflowRequestEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ public class WorkflowRequestEvent : EventBase
[JsonProperty(PropertyName = "workflow_instance_id")]
public string? WorkflowInstanceId { get; set; } = default;

/// <summary>
/// Sets the task ID for the Workflow Manager.
/// This is only applicable to resume events (after external app executions)
/// In standard workflows this will not be set
/// </summary>
[JsonProperty(PropertyName = "task_id")]
public string? TaskId { get; set; } = default;

/// <summary>
/// Gets or sets a list of files and metadata files in this request.
/// </summary>
Expand Down
3 changes: 3 additions & 0 deletions src/Messaging/Tests/IServiceCollectionExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ internal class GoodSubscriberService : IMessageBrokerSubscriberService
{
public string Name => throw new NotImplementedException();

#pragma warning disable CS0067 // The event 'GoodSubscriberService.OnConnectionError' is never used
// event used by users of this library
public event ConnectionErrorHandler? OnConnectionError;
#pragma warning restore CS0067 // The event 'GoodSubscriberService.OnConnectionError' is never used

public void Acknowledge(MessageBase message) => throw new NotImplementedException();

Expand Down
4 changes: 3 additions & 1 deletion src/Messaging/Tests/WorkflowRequestMessageTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ public void ConvertsJsonMessageToMessage()
FileCount = 10,
PayloadId = Guid.NewGuid(),
Timestamp = DateTime.Now,
Workflows = new List<string> { Guid.NewGuid().ToString() }
Workflows = new List<string> { Guid.NewGuid().ToString() },
WorkflowInstanceId = Guid.NewGuid().ToString(),
TaskId = Guid.NewGuid().ToString(),
};

var files = new List<BlockStorageInfo>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private void CreateChannel()
.Execute(() =>
{
_logger.ConnectingToRabbitMQ(Name, _endpoint, _virtualHost);
_channel = _rabbitMqConnectionFactory.CreateChannel(ChannelType.Subscriber, _endpoint, _username, _password, _virtualHost, _useSSL, _portNumber);
_channel = _rabbitMqConnectionFactory.CreateChannel(ChannelType.Subscriber, _endpoint, _username, _password, _virtualHost, _useSSL, _portNumber) ?? throw new ServiceException("Failed to create a new channel to RabbitMQ");
_channel.ExchangeDeclare(_exchange, ExchangeType.Topic, durable: true, autoDelete: false);
_channel.ExchangeDeclare(_deadLetterExchange, ExchangeType.Topic, durable: true, autoDelete: false);
_channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Expand Down Expand Up @@ -162,15 +162,6 @@ internal static void ValidateConfiguration(Dictionary<string, string> configurat
throw new ConfigurationException($"{ConfigurationKeys.SubscriberServiceName} has int values of less than 1");
}
}
[Obsolete("This method is obsolete, use SubscribeAsync instead")]
public void Subscribe(string topic, string queue, Action<MessageReceivedEventArgs> messageReceivedCallback, ushort prefetchCount = 0)
=> Subscribe(new string[] { topic }, queue, messageReceivedCallback, prefetchCount);

[Obsolete("This method is obsolete, use SubscribeAsync instead")]
public void Subscribe(string[] topics, string queue, Action<MessageReceivedEventArgs> messageReceivedCallback, ushort prefetchCount = 0)
{
SubscribeAsync(topics, queue, new Func<MessageReceivedEventArgs, Task>((args) => { messageReceivedCallback.Invoke(args); return Task.FromResult(0); }));
}

public void SubscribeAsync(string topic, string queue, Func<MessageReceivedEventArgs, Task> messageReceivedCallback, ushort prefetchCount = 0)
=> SubscribeAsync(new string[] { topic }, queue, messageReceivedCallback, prefetchCount);
Expand Down Expand Up @@ -212,7 +203,7 @@ private EventingBasicConsumer CreateConsumer(Func<MessageReceivedEventArgs, Task
_logger.InvalidMessage(queueDeclareResult.QueueName, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId, ex);

_logger.SendingNAcknowledgement(eventArgs.BasicProperties.MessageId);
_channel.BasicNack(eventArgs.DeliveryTag, multiple: false, requeue: false);
_channel!.BasicNack(eventArgs.DeliveryTag, multiple: false, requeue: false);
_logger.NAcknowledgementSent(eventArgs.BasicProperties.MessageId, false);
return;
}
Expand Down Expand Up @@ -367,7 +358,6 @@ private static MessageReceivedEventArgs CreateMessage(string topic, BasicDeliver
{
Guard.Against.NullOrWhiteSpace(topic, nameof(topic));
Guard.Against.Null(eventArgs, nameof(eventArgs));

Guard.Against.Null(eventArgs.Body, nameof(eventArgs.Body));
Guard.Against.Null(eventArgs.BasicProperties, nameof(eventArgs.BasicProperties));
Guard.Against.Null(eventArgs.BasicProperties.MessageId, nameof(eventArgs.BasicProperties.MessageId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void SubscribesToATopic()

var service = new RabbitMQMessageSubscriberService(_options, _logger.Object, _connectionFactory.Object);

service.Subscribe("topic", "queue", (args) =>
service.SubscribeAsync("topic", "queue", async (args) =>
{
Assert.Equal(message.ApplicationId, args.Message.ApplicationId);
Assert.Equal(message.ContentType, args.Message.ContentType);
Expand All @@ -143,6 +143,7 @@ public void SubscribesToATopic()
Assert.Equal("topic", args.Message.MessageDescription);
Assert.Equal(message.MessageId, args.Message.MessageId);
Assert.Equal(message.Body, args.Message.Body);
await Task.CompletedTask.ConfigureAwait(false);
});

service.SubscribeAsync("topic", "queue", async (args) =>
Expand Down Expand Up @@ -240,7 +241,7 @@ public void SubscribesToATopicAndDeadLetterQueueIsDown()

var service = new RabbitMQMessageSubscriberService(_options, _logger.Object, _connectionFactory.Object);

service.Subscribe("topic", "queue", (args) =>
service.SubscribeAsync("topic", "queue", async (args) =>
{
Assert.Equal(message.ApplicationId, args.Message.ApplicationId);
Assert.Equal(message.ContentType, args.Message.ContentType);
Expand All @@ -250,6 +251,7 @@ public void SubscribesToATopicAndDeadLetterQueueIsDown()
Assert.Equal("topic", args.Message.MessageDescription);
Assert.Equal(message.MessageId, args.Message.MessageId);
Assert.Equal(message.Body, args.Message.Body);
await Task.CompletedTask;
});

service.SubscribeAsync("topic", "queue", async (args) =>
Expand Down Expand Up @@ -349,7 +351,7 @@ public void SubscribesToATopicAndDeadLetterQueueSubscriptionFailsWithGenericExce

var act = () =>
{
service.Subscribe("topic", "queue", (args) =>
service.SubscribeAsync("topic", "queue", async (args) =>
{
Assert.Equal(message.ApplicationId, args.Message.ApplicationId);
Assert.Equal(message.ContentType, args.Message.ContentType);
Expand All @@ -359,7 +361,7 @@ public void SubscribesToATopicAndDeadLetterQueueSubscriptionFailsWithGenericExce
Assert.Equal("topic", args.Message.MessageDescription);
Assert.Equal(message.MessageId, args.Message.MessageId);
Assert.Equal(message.Body, args.Message.Body);

await Task.CompletedTask;
});
};
Assert.Throws<OperationInterruptedException>(act);
Expand Down
1 change: 0 additions & 1 deletion third-party-licenses.md
Original file line number Diff line number Diff line change
Expand Up @@ -17166,4 +17166,3 @@ Data pulled from spdx/license-list-data on February 9, 2023.
```

</details>

0 comments on commit 82cbb73

Please sign in to comment.