diff --git a/src/Plugins/RabbitMQ/ChannelType.cs b/src/Plugins/RabbitMQ/ChannelType.cs new file mode 100644 index 0000000..6cc735d --- /dev/null +++ b/src/Plugins/RabbitMQ/ChannelType.cs @@ -0,0 +1,24 @@ +/* + * Copyright 2021-2022 MONAI Consortium + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Monai.Deploy.Messaging.RabbitMQ +{ + public enum ChannelType + { + Subscriber, + Publisher + } +} diff --git a/src/Plugins/RabbitMQ/Factory/IRabbitMQConnectionFactory.cs b/src/Plugins/RabbitMQ/Factory/IRabbitMQConnectionFactory.cs index 6ea0358..cda59ee 100644 --- a/src/Plugins/RabbitMQ/Factory/IRabbitMQConnectionFactory.cs +++ b/src/Plugins/RabbitMQ/Factory/IRabbitMQConnectionFactory.cs @@ -32,6 +32,6 @@ public interface IRabbitMQConnectionFactory /// Encrypt communication /// Port Number /// Instance of . - IModel? CreateChannel( string hostName, string username, string password, string virtualHost, string useSSL, string portNumber); + IModel? CreateChannel( ChannelType type, string hostName, string username, string password, string virtualHost, string useSSL, string portNumber); } } diff --git a/src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs b/src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs index 4559800..de6edaa 100644 --- a/src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs +++ b/src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs @@ -41,17 +41,17 @@ public RabbitMQConnectionFactory(ILogger logger) } - public IModel CreateChannel(string hostName, string username, string password, string virtualHost, string useSSL, string portNumber) + public IModel CreateChannel(ChannelType type, string hostName, string username, string password, string virtualHost, string useSSL, string portNumber) { Guard.Against.NullOrWhiteSpace(hostName); Guard.Against.NullOrWhiteSpace(username); Guard.Against.NullOrWhiteSpace(password); Guard.Against.NullOrWhiteSpace(virtualHost); - var key = $"{hostName}{username}{HashPassword(password)}{virtualHost}"; + var key = $"{type}{hostName}{username}{HashPassword(password)}{virtualHost}"; var connection = _connections.AddOrUpdate(key, - x => MakeConnection(hostName, username, password, virtualHost, key, useSSL, portNumber), + x => MakeConnection(type, hostName, username, password, virtualHost, key, useSSL, portNumber), (updateKey, updateConnection) => { // If connection to RMQ is lost and: @@ -62,13 +62,13 @@ public IModel CreateChannel(string hostName, string username, string password, s { if (updateConnection.model.IsClosed) { - updateConnection.model = MakeChannel(updateConnection.connection, key); + updateConnection.model = MakeChannel(type, updateConnection.connection, key); } return updateConnection; } else { - return MakeConnection(hostName, username, password, virtualHost, key, useSSL, portNumber); + return MakeConnection(type, hostName, username, password, virtualHost, key, useSSL, portNumber); } }); @@ -87,18 +87,22 @@ private void OnException(CallbackExceptionEventArgs args, IConnection value, str } - private (IConnection, IModel) MakeConnection(string hostName, string username, string password, string virtualHost, string key, string useSSL, string portNumber) + private (IConnection, IModel) MakeConnection(ChannelType type, string hostName, string username, string password, string virtualHost, string key, string useSSL, string portNumber) { var connection = CreateConnectionOnly(hostName, username, password, virtualHost, key, useSSL, portNumber); - var model = MakeChannel(connection, key); + var model = MakeChannel(type, connection, key); return (connection, model); } - private IModel MakeChannel(IConnection connection, string key) + private IModel MakeChannel(ChannelType type, IConnection connection, string key) { var model = connection.CreateModel(); model.CallbackException += (sender, args) => OnException(args, connection, key); model.ModelShutdown += (sender, args) => ConnectionShutdown(args, connection, key); + if (type == ChannelType.Publisher) + { + model.ConfirmSelect(); + } return model; } diff --git a/src/Plugins/RabbitMQ/Publisher/RabbitMqMessagePublisherService.cs b/src/Plugins/RabbitMQ/Publisher/RabbitMqMessagePublisherService.cs index f78369f..7ded148 100644 --- a/src/Plugins/RabbitMQ/Publisher/RabbitMqMessagePublisherService.cs +++ b/src/Plugins/RabbitMQ/Publisher/RabbitMqMessagePublisherService.cs @@ -108,12 +108,11 @@ public Task Publish(string topic, Message message) _logger.PublshingRabbitMQ(_endpoint, _virtualHost, _exchange, topic); - var channel = _rabbitMqConnectionFactory.CreateChannel(_endpoint, _username, _password, _virtualHost, _useSSL, _portNumber); + var channel = _rabbitMqConnectionFactory.CreateChannel(ChannelType.Publisher, _endpoint, _username, _password, _virtualHost, _useSSL, _portNumber); if (channel is null) { throw new NullReferenceException("RabbitMq channel returned null"); } channel.ExchangeDeclare(_exchange, ExchangeType.Topic, durable: true, autoDelete: false); - channel.ConfirmSelect(); var properties = channel.CreateBasicProperties(); properties.Persistent = true; diff --git a/src/Plugins/RabbitMQ/RabbitMQHealthCheck.cs b/src/Plugins/RabbitMQ/RabbitMQHealthCheck.cs index 3e1288c..ee32f6a 100644 --- a/src/Plugins/RabbitMQ/RabbitMQHealthCheck.cs +++ b/src/Plugins/RabbitMQ/RabbitMQHealthCheck.cs @@ -47,6 +47,7 @@ public RabbitMQHealthCheck( try { var channel = _connectionFactory.CreateChannel( + ChannelType.Subscriber, _options[ConfigurationKeys.EndPoint], _options[ConfigurationKeys.Username], _options[ConfigurationKeys.Password], diff --git a/src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs b/src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs index d3e389b..fa76617 100644 --- a/src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs +++ b/src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs @@ -108,7 +108,7 @@ private void CreateChannel() .Execute(() => { _logger.ConnectingToRabbitMQ(Name, _endpoint, _virtualHost); - _channel = _rabbitMqConnectionFactory.CreateChannel(_endpoint, _username, _password, _virtualHost, _useSSL, _portNumber); + _channel = _rabbitMqConnectionFactory.CreateChannel(ChannelType.Subscriber, _endpoint, _username, _password, _virtualHost, _useSSL, _portNumber); _channel.ExchangeDeclare(_exchange, ExchangeType.Topic, durable: true, autoDelete: false); _channel.ExchangeDeclare(_deadLetterExchange, ExchangeType.Topic, durable: true, autoDelete: false); _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); diff --git a/src/Plugins/RabbitMQ/Tests/RabbitMQHealthCheckTest.cs b/src/Plugins/RabbitMQ/Tests/RabbitMQHealthCheckTest.cs index 8ada62d..f4c1829 100644 --- a/src/Plugins/RabbitMQ/Tests/RabbitMQHealthCheckTest.cs +++ b/src/Plugins/RabbitMQ/Tests/RabbitMQHealthCheckTest.cs @@ -42,7 +42,7 @@ public RabbitMQHealthCheckTest() [Fact] public async Task CheckHealthAsync_WhenFailed_ReturnUnhealthy() { - _connectionFactory.Setup(p => p.CreateChannel(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + _connectionFactory.Setup(p => p.CreateChannel(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) .Throws(new Exception("error")); var healthCheck = new RabbitMQHealthCheck(_connectionFactory.Object, _options, _logger.Object, (d) => { }); @@ -58,7 +58,7 @@ public async Task CheckHealthAsync_WhenSucceeds_ReturnHealthy() { var channel = new Mock(); channel.Setup(p => p.Close()); - _connectionFactory.Setup(p => p.CreateChannel(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + _connectionFactory.Setup(p => p.CreateChannel(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) .Returns(channel.Object); var healthCheck = new RabbitMQHealthCheck(_connectionFactory.Object, _options, _logger.Object, (d) => { }); var results = await healthCheck.CheckHealthAsync(new HealthCheckContext()).ConfigureAwait(false); diff --git a/src/Plugins/RabbitMQ/Tests/RabbitMqMessagePublisherServiceTest.cs b/src/Plugins/RabbitMQ/Tests/RabbitMqMessagePublisherServiceTest.cs index b6a7d5e..6df8927 100644 --- a/src/Plugins/RabbitMQ/Tests/RabbitMqMessagePublisherServiceTest.cs +++ b/src/Plugins/RabbitMQ/Tests/RabbitMqMessagePublisherServiceTest.cs @@ -31,7 +31,7 @@ public class RabbitMQMessagePublisherServiceTest private readonly Mock> _logger; private readonly Mock _connectionFactory; private readonly Mock _model; - private static readonly object mutex = new(); + private static readonly object Mutex = new(); public RabbitMQMessagePublisherServiceTest() { _options = Options.Create(new MessageBrokerServiceConfiguration()); @@ -39,7 +39,7 @@ public RabbitMQMessagePublisherServiceTest() _connectionFactory = new Mock(); _model = new Mock(); - _connectionFactory.Setup(p => p.CreateChannel(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + _connectionFactory.Setup(p => p.CreateChannel(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) .Returns(_model.Object); } @@ -117,7 +117,7 @@ public async Task IntegrationTestRabbitPublish() var pubService = new RabbitMQMessagePublisherService(Options.Create(options), new Mock>().Object, connectionFactory); var subService = new RabbitMQMessageSubscriberService(Options.Create(options), new Mock>().Object, connectionFactory); - var count = 10000; + var count = 100; var subRecievedCount = 0; var skipped = 0; @@ -139,7 +139,7 @@ await Task.Run(async () => try { subService.Acknowledge(args.Message); - lock (mutex) + lock (Mutex) subRecievedCount++; } catch (Exception) @@ -163,7 +163,7 @@ await Task.Run(async () => Assert.Equal(HealthStatus.Healthy, result1.Status); } - await Task.Delay(5000); + await Task.Delay(15000); result1 = await hc1.CheckHealthAsync(new HealthCheckContext()); Assert.Equal(HealthStatus.Healthy, result1.Status); @@ -171,7 +171,7 @@ await Task.Run(async () => result1 = await hc2.CheckHealthAsync(new HealthCheckContext()); Assert.Equal(HealthStatus.Healthy, result1.Status); - Assert.Equal((count * 2) -skipped, subRecievedCount); + Assert.Equal((count * 2) - skipped, subRecievedCount); } private async Task PublishMessage(RabbitMQMessagePublisherService pubService, string topic, Message message) diff --git a/src/Plugins/RabbitMQ/Tests/RabbitMqMessageSubscriberServiceTest.cs b/src/Plugins/RabbitMQ/Tests/RabbitMqMessageSubscriberServiceTest.cs index d241323..76e83a5 100644 --- a/src/Plugins/RabbitMQ/Tests/RabbitMqMessageSubscriberServiceTest.cs +++ b/src/Plugins/RabbitMQ/Tests/RabbitMqMessageSubscriberServiceTest.cs @@ -40,7 +40,7 @@ public RabbitMQMessageSubscriberServiceTest() _connectionFactory = new Mock(); _model = new Mock(); - _connectionFactory.Setup(p => p.CreateChannel(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + _connectionFactory.Setup(p => p.CreateChannel(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) .Returns(_model.Object); }