Skip to content

Commit 42a89ba

Browse files
authored
Merge pull request #182 from Project-MONAI/ac-1619-nds
Ac 1619 nds
2 parents af25f2d + 41e1f4c commit 42a89ba

File tree

2 files changed

+249
-39
lines changed

2 files changed

+249
-39
lines changed

src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs

Lines changed: 39 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -266,46 +266,7 @@ public void SubscribeAsync(string[] topics, string queue, Func<MessageReceivedEv
266266
{
267267
var deadLetterQueueDeclareResult = _channel.QueueDeclare(queue: deadLetterQueue, durable: true, exclusive: false, autoDelete: false);
268268
BindToRoutingKeys(topics, queueDeclareResult.QueueName, deadLetterQueueDeclareResult.QueueName);
269-
270-
var consumer = new EventingBasicConsumer(_channel);
271-
consumer.Received += async (model, eventArgs) =>
272-
{
273-
using var loggingScope = _logger.BeginScope(new LoggingDataDictionary<string, object>
274-
{
275-
["MessageId"] = eventArgs.BasicProperties.MessageId,
276-
["ApplicationId"] = eventArgs.BasicProperties.AppId,
277-
["CorrelationId"] = eventArgs.BasicProperties.CorrelationId,
278-
["RecievedTime"] = DateTime.UtcNow
279-
});
280-
281-
_logger.MessageReceivedFromQueue(queueDeclareResult.QueueName, eventArgs.RoutingKey);
282-
283-
MessageReceivedEventArgs messageReceivedEventArgs;
284-
try
285-
{
286-
messageReceivedEventArgs = CreateMessage(eventArgs.RoutingKey, eventArgs);
287-
}
288-
catch (Exception ex)
289-
{
290-
_logger.InvalidMessage(queueDeclareResult.QueueName, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId, ex);
291-
292-
_logger.SendingNAcknowledgement(eventArgs.BasicProperties.MessageId);
293-
_channel.BasicNack(eventArgs.DeliveryTag, multiple: false, requeue: false);
294-
_logger.NAcknowledgementSent(eventArgs.BasicProperties.MessageId, false);
295-
return;
296-
}
297-
try
298-
{
299-
await messageReceivedCallback(messageReceivedEventArgs).ConfigureAwait(false);
300-
}
301-
catch (Exception ex)
302-
{
303-
_logger.ErrorNotHandledByCallback(queueDeclareResult.QueueName, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId, ex);
304-
}
305-
};
306269
_channel.BasicQos(0, prefetchCount, false);
307-
_channel.BasicConsume(queueDeclareResult.QueueName, false, consumer);
308-
_logger.SubscribeToRabbitMQQueue(_endpoint, _virtualHost, _exchange, queueDeclareResult.QueueName, string.Join(',', topics));
309270
}
310271
catch (OperationInterruptedException operationInterruptedException)
311272
{
@@ -320,6 +281,45 @@ public void SubscribeAsync(string[] topics, string queue, Func<MessageReceivedEv
320281
throw;
321282
}
322283
}
284+
var consumer = new EventingBasicConsumer(_channel);
285+
consumer.Received += async (model, eventArgs) =>
286+
{
287+
using var loggingScope = _logger.BeginScope(new LoggingDataDictionary<string, object>
288+
{
289+
["MessageId"] = eventArgs.BasicProperties.MessageId,
290+
["ApplicationId"] = eventArgs.BasicProperties.AppId,
291+
["CorrelationId"] = eventArgs.BasicProperties.CorrelationId,
292+
["RecievedTime"] = DateTime.UtcNow
293+
});
294+
295+
_logger.MessageReceivedFromQueue(queueDeclareResult.QueueName, eventArgs.RoutingKey);
296+
297+
MessageReceivedEventArgs messageReceivedEventArgs;
298+
try
299+
{
300+
messageReceivedEventArgs = CreateMessage(eventArgs.RoutingKey, eventArgs);
301+
}
302+
catch (Exception ex)
303+
{
304+
_logger.InvalidMessage(queueDeclareResult.QueueName, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId, ex);
305+
306+
_logger.SendingNAcknowledgement(eventArgs.BasicProperties.MessageId);
307+
_channel.BasicNack(eventArgs.DeliveryTag, multiple: false, requeue: false);
308+
_logger.NAcknowledgementSent(eventArgs.BasicProperties.MessageId, false);
309+
return;
310+
}
311+
try
312+
{
313+
await messageReceivedCallback(messageReceivedEventArgs).ConfigureAwait(false);
314+
}
315+
catch (Exception ex)
316+
{
317+
_logger.ErrorNotHandledByCallback(queueDeclareResult.QueueName, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId, ex);
318+
}
319+
};
320+
321+
_channel.BasicConsume(queueDeclareResult.QueueName, false, consumer);
322+
_logger.SubscribeToRabbitMQQueue(_endpoint, _virtualHost, _exchange, queueDeclareResult.QueueName, string.Join(',', topics));
323323
}
324324

325325
public void Acknowledge(MessageBase message)

src/Plugins/RabbitMQ/Tests/Unit/RabbitMqMessageSubscriberServiceTest.cs

100644100755
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
using Monai.Deploy.Messaging.Messages;
2323
using Moq;
2424
using RabbitMQ.Client;
25+
using RabbitMQ.Client.Exceptions;
2526
using Xunit;
2627

2728
namespace Monai.Deploy.Messaging.RabbitMQ.Tests.Unit
@@ -165,6 +166,215 @@ await Task.Run(() =>
165166
Assert.Equal(message.Body, s_messageReceived.Body);
166167
}
167168

169+
[Fact(DisplayName = "Subscribes to a topic and previously created dead-letter queue is down")]
170+
public void SubscribesToATopicAndDeadLetterQueueIsDown()
171+
{
172+
_options.Value.SubscriberSettings.Add(ConfigurationKeys.EndPoint, "endpoint");
173+
_options.Value.SubscriberSettings.Add(ConfigurationKeys.Username, "username");
174+
_options.Value.SubscriberSettings.Add(ConfigurationKeys.Password, "password");
175+
_options.Value.SubscriberSettings.Add(ConfigurationKeys.VirtualHost, "virtual-host");
176+
_options.Value.SubscriberSettings.Add(ConfigurationKeys.Exchange, "exchange");
177+
_options.Value.SubscriberSettings.Add(ConfigurationKeys.DeadLetterExchange, "exchange-dead-letter");
178+
_options.Value.SubscriberSettings.Add(ConfigurationKeys.DeliveryLimit, "3");
179+
_options.Value.SubscriberSettings.Add(ConfigurationKeys.RequeueDelay, "30");
180+
181+
var jsonMessage = new JsonMessage<string>("hello world", Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), "1");
182+
var message = jsonMessage.ToMessage();
183+
184+
var creationTime = DateTimeOffset.FromUnixTimeSeconds(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds());
185+
186+
var basicProperties = new Mock<IBasicProperties>();
187+
basicProperties.SetupGet(p => p.MessageId).Returns(jsonMessage.MessageId);
188+
basicProperties.SetupGet(p => p.AppId).Returns(jsonMessage.ApplicationId);
189+
basicProperties.SetupGet(p => p.ContentType).Returns(jsonMessage.ContentType);
190+
basicProperties.SetupGet(p => p.CorrelationId).Returns(jsonMessage.CorrelationId);
191+
basicProperties.SetupGet(p => p.Headers["CreationDateTime"]).Returns(Encoding.UTF8.GetBytes(creationTime.ToString("o", CultureInfo.InvariantCulture)));
192+
193+
basicProperties.SetupGet(p => p.Type).Returns("topic");
194+
basicProperties.SetupGet(p => p.Timestamp).Returns(new AmqpTimestamp(creationTime.ToUnixTimeSeconds()));
195+
196+
_model.Setup(p => p.QueueDeclare(
197+
"queue",
198+
It.IsAny<bool>(),
199+
It.IsAny<bool>(),
200+
It.IsAny<bool>(),
201+
It.IsAny<IDictionary<string, object>>()))
202+
.Returns(new QueueDeclareOk("queue-name", 1, 1));
203+
204+
var exception = new OperationInterruptedException(new ShutdownEventArgs(ShutdownInitiator.Peer, 404,
205+
"NOT_FOUND - home node '[email protected]' of durable queue 'md.workflow.request-dead-letter' in vhost 'monaideploy' is down or inaccessible"));
206+
207+
_model.Setup(p => p.QueueDeclare(
208+
"queue-dead-letter",
209+
It.IsAny<bool>(),
210+
It.IsAny<bool>(),
211+
It.IsAny<bool>(),
212+
It.IsAny<IDictionary<string, object>>()))
213+
.Throws(exception);
214+
215+
_model.Setup(p => p.QueueBind(
216+
It.IsAny<string>(),
217+
It.IsAny<string>(),
218+
It.IsAny<string>(),
219+
It.IsAny<IDictionary<string, object>>()));
220+
_model.Setup(p => p.BasicQos(
221+
It.IsAny<uint>(),
222+
It.IsAny<ushort>(),
223+
It.IsAny<bool>()));
224+
_model.Setup(p => p.BasicConsume(
225+
It.IsAny<string>(),
226+
It.IsAny<bool>(),
227+
It.IsAny<string>(),
228+
It.IsAny<bool>(),
229+
It.IsAny<bool>(),
230+
It.IsAny<IDictionary<string, object>>(),
231+
It.IsAny<IBasicConsumer>()))
232+
.Callback<string, bool, string, bool, bool, IDictionary<string, object>, IBasicConsumer>(
233+
(queue, autoAck, tag, noLocal, exclusive, args, consumer) =>
234+
{
235+
consumer.HandleBasicDeliver(tag, Convert.ToUInt64(jsonMessage.DeliveryTag, CultureInfo.InvariantCulture), false, "exchange", "topic", basicProperties.Object, new ReadOnlyMemory<byte>(message.Body));
236+
});
237+
238+
var service = new RabbitMQMessageSubscriberService(_options, _logger.Object, _connectionFactory.Object);
239+
240+
service.Subscribe("topic", "queue", (args) =>
241+
{
242+
Assert.Equal(message.ApplicationId, args.Message.ApplicationId);
243+
Assert.Equal(message.ContentType, args.Message.ContentType);
244+
Assert.Equal(message.MessageId, args.Message.MessageId);
245+
Assert.Equal(creationTime.ToUniversalTime(), args.Message.CreationDateTime.ToUniversalTime());
246+
Assert.Equal(message.DeliveryTag, args.Message.DeliveryTag);
247+
Assert.Equal("topic", args.Message.MessageDescription);
248+
Assert.Equal(message.MessageId, args.Message.MessageId);
249+
Assert.Equal(message.Body, args.Message.Body);
250+
});
251+
252+
service.SubscribeAsync("topic", "queue", async (args) =>
253+
{
254+
await Task.Run(() =>
255+
{
256+
s_messageReceived = args.Message;
257+
service.Acknowledge(args.Message);
258+
}).ConfigureAwait(false);
259+
});
260+
261+
// wait for it to pick up meassage
262+
Task.Delay(500).GetAwaiter().GetResult();
263+
264+
Assert.NotNull(s_messageReceived);
265+
Assert.Equal(message.ApplicationId, s_messageReceived.ApplicationId);
266+
Assert.Equal(message.ContentType, s_messageReceived.ContentType);
267+
Assert.Equal(message.MessageId, s_messageReceived.MessageId);
268+
Assert.Equal(creationTime.ToUniversalTime(), s_messageReceived.CreationDateTime.ToUniversalTime());
269+
Assert.Equal(message.DeliveryTag, s_messageReceived.DeliveryTag);
270+
Assert.Equal("topic", s_messageReceived.MessageDescription);
271+
Assert.Equal(message.MessageId, s_messageReceived.MessageId);
272+
Assert.Equal(message.Body, s_messageReceived.Body);
273+
}
274+
275+
[Fact(DisplayName = "Subscribes to a topic and dead-letter queue subscription fails with generic exception")]
276+
public void SubscribesToATopicAndDeadLetterQueueSubscriptionFailsWithGenericException()
277+
{
278+
_options.Value.SubscriberSettings.Add(ConfigurationKeys.EndPoint, "endpoint");
279+
_options.Value.SubscriberSettings.Add(ConfigurationKeys.Username, "username");
280+
_options.Value.SubscriberSettings.Add(ConfigurationKeys.Password, "password");
281+
_options.Value.SubscriberSettings.Add(ConfigurationKeys.VirtualHost, "virtual-host");
282+
_options.Value.SubscriberSettings.Add(ConfigurationKeys.Exchange, "exchange");
283+
_options.Value.SubscriberSettings.Add(ConfigurationKeys.DeadLetterExchange, "exchange-dead-letter");
284+
_options.Value.SubscriberSettings.Add(ConfigurationKeys.DeliveryLimit, "3");
285+
_options.Value.SubscriberSettings.Add(ConfigurationKeys.RequeueDelay, "30");
286+
287+
var jsonMessage = new JsonMessage<string>("hello world", Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), "1");
288+
var message = jsonMessage.ToMessage();
289+
290+
var creationTime = DateTimeOffset.FromUnixTimeSeconds(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds());
291+
292+
var basicProperties = new Mock<IBasicProperties>();
293+
basicProperties.SetupGet(p => p.MessageId).Returns(jsonMessage.MessageId);
294+
basicProperties.SetupGet(p => p.AppId).Returns(jsonMessage.ApplicationId);
295+
basicProperties.SetupGet(p => p.ContentType).Returns(jsonMessage.ContentType);
296+
basicProperties.SetupGet(p => p.CorrelationId).Returns(jsonMessage.CorrelationId);
297+
basicProperties.SetupGet(p => p.Headers["CreationDateTime"]).Returns(Encoding.UTF8.GetBytes(creationTime.ToString("o", CultureInfo.InvariantCulture)));
298+
299+
basicProperties.SetupGet(p => p.Type).Returns("topic");
300+
basicProperties.SetupGet(p => p.Timestamp).Returns(new AmqpTimestamp(creationTime.ToUnixTimeSeconds()));
301+
302+
_model.Setup(p => p.QueueDeclare(
303+
"queue",
304+
It.IsAny<bool>(),
305+
It.IsAny<bool>(),
306+
It.IsAny<bool>(),
307+
It.IsAny<IDictionary<string, object>>()))
308+
.Returns(new QueueDeclareOk("queue-name", 1, 1));
309+
310+
var exception = new OperationInterruptedException(new ShutdownEventArgs(ShutdownInitiator.Application, 500,
311+
"Something else went wrong"));
312+
313+
_model.Setup(p => p.QueueDeclare(
314+
"queue-dead-letter",
315+
It.IsAny<bool>(),
316+
It.IsAny<bool>(),
317+
It.IsAny<bool>(),
318+
It.IsAny<IDictionary<string, object>>()))
319+
.Throws(exception);
320+
321+
_model.Setup(p => p.QueueBind(
322+
It.IsAny<string>(),
323+
It.IsAny<string>(),
324+
It.IsAny<string>(),
325+
It.IsAny<IDictionary<string, object>>()));
326+
_model.Setup(p => p.BasicQos(
327+
It.IsAny<uint>(),
328+
It.IsAny<ushort>(),
329+
It.IsAny<bool>()));
330+
_model.Setup(p => p.BasicConsume(
331+
It.IsAny<string>(),
332+
It.IsAny<bool>(),
333+
It.IsAny<string>(),
334+
It.IsAny<bool>(),
335+
It.IsAny<bool>(),
336+
It.IsAny<IDictionary<string, object>>(),
337+
It.IsAny<IBasicConsumer>()))
338+
.Callback<string, bool, string, bool, bool, IDictionary<string, object>, IBasicConsumer>(
339+
(queue, autoAck, tag, noLocal, exclusive, args, consumer) =>
340+
{
341+
consumer.HandleBasicDeliver(tag, Convert.ToUInt64(jsonMessage.DeliveryTag, CultureInfo.InvariantCulture), false, "exchange", "topic", basicProperties.Object, new ReadOnlyMemory<byte>(message.Body));
342+
});
343+
344+
var service = new RabbitMQMessageSubscriberService(_options, _logger.Object, _connectionFactory.Object);
345+
346+
var act = () =>
347+
{
348+
service.Subscribe("topic", "queue", (args) =>
349+
{
350+
Assert.Equal(message.ApplicationId, args.Message.ApplicationId);
351+
Assert.Equal(message.ContentType, args.Message.ContentType);
352+
Assert.Equal(message.MessageId, args.Message.MessageId);
353+
Assert.Equal(creationTime.ToUniversalTime(), args.Message.CreationDateTime.ToUniversalTime());
354+
Assert.Equal(message.DeliveryTag, args.Message.DeliveryTag);
355+
Assert.Equal("topic", args.Message.MessageDescription);
356+
Assert.Equal(message.MessageId, args.Message.MessageId);
357+
Assert.Equal(message.Body, args.Message.Body);
358+
359+
});
360+
};
361+
Assert.Throws<OperationInterruptedException>(act);
362+
363+
var asyncAct = () =>
364+
{
365+
service.SubscribeAsync("topic", "queue", async (args) =>
366+
{
367+
await Task.Run(() =>
368+
{
369+
s_messageReceived = args.Message;
370+
service.Acknowledge(args.Message);
371+
}).ConfigureAwait(false);
372+
373+
});
374+
};
375+
Assert.Throws<OperationInterruptedException>(asyncAct);
376+
}
377+
168378
[Fact(DisplayName = "Acknowledge a message")]
169379
public void AcknowledgeAMessage()
170380
{

0 commit comments

Comments
 (0)