diff --git a/doc/dependency_decisions.yml b/doc/dependency_decisions.yml index 136e401..4b9fd9b 100755 --- a/doc/dependency_decisions.yml +++ b/doc/dependency_decisions.yml @@ -789,4 +789,4 @@ :why: MIT ( https://licenses.nuget.org/MIT) :versions: - 2.5.0 - :when: 2022-08-16 21:40:32.294717110 Z + :when: 2022-08-16 21:40:32.294717110 Z \ No newline at end of file diff --git a/src/Messaging/API/IMessageBrokerSubscriberService.cs b/src/Messaging/API/IMessageBrokerSubscriberService.cs index 49c4613..e43de1e 100755 --- a/src/Messaging/API/IMessageBrokerSubscriberService.cs +++ b/src/Messaging/API/IMessageBrokerSubscriberService.cs @@ -31,30 +31,6 @@ public interface IMessageBrokerSubscriberService : IDisposable /// string Name { get; } - /// - /// Subscribe to a message topic & queue and executes messageReceivedCallback for every message that is received. - /// Either provide a topic, a queue or both. - /// A queue is generated if the name of the queue is not provided. - /// - /// Topic/routing key to bind to - /// Name of the queue to consume - /// Action to be performed when message is received - /// Number of unacknowledged messages to receive at once. Defaults to 0. - [Obsolete("This method is obsolete, use SubscribeAsync instead")] - void Subscribe(string topic, string queue, Action messageReceivedCallback, ushort prefetchCount = 0); - - /// - /// Subscribe to a message topic & queue and executes messageReceivedCallback for every message that is received. - /// Either provide a topic, a queue or both. - /// A queue is generated if the name of the queue is not provided. - /// - /// Topics/routing keys to bind to - /// Name of the queue to consume - /// Action to be performed when message is received - /// Number of unacknowledged messages to receive at once. Defaults to 0. - [Obsolete("This method is obsolete, use SubscribeAsync instead")] - void Subscribe(string[] topics, string queue, Action messageReceivedCallback, ushort prefetchCount = 0); - /// /// Subscribe to a message topic & queue and executes messageReceivedCallback asynchronously for every message that is received. /// Either provide a topic, a queue or both. diff --git a/src/Messaging/Common/ServiceException.cs b/src/Messaging/Common/ServiceException.cs new file mode 100644 index 0000000..6bdaf30 --- /dev/null +++ b/src/Messaging/Common/ServiceException.cs @@ -0,0 +1,39 @@ +/* + * Copyright 2023 MONAI Consortium + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System.Runtime.Serialization; + +namespace Monai.Deploy.Messaging.Common +{ + public class ServiceException : Exception + { + public ServiceException() + { + } + + public ServiceException(string? message) : base(message) + { + } + + public ServiceException(string? message, Exception? innerException) : base(message, innerException) + { + } + + protected ServiceException(SerializationInfo info, StreamingContext context) : base(info, context) + { + } + } +} diff --git a/src/Messaging/Events/ExportRequestEvent.cs b/src/Messaging/Events/ExportRequestEvent.cs index 4c2d620..2d6c89e 100755 --- a/src/Messaging/Events/ExportRequestEvent.cs +++ b/src/Messaging/Events/ExportRequestEvent.cs @@ -77,17 +77,17 @@ public class ExportRequestEvent : EventBase public List ErrorMessages { get; private set; } /// - /// Gets or set the ExportRequest type. - /// For standard exports this will be ExportRequestType.None - /// but for exports to external apps this will be ExportRequestType.ExternalProcessing + /// A list of data output plug-in type names to be executed by the export services. + /// Each string must be a fully-qualified type name. + /// E.g. MyCompnay.MyProject.MyNamepsace.MyPlugin, MyCompnay.MyProject.MyNamepsace where + /// MyCompnay.MyProject.MyNamepsace is the name of the assembly (DLL). /// - [JsonProperty(PropertyName = "export_request")] - [Required] - public ExportRequestType ExportRequest { get; set; } = default!; + public List PluginAssemblies { get; private set; } public ExportRequestEvent() { ErrorMessages = new List(); + PluginAssemblies = new List(); } public void AddErrorMessages(IList errorMessages) diff --git a/src/Messaging/Events/WorkflowRequestEvent.cs b/src/Messaging/Events/WorkflowRequestEvent.cs index 6a58f61..1f0be31 100755 --- a/src/Messaging/Events/WorkflowRequestEvent.cs +++ b/src/Messaging/Events/WorkflowRequestEvent.cs @@ -89,6 +89,14 @@ public class WorkflowRequestEvent : EventBase [JsonProperty(PropertyName = "workflow_instance_id")] public string? WorkflowInstanceId { get; set; } = default; + /// + /// Sets the task ID for the Workflow Manager. + /// This is only applicable to resume events (after external app executions) + /// In standard workflows this will not be set + /// + [JsonProperty(PropertyName = "task_id")] + public string? TaskId { get; set; } = default; + /// /// Gets or sets a list of files and metadata files in this request. /// diff --git a/src/Messaging/Tests/IServiceCollectionExtensionsTests.cs b/src/Messaging/Tests/IServiceCollectionExtensionsTests.cs index 6c9042a..473ef81 100644 --- a/src/Messaging/Tests/IServiceCollectionExtensionsTests.cs +++ b/src/Messaging/Tests/IServiceCollectionExtensionsTests.cs @@ -203,7 +203,10 @@ internal class GoodSubscriberService : IMessageBrokerSubscriberService { public string Name => throw new NotImplementedException(); +#pragma warning disable CS0067 // The event 'GoodSubscriberService.OnConnectionError' is never used + // event used by users of this library public event ConnectionErrorHandler? OnConnectionError; +#pragma warning restore CS0067 // The event 'GoodSubscriberService.OnConnectionError' is never used public void Acknowledge(MessageBase message) => throw new NotImplementedException(); diff --git a/src/Messaging/Tests/WorkflowRequestMessageTest.cs b/src/Messaging/Tests/WorkflowRequestMessageTest.cs index 09729b6..dcec857 100644 --- a/src/Messaging/Tests/WorkflowRequestMessageTest.cs +++ b/src/Messaging/Tests/WorkflowRequestMessageTest.cs @@ -36,7 +36,9 @@ public void ConvertsJsonMessageToMessage() FileCount = 10, PayloadId = Guid.NewGuid(), Timestamp = DateTime.Now, - Workflows = new List { Guid.NewGuid().ToString() } + Workflows = new List { Guid.NewGuid().ToString() }, + WorkflowInstanceId = Guid.NewGuid().ToString(), + TaskId = Guid.NewGuid().ToString(), }; var files = new List() diff --git a/src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs b/src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs index 7a1cda7..017dc64 100755 --- a/src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs +++ b/src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs @@ -109,7 +109,7 @@ private void CreateChannel() .Execute(() => { _logger.ConnectingToRabbitMQ(Name, _endpoint, _virtualHost); - _channel = _rabbitMqConnectionFactory.CreateChannel(ChannelType.Subscriber, _endpoint, _username, _password, _virtualHost, _useSSL, _portNumber); + _channel = _rabbitMqConnectionFactory.CreateChannel(ChannelType.Subscriber, _endpoint, _username, _password, _virtualHost, _useSSL, _portNumber) ?? throw new ServiceException("Failed to create a new channel to RabbitMQ"); _channel.ExchangeDeclare(_exchange, ExchangeType.Topic, durable: true, autoDelete: false); _channel.ExchangeDeclare(_deadLetterExchange, ExchangeType.Topic, durable: true, autoDelete: false); _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); @@ -162,15 +162,6 @@ internal static void ValidateConfiguration(Dictionary configurat throw new ConfigurationException($"{ConfigurationKeys.SubscriberServiceName} has int values of less than 1"); } } - [Obsolete("This method is obsolete, use SubscribeAsync instead")] - public void Subscribe(string topic, string queue, Action messageReceivedCallback, ushort prefetchCount = 0) - => Subscribe(new string[] { topic }, queue, messageReceivedCallback, prefetchCount); - - [Obsolete("This method is obsolete, use SubscribeAsync instead")] - public void Subscribe(string[] topics, string queue, Action messageReceivedCallback, ushort prefetchCount = 0) - { - SubscribeAsync(topics, queue, new Func((args) => { messageReceivedCallback.Invoke(args); return Task.FromResult(0); })); - } public void SubscribeAsync(string topic, string queue, Func messageReceivedCallback, ushort prefetchCount = 0) => SubscribeAsync(new string[] { topic }, queue, messageReceivedCallback, prefetchCount); @@ -212,7 +203,7 @@ private EventingBasicConsumer CreateConsumer(Func + service.SubscribeAsync("topic", "queue", async (args) => { Assert.Equal(message.ApplicationId, args.Message.ApplicationId); Assert.Equal(message.ContentType, args.Message.ContentType); @@ -143,6 +143,7 @@ public void SubscribesToATopic() Assert.Equal("topic", args.Message.MessageDescription); Assert.Equal(message.MessageId, args.Message.MessageId); Assert.Equal(message.Body, args.Message.Body); + await Task.CompletedTask.ConfigureAwait(false); }); service.SubscribeAsync("topic", "queue", async (args) => @@ -240,7 +241,7 @@ public void SubscribesToATopicAndDeadLetterQueueIsDown() var service = new RabbitMQMessageSubscriberService(_options, _logger.Object, _connectionFactory.Object); - service.Subscribe("topic", "queue", (args) => + service.SubscribeAsync("topic", "queue", async (args) => { Assert.Equal(message.ApplicationId, args.Message.ApplicationId); Assert.Equal(message.ContentType, args.Message.ContentType); @@ -250,6 +251,7 @@ public void SubscribesToATopicAndDeadLetterQueueIsDown() Assert.Equal("topic", args.Message.MessageDescription); Assert.Equal(message.MessageId, args.Message.MessageId); Assert.Equal(message.Body, args.Message.Body); + await Task.CompletedTask; }); service.SubscribeAsync("topic", "queue", async (args) => @@ -349,7 +351,7 @@ public void SubscribesToATopicAndDeadLetterQueueSubscriptionFailsWithGenericExce var act = () => { - service.Subscribe("topic", "queue", (args) => + service.SubscribeAsync("topic", "queue", async (args) => { Assert.Equal(message.ApplicationId, args.Message.ApplicationId); Assert.Equal(message.ContentType, args.Message.ContentType); @@ -359,7 +361,7 @@ public void SubscribesToATopicAndDeadLetterQueueSubscriptionFailsWithGenericExce Assert.Equal("topic", args.Message.MessageDescription); Assert.Equal(message.MessageId, args.Message.MessageId); Assert.Equal(message.Body, args.Message.Body); - + await Task.CompletedTask; }); }; Assert.Throws(act); diff --git a/third-party-licenses.md b/third-party-licenses.md index a37caa2..bdd4ee9 100644 --- a/third-party-licenses.md +++ b/third-party-licenses.md @@ -17166,4 +17166,3 @@ Data pulled from spdx/license-list-data on February 9, 2023. ``` -