Skip to content

Commit

Permalink
Covered new code from #88
Browse files Browse the repository at this point in the history
  • Loading branch information
Kralizek committed Jun 10, 2019
1 parent f1c3945 commit 291d103
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 8 deletions.
22 changes: 14 additions & 8 deletions src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,21 @@ Message GetMessage(BasicDeliverEventArgs args)
[RabbitMqHeaders.MessageId] = args.BasicProperties.MessageId
};

foreach (var header in args.BasicProperties.Headers.Where(k => k.Key.StartsWith("Custom:")))
foreach (var header in args.BasicProperties.Headers)
{
var headerKey = ParseCustom(header.Key);
var value = args.BasicProperties.GetHeader(header.Key, encoding);

message.Headers.Add(headerKey, value);
if (header.Key.StartsWith("Custom:"))
{
var headerKey = ParseCustom(header.Key);
var value = args.BasicProperties.GetHeader(header.Key, encoding);

message.Headers.Add(headerKey, value);
}
else if (header.Key.StartsWith("RabbitMq:"))
{
var value = args.BasicProperties.GetHeader(header.Key, encoding);

message.Headers.Add(header.Key, value);
}
}

return message;
Expand Down Expand Up @@ -228,9 +237,6 @@ public Task SendMessageAsync(Message message)
{
properties.Headers.Add(Custom(header.Key), header.Value);
}

//var headerKey = Headers.IsNybus(header.Key) ? Nybus(header.Key) : Custom(header.Key);
//properties.Headers.Add(headerKey, header.Value);
}

var exchangeName = MessageDescriptor.CreateFromType(type);
Expand Down
192 changes: 192 additions & 0 deletions tests/Tests.Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngineTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,18 @@ public async Task Commands_can_be_sent([Frozen] IRabbitMqConfiguration configura
Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicPublish(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.Is<IBasicProperties>(bp => bp.Headers.ContainsKey($"Custom:{headerKey}") && (string)bp.Headers[$"Custom:{headerKey}"] == headerValue), It.IsAny<byte[]>()));
}

[Test, CustomAutoMoqData]
public async Task Arbitrary_RabbitMq_headers_are_forwarded_when_sending_commands([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, CommandMessage<FirstTestCommand> message, string headerKey, string headerValue)
{
message.Headers[$"RabbitMq:{headerKey}"] = headerValue;

await sut.StartAsync();

await sut.SendMessageAsync(message);

Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicPublish(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.Is<IBasicProperties>(bp => bp.Headers.ContainsKey($"RabbitMq:{headerKey}") && (string)bp.Headers[$"RabbitMq:{headerKey}"] == headerValue), It.IsAny<byte[]>()));
}

[Test, CustomAutoMoqData]
public async Task Arbitrary_headers_are_forwarded_when_sending_commands([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, CommandMessage<FirstTestCommand> message, string headerKey, string headerValue)
{
Expand Down Expand Up @@ -573,6 +585,18 @@ public async Task Arbitrary_headers_are_forwarded_when_sending_events([Frozen] I
Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicPublish(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.Is<IBasicProperties>(o => o.Headers.ContainsKey($"Custom:{headerKey}")), It.IsAny<byte[]>()));
}

[Test, CustomAutoMoqData]
public async Task Arbitrary_RabbitMq_headers_are_forwarded_when_sending_events([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, EventMessage<FirstTestEvent> message, string headerKey, string headerValue)
{
message.Headers[$"RabbitMq:{headerKey}"] = headerValue;

await sut.StartAsync().ConfigureAwait(false);

await sut.SendMessageAsync(message);

Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicPublish(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.Is<IBasicProperties>(bp => bp.Headers.ContainsKey($"RabbitMq:{headerKey}") && (string)bp.Headers[$"RabbitMq:{headerKey}"] == headerValue), It.IsAny<byte[]>()));
}

[Test, CustomAutoMoqData]
public async Task NotifySuccess_acks_command_messages([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, string messageId, Guid correlationId, FirstTestCommand command)
{
Expand Down Expand Up @@ -760,5 +784,173 @@ public async Task No_QoS_is_sent_if_no_value_is_set([Frozen] IRabbitMqConfigurat

Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicQos(It.IsAny<uint>(), It.IsAny<ushort>(), It.IsAny<bool>()), Times.Never);
}

[Test, CustomAutoMoqData]
public async Task Custom_headers_are_read_from_incoming_message([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, string messageId, Guid correlationId, FirstTestEvent Event, string headerKey, string headerValue)
{
sut.SubscribeToEvent<FirstTestEvent>();

var sequence = await sut.StartAsync();

var encoding = Encoding.UTF8;

IBasicProperties properties = new BasicProperties
{
MessageId = messageId,
ContentEncoding = encoding.WebName,
Headers = new Dictionary<string, object>
{
["Nybus:MessageId"] = encoding.GetBytes(messageId),
["Nybus:MessageType"] = encoding.GetBytes(DescriptorName(Event.GetType())),
["Nybus:CorrelationId"] = correlationId.ToByteArray(),
[$"Custom:{headerKey}"] = headerValue
}
};

var body = configuration.Serializer.SerializeObject(Event, encoding);

var incomingMessages = sequence.DumpInList();

sut.Consumers.First().Value.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);

Assert.That(incomingMessages, Has.Exactly(1).InstanceOf<EventMessage<FirstTestEvent>>());

var message = incomingMessages[0] as EventMessage<FirstTestEvent>;

Assert.That(message, Is.Not.Null);
Assert.That(message.MessageId, Is.EqualTo(messageId));
Assert.That(message.MessageType, Is.EqualTo(MessageType.Event));
Assert.That(message.Type, Is.EqualTo(Event.GetType()));
Assert.That(message.Event, Is.Not.Null);

Assert.That(message.Headers, Contains.Key(headerKey));
Assert.That(message.Headers[headerKey], Is.EqualTo(headerValue));
}

[Test, CustomAutoMoqData]
public async Task Custom_headers_are_read_from_incoming_message([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, string messageId, Guid correlationId, FirstTestCommand Command, string headerKey, string headerValue)
{
sut.SubscribeToCommand<FirstTestCommand>();

var sequence = await sut.StartAsync();

var encoding = Encoding.UTF8;

IBasicProperties properties = new BasicProperties
{
MessageId = messageId,
ContentEncoding = encoding.WebName,
Headers = new Dictionary<string, object>
{
["Nybus:MessageId"] = encoding.GetBytes(messageId),
["Nybus:MessageType"] = encoding.GetBytes(DescriptorName(Command.GetType())),
["Nybus:CorrelationId"] = correlationId.ToByteArray(),
[$"Custom:{headerKey}"] = headerValue
}
};

var body = configuration.Serializer.SerializeObject(Command, encoding);

var incomingMessages = sequence.DumpInList();

sut.Consumers.First().Value.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);

Assert.That(incomingMessages, Has.Exactly(1).InstanceOf<CommandMessage<FirstTestCommand>>());

var message = incomingMessages[0] as CommandMessage<FirstTestCommand>;

Assert.That(message, Is.Not.Null);
Assert.That(message.MessageId, Is.EqualTo(messageId));
Assert.That(message.MessageType, Is.EqualTo(MessageType.Command));
Assert.That(message.Type, Is.EqualTo(Command.GetType()));
Assert.That(message.Command, Is.Not.Null);

Assert.That(message.Headers, Contains.Key(headerKey));
Assert.That(message.Headers[headerKey], Is.EqualTo(headerValue));
}

[Test, CustomAutoMoqData]
public async Task RabbitMq_headers_are_read_from_incoming_message([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, string messageId, Guid correlationId, FirstTestEvent Event, string headerKey, string headerValue)
{
sut.SubscribeToEvent<FirstTestEvent>();

var sequence = await sut.StartAsync();

var encoding = Encoding.UTF8;

IBasicProperties properties = new BasicProperties
{
MessageId = messageId,
ContentEncoding = encoding.WebName,
Headers = new Dictionary<string, object>
{
["Nybus:MessageId"] = encoding.GetBytes(messageId),
["Nybus:MessageType"] = encoding.GetBytes(DescriptorName(Event.GetType())),
["Nybus:CorrelationId"] = correlationId.ToByteArray(),
[$"RabbitMq:{headerKey}"] = headerValue
}
};

var body = configuration.Serializer.SerializeObject(Event, encoding);

var incomingMessages = sequence.DumpInList();

sut.Consumers.First().Value.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);

Assert.That(incomingMessages, Has.Exactly(1).InstanceOf<EventMessage<FirstTestEvent>>());

var message = incomingMessages[0] as EventMessage<FirstTestEvent>;

Assert.That(message, Is.Not.Null);
Assert.That(message.MessageId, Is.EqualTo(messageId));
Assert.That(message.MessageType, Is.EqualTo(MessageType.Event));
Assert.That(message.Type, Is.EqualTo(Event.GetType()));
Assert.That(message.Event, Is.Not.Null);

Assert.That(message.Headers, Contains.Key($"RabbitMq:{headerKey}"));
Assert.That(message.Headers[$"RabbitMq:{headerKey}"], Is.EqualTo(headerValue));
}

[Test, CustomAutoMoqData]
public async Task RabbitMq_headers_are_read_from_incoming_message([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, string messageId, Guid correlationId, FirstTestCommand Command, string headerKey, string headerValue)
{
sut.SubscribeToCommand<FirstTestCommand>();

var sequence = await sut.StartAsync();

var encoding = Encoding.UTF8;

IBasicProperties properties = new BasicProperties
{
MessageId = messageId,
ContentEncoding = encoding.WebName,
Headers = new Dictionary<string, object>
{
["Nybus:MessageId"] = encoding.GetBytes(messageId),
["Nybus:MessageType"] = encoding.GetBytes(DescriptorName(Command.GetType())),
["Nybus:CorrelationId"] = correlationId.ToByteArray(),
[$"RabbitMq:{headerKey}"] = headerValue
}
};

var body = configuration.Serializer.SerializeObject(Command, encoding);

var incomingMessages = sequence.DumpInList();

sut.Consumers.First().Value.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);

Assert.That(incomingMessages, Has.Exactly(1).InstanceOf<CommandMessage<FirstTestCommand>>());

var message = incomingMessages[0] as CommandMessage<FirstTestCommand>;

Assert.That(message, Is.Not.Null);
Assert.That(message.MessageId, Is.EqualTo(messageId));
Assert.That(message.MessageType, Is.EqualTo(MessageType.Command));
Assert.That(message.Type, Is.EqualTo(Command.GetType()));
Assert.That(message.Command, Is.Not.Null);

Assert.That(message.Headers, Contains.Key($"RabbitMq:{headerKey}"));
Assert.That(message.Headers[$"RabbitMq:{headerKey}"], Is.EqualTo(headerValue));
}
}
}

0 comments on commit 291d103

Please sign in to comment.