Skip to content

Commit 525b1d1

Browse files
authored
Merge pull request #127 from Project-MONAI/nds-hunting-rabbit
Nds hunting rabbit
2 parents b71d281 + e0bc709 commit 525b1d1

9 files changed

+49
-21
lines changed

src/Plugins/RabbitMQ/ChannelType.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright 2021-2022 MONAI Consortium
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
namespace Monai.Deploy.Messaging.RabbitMQ
18+
{
19+
public enum ChannelType
20+
{
21+
Subscriber,
22+
Publisher
23+
}
24+
}

src/Plugins/RabbitMQ/Factory/IRabbitMQConnectionFactory.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,6 @@ public interface IRabbitMQConnectionFactory
3232
/// <param name="useSSL">Encrypt communication</param>
3333
/// <param name="portNumber">Port Number</param>
3434
/// <returns>Instance of <see cref="IModel"/>.</returns>
35-
IModel? CreateChannel( string hostName, string username, string password, string virtualHost, string useSSL, string portNumber);
35+
IModel? CreateChannel( ChannelType type, string hostName, string username, string password, string virtualHost, string useSSL, string portNumber);
3636
}
3737
}

src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,17 @@ public RabbitMQConnectionFactory(ILogger<RabbitMQConnectionFactory> logger)
4141
}
4242

4343

44-
public IModel CreateChannel(string hostName, string username, string password, string virtualHost, string useSSL, string portNumber)
44+
public IModel CreateChannel(ChannelType type, string hostName, string username, string password, string virtualHost, string useSSL, string portNumber)
4545
{
4646
Guard.Against.NullOrWhiteSpace(hostName);
4747
Guard.Against.NullOrWhiteSpace(username);
4848
Guard.Against.NullOrWhiteSpace(password);
4949
Guard.Against.NullOrWhiteSpace(virtualHost);
5050

51-
var key = $"{hostName}{username}{HashPassword(password)}{virtualHost}";
51+
var key = $"{type}{hostName}{username}{HashPassword(password)}{virtualHost}";
5252

5353
var connection = _connections.AddOrUpdate(key,
54-
x => MakeConnection(hostName, username, password, virtualHost, key, useSSL, portNumber),
54+
x => MakeConnection(type, hostName, username, password, virtualHost, key, useSSL, portNumber),
5555
(updateKey, updateConnection) =>
5656
{
5757
// If connection to RMQ is lost and:
@@ -62,13 +62,13 @@ public IModel CreateChannel(string hostName, string username, string password, s
6262
{
6363
if (updateConnection.model.IsClosed)
6464
{
65-
updateConnection.model = MakeChannel(updateConnection.connection, key);
65+
updateConnection.model = MakeChannel(type, updateConnection.connection, key);
6666
}
6767
return updateConnection;
6868
}
6969
else
7070
{
71-
return MakeConnection(hostName, username, password, virtualHost, key, useSSL, portNumber);
71+
return MakeConnection(type, hostName, username, password, virtualHost, key, useSSL, portNumber);
7272
}
7373
});
7474

@@ -87,18 +87,22 @@ private void OnException(CallbackExceptionEventArgs args, IConnection value, str
8787

8888
}
8989

90-
private (IConnection, IModel) MakeConnection(string hostName, string username, string password, string virtualHost, string key, string useSSL, string portNumber)
90+
private (IConnection, IModel) MakeConnection(ChannelType type, string hostName, string username, string password, string virtualHost, string key, string useSSL, string portNumber)
9191
{
9292
var connection = CreateConnectionOnly(hostName, username, password, virtualHost, key, useSSL, portNumber);
93-
var model = MakeChannel(connection, key);
93+
var model = MakeChannel(type, connection, key);
9494
return (connection, model);
9595
}
9696

97-
private IModel MakeChannel(IConnection connection, string key)
97+
private IModel MakeChannel(ChannelType type, IConnection connection, string key)
9898
{
9999
var model = connection.CreateModel();
100100
model.CallbackException += (sender, args) => OnException(args, connection, key);
101101
model.ModelShutdown += (sender, args) => ConnectionShutdown(args, connection, key);
102+
if (type == ChannelType.Publisher)
103+
{
104+
model.ConfirmSelect();
105+
}
102106
return model;
103107
}
104108

src/Plugins/RabbitMQ/Publisher/RabbitMqMessagePublisherService.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,11 @@ public Task Publish(string topic, Message message)
108108

109109
_logger.PublshingRabbitMQ(_endpoint, _virtualHost, _exchange, topic);
110110

111-
var channel = _rabbitMqConnectionFactory.CreateChannel(_endpoint, _username, _password, _virtualHost, _useSSL, _portNumber);
111+
var channel = _rabbitMqConnectionFactory.CreateChannel(ChannelType.Publisher, _endpoint, _username, _password, _virtualHost, _useSSL, _portNumber);
112112

113113
if (channel is null) { throw new NullReferenceException("RabbitMq channel returned null"); }
114114

115115
channel.ExchangeDeclare(_exchange, ExchangeType.Topic, durable: true, autoDelete: false);
116-
channel.ConfirmSelect();
117116

118117
var properties = channel.CreateBasicProperties();
119118
properties.Persistent = true;

src/Plugins/RabbitMQ/RabbitMQHealthCheck.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public RabbitMQHealthCheck(
4747
try
4848
{
4949
var channel = _connectionFactory.CreateChannel(
50+
ChannelType.Subscriber,
5051
_options[ConfigurationKeys.EndPoint],
5152
_options[ConfigurationKeys.Username],
5253
_options[ConfigurationKeys.Password],

src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ private void CreateChannel()
108108
.Execute(() =>
109109
{
110110
_logger.ConnectingToRabbitMQ(Name, _endpoint, _virtualHost);
111-
_channel = _rabbitMqConnectionFactory.CreateChannel(_endpoint, _username, _password, _virtualHost, _useSSL, _portNumber);
111+
_channel = _rabbitMqConnectionFactory.CreateChannel(ChannelType.Subscriber, _endpoint, _username, _password, _virtualHost, _useSSL, _portNumber);
112112
_channel.ExchangeDeclare(_exchange, ExchangeType.Topic, durable: true, autoDelete: false);
113113
_channel.ExchangeDeclare(_deadLetterExchange, ExchangeType.Topic, durable: true, autoDelete: false);
114114
_channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

src/Plugins/RabbitMQ/Tests/RabbitMQHealthCheckTest.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public RabbitMQHealthCheckTest()
4242
[Fact]
4343
public async Task CheckHealthAsync_WhenFailed_ReturnUnhealthy()
4444
{
45-
_connectionFactory.Setup(p => p.CreateChannel(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
45+
_connectionFactory.Setup(p => p.CreateChannel(It.IsAny<ChannelType>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
4646
.Throws(new Exception("error"));
4747

4848
var healthCheck = new RabbitMQHealthCheck(_connectionFactory.Object, _options, _logger.Object, (d) => { });
@@ -58,7 +58,7 @@ public async Task CheckHealthAsync_WhenSucceeds_ReturnHealthy()
5858
{
5959
var channel = new Mock<IModel>();
6060
channel.Setup(p => p.Close());
61-
_connectionFactory.Setup(p => p.CreateChannel(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
61+
_connectionFactory.Setup(p => p.CreateChannel(It.IsAny<ChannelType>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
6262
.Returns(channel.Object);
6363
var healthCheck = new RabbitMQHealthCheck(_connectionFactory.Object, _options, _logger.Object, (d) => { });
6464
var results = await healthCheck.CheckHealthAsync(new HealthCheckContext()).ConfigureAwait(false);

src/Plugins/RabbitMQ/Tests/RabbitMqMessagePublisherServiceTest.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@ public class RabbitMQMessagePublisherServiceTest
3131
private readonly Mock<ILogger<RabbitMQMessagePublisherService>> _logger;
3232
private readonly Mock<IRabbitMQConnectionFactory> _connectionFactory;
3333
private readonly Mock<IModel> _model;
34-
private static readonly object mutex = new();
34+
private static readonly object Mutex = new();
3535
public RabbitMQMessagePublisherServiceTest()
3636
{
3737
_options = Options.Create(new MessageBrokerServiceConfiguration());
3838
_logger = new Mock<ILogger<RabbitMQMessagePublisherService>>();
3939
_connectionFactory = new Mock<IRabbitMQConnectionFactory>();
4040
_model = new Mock<IModel>();
4141

42-
_connectionFactory.Setup(p => p.CreateChannel(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
42+
_connectionFactory.Setup(p => p.CreateChannel(It.IsAny<ChannelType>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
4343
.Returns(_model.Object);
4444
}
4545

@@ -117,7 +117,7 @@ public async Task IntegrationTestRabbitPublish()
117117
var pubService = new RabbitMQMessagePublisherService(Options.Create(options), new Mock<ILogger<RabbitMQMessagePublisherService>>().Object, connectionFactory);
118118
var subService = new RabbitMQMessageSubscriberService(Options.Create(options), new Mock<ILogger<RabbitMQMessageSubscriberService>>().Object, connectionFactory);
119119

120-
var count = 10000;
120+
var count = 100;
121121
var subRecievedCount = 0;
122122
var skipped = 0;
123123

@@ -139,7 +139,7 @@ await Task.Run(async () =>
139139
try
140140
{
141141
subService.Acknowledge(args.Message);
142-
lock (mutex)
142+
lock (Mutex)
143143
subRecievedCount++;
144144
}
145145
catch (Exception)
@@ -163,15 +163,15 @@ await Task.Run(async () =>
163163
Assert.Equal(HealthStatus.Healthy, result1.Status);
164164
}
165165

166-
await Task.Delay(5000);
166+
await Task.Delay(15000);
167167

168168
result1 = await hc1.CheckHealthAsync(new HealthCheckContext());
169169
Assert.Equal(HealthStatus.Healthy, result1.Status);
170170

171171
result1 = await hc2.CheckHealthAsync(new HealthCheckContext());
172172
Assert.Equal(HealthStatus.Healthy, result1.Status);
173173

174-
Assert.Equal((count * 2) -skipped, subRecievedCount);
174+
Assert.Equal((count * 2) - skipped, subRecievedCount);
175175

176176
}
177177
private async Task<int> PublishMessage(RabbitMQMessagePublisherService pubService, string topic, Message message)

src/Plugins/RabbitMQ/Tests/RabbitMqMessageSubscriberServiceTest.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public RabbitMQMessageSubscriberServiceTest()
4040
_connectionFactory = new Mock<IRabbitMQConnectionFactory>();
4141
_model = new Mock<IModel>();
4242

43-
_connectionFactory.Setup(p => p.CreateChannel(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
43+
_connectionFactory.Setup(p => p.CreateChannel(It.IsAny<ChannelType>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
4444
.Returns(_model.Object);
4545
}
4646

0 commit comments

Comments
 (0)