Skip to content

Commit

Permalink
Support for Dead Letter Queues (#2)
Browse files Browse the repository at this point in the history
* add dead letter queue support

* update docs

---------

Co-authored-by: Piotr Rojek <[email protected]>
  • Loading branch information
piotr-rojek and dra9ula authored Feb 6, 2023
1 parent 5191f3e commit e22dcbe
Show file tree
Hide file tree
Showing 15 changed files with 138 additions and 31 deletions.
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

Emulator is really an AMQP 1.0 proxy that translates Azure Service Bus specifics into backend implementation specifics. Backend is an actual queue implementation that runs locally, not in cloud, and handles the messages. The main use case is to help with local development experience and with integration testing where application code cannot change driver easily.

See instructions for running from [Docker](docs/install-docker.md), running from [Package](docs/install-package.md), and [here](docs/examples.md) examples of client implementations.
See instructions for
* [Running from Docker](docs/install-docker.md),
* [Running from Package](docs/install-package.md),
* [Configuration](docs/configuration.md),
* [Examples of client implementations](docs/examples.md).

![](docs/example-azure-function.gif)

Expand Down Expand Up @@ -34,11 +38,10 @@ Current implementation is considered experimental, but from initial tests it wor
* Sending, Receiving, Peeking messages, including metadata
* Message Annotations: TTL, ID, Header, ApplicationData, and more
* Queues, Topics and Subscriptions
* Dead Letter Queues
* Renew Locks (dummy)

## Not Supported Features

* Dead Letter (not implemented yet)
* Message Annotations: partitions, groups, reply-to and more (ignored)
* Sessions, Transactions
* Scheduled and Delayed messages
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.integration.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
version: "3.9"
services:
emulator:
sbemulator:
environment:
- Emulator__QueuesAndTopics=test-queue;test-topic/Subscriptions/test-sub1;test-topic/Subscriptions/test-sub2

Expand All @@ -10,4 +10,4 @@ services:
context: .
dockerfile: ./test/ServiceBusEmulator.IntegrationTests/Dockerfile
depends_on:
- emulator
- sbemulator
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ services:
timeout: 30s
retries: 60

emulator:
sbemulator:
image: devopsifyme/sbemu:latest
build:
context: .
Expand Down
28 changes: 28 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Configuration

Environmental Variable | Docker Default | Description
-|-|-
Emulator__QueuesAndTopics | | Coma-separated list of queues and topics to be created on startup. See below
Emulator__Port | 5671 | Port on which service is listening
Emulator__ServerCertificatePath | /app/cert.pfx | Path to a PFX file containing emulator's TLS certificate
Emulator__ServerCertificatePassword | password | Password for the PFX file
Emulator__ServerCertificateThumbprint | | Server certificate in current user's certificate store
Emulator__RabbitMq__Username | guest | RabbitMQ backend - username
Emulator__RabbitMq__Password | guest | RabbitMQ backend - password
Emulator__RabbitMq__Host | localhost | RabbitMQ backend - hostname
Emulator__RabbitMq__Port | 5672 | RabbitMQ backend - port

## Emulator.QueuesAndTopics

You can use this to preinitialize queues and topics, if your tests require this. Otherwise some messages sent might be lost, if the receiver connects after they are sent. For example, integration tests in this repo rely on this setting (see docker-compose.integration.yml).

Sample value
```
queue-name;topic-name/Subscriptions/sub1;topic-name/Subscriptions/sub2
```

## Bring your own certificate

If you don't want to trust provided test CA, you can provide your own server certificate. Mount your PFX file in any directory, and update environmental variables `Emulator__ServerCertificatePath` and `Emulator__ServerCertificatePassword`.

> Note that the provided server certificate is regenerated each release, so it is not convenient to trust it. Therefore by default it is required to trust the Test CA from this repository. Since this certificate is publicly available here, it could potentially allow attacker to generate any certificate that you would trust.
18 changes: 17 additions & 1 deletion docs/install-docker.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ Note that the SharedAccess values must be exactly as specified as this is curren

> "Endpoint=sb://localhost/;SharedAccessKeyName=all;SharedAccessKey=CLwo3FQ3S39Z4pFOQDefaiUd1dSsli4XOAj3Y9Uh1E=;EnableAmqpLinkRedirect=false"
Emulator also supports following hostnames:
Emulator also supports following hostnames with the provided certificate:
* sb://localhost (for local dev)
* sb://sbemulator (for usage with docker compose, etc.)
* sb://emulator (for usage with docker compose, etc.)
* sb://devopsifyme-local.servicebus.windows.net (for strict SDK requiring specific host, override in /etc/host)

## Run from DockerHub
Expand Down Expand Up @@ -40,3 +41,18 @@ docker compose up --detach
Import-Certificate -FilePath "docker\rabbitmq-amqp1\testca\cacert.cer" -CertStoreLocation cert:\CurrentUser\Root
```

## Run automated integration tests

We use docker compose for running integration tests that are written in XUnit. At the end technology choice is not that important, as the approach.

1) docker-compose.yml to define RabbitMQ backend and Emulator services - used during development
2) docker-compose.integration.yml
* Additional emulator configuration specific to the test
* Build docker image for the test runner of your choosing
3) Run `docker compose up` targetting `integration` service, which runs our tests
* It also starts Rabbit and Emulator because of the dependencies
* If `integration` container exits with code <> 0 => our tests have failed

```
docker compose -f .\docker-compose.yml -f .\docker-compose.integration.yml up --build integration
```
4 changes: 2 additions & 2 deletions src/ServiceBusEmulator.Host/appsettings.Docker.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
"ServerCertificatePassword": "password",
"Port": 5671,
"RabbitMq": {
"Username": "user",
"Password": "password",
"Username": "guest",
"Password": "guest",
"Host": "localhost",
"Port": 5672
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public PeekMessageCommand(IRabbitMqUtilities utilities, IRabbitMqMapper mapper)
public (Message, AmqpResponseStatusCode) Handle(Message request, IModel channel, string address)
{

(_, string? queueName, _) = _utilities.GetExachangeAndQueue(address);
(_, string? queueName, _) = _utilities.GetExachangeAndQueueNames(address);

Map requestBody = (Map)request.Body;
int fromSequence = Convert.ToInt32(requestBody[ManagementConstants.Properties.FromSequenceNumber]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public override void SetContext(IModel channel, Source source, ReceiverSettleMod
{
Channel = channel;
ReceiveSettleMode = rcvSettleMode;
(_, QueueName, _) = _utilities.GetExachangeAndQueue(source.Address);
(_, QueueName, _) = _utilities.GetExachangeAndQueueNames(source.Address);
}

public override void OnLinkClosed(ListenerLink link, Error error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public override void SetContext(IModel channel, Target target)
{
Channel = channel;
Target = target;
(ExchangeName, QueueName, RoutingKey) = _utilities.GetExachangeAndQueue(Target.Address);
(ExchangeName, QueueName, RoutingKey) = _utilities.GetExachangeAndQueueNames(Target.Address);
}

public override void OnLinkClosed(ListenerLink link, Error error)
Expand Down
2 changes: 1 addition & 1 deletion src/ServiceBusEmulator.RabbitMq/IRabbitMqUtilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ namespace ServiceBusEmulator.RabbitMq
public interface IRabbitMqUtilities
{
void EnsureExists(IModel channel, string address, bool isSender = false);
(string exchange, string queue, string routingKey) GetExachangeAndQueue(string address);
(string exchange, string queue, string routingKey) GetExachangeAndQueueNames(string address);
}
}
46 changes: 37 additions & 9 deletions src/ServiceBusEmulator.RabbitMq/RabbitMqUtilities.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client;

namespace ServiceBusEmulator.RabbitMq
{
public class RabbitMqUtilities : IRabbitMqUtilities
{
public (string exchange, string queue, string routingKey) GetExachangeAndQueue(string address)
public (string exchange, string queue, string routingKey) GetExachangeAndQueueNames(string address)
{
if (address.StartsWith("/"))
{
Expand All @@ -14,22 +13,51 @@ public class RabbitMqUtilities : IRabbitMqUtilities

string[] parts = address.Split('/');
string exchangeName = parts[0];
string routingKey = string.Empty;

string routingKey = parts[0];
string queueName = address.Contains("/Subscriptions/") ? $"{parts[0]}-sub-{parts[2]}" : parts[0];

if (address.EndsWith("$deadletterqueue"))
{
(string? dlqQueue, string? dlqRoutingKey) = GetDlqQueueNames(queueName);
queueName = dlqQueue ?? queueName;
routingKey = dlqRoutingKey ?? routingKey;
}

return (exchangeName, queueName, routingKey);
}

protected (string? dlqQueue, string? dlqRoutingKey) GetDlqQueueNames(string queue)
{
if(queue.EndsWith("dlq"))
{
return (null, null);
}

return ($"{queue}-dlq", $"{queue}-dlq");
}

public void EnsureExists(IModel channel, string address, bool isSender = false)
{
(string? exchange, string? queue, string? routingKey) = GetExachangeAndQueue(address);
(string exchange, string queue, string routingKey) = GetExachangeAndQueueNames(address);
(string? dlqName, string? dlqRoutingKey) = GetDlqQueueNames(queue);

channel.ExchangeDeclare(exchange, "fanout", true, false);
channel.ExchangeDeclare(exchange, "topic", true, false);

if(!isSender)
if (!isSender)
{
_ = channel.QueueDeclare(queue, true, false, false);
var queueArguments = new Dictionary<string, object>();

if (dlqName != null && dlqRoutingKey != null)
{
// create DLQ queues and configure routing
queueArguments["x-dead-letter-exchange"] = exchange;
queueArguments["x-dead-letter-routing-key"] = dlqRoutingKey;

_ = channel.QueueDeclare(dlqName, true, false, false);
channel.QueueBind(dlqName, exchange, dlqRoutingKey);
}

_ = channel.QueueDeclare(queue, true, false, false, queueArguments);
channel.QueueBind(queue, exchange, routingKey);
}
}
Expand Down
2 changes: 1 addition & 1 deletion test/ServiceBusEmulator.IntegrationTests/Base.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace ServiceBusEmulator.IntegrationTests
{
public class Base
{
private const string ConnectionString = "Endpoint=sb://emulator/;SharedAccessKeyName=all;SharedAccessKey=CLwo3FQ3S39Z4pFOQDefaiUd1dSsli4XOAj3Y9Uh1E=;EnableAmqpLinkRedirect=false";
private const string ConnectionString = "Endpoint=sb://sbemulator/;SharedAccessKeyName=all;SharedAccessKey=CLwo3FQ3S39Z4pFOQDefaiUd1dSsli4XOAj3Y9Uh1E=;EnableAmqpLinkRedirect=false";

public Base()
{
Expand Down
1 change: 1 addition & 0 deletions test/ServiceBusEmulator.IntegrationTests/Consts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
public static class Consts
{
public const string TestQueueName = "test-queue";
public const string TestQueueDlqName = "test-queue/$deadletterqueue";
public const string TestTopicName = "test-topic";
public const string TestSubsciption1Name = "test-topic/Subscriptions/test-sub1";
public const string TestSubsciption2Name = "test-topic/Subscriptions/test-sub2";
Expand Down
31 changes: 30 additions & 1 deletion test/ServiceBusEmulator.IntegrationTests/QueueTest.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using AutoFixture;
using Azure.Messaging.ServiceBus;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;

namespace ServiceBusEmulator.IntegrationTests
{
Expand Down Expand Up @@ -63,5 +62,35 @@ public async Task ThatMessageIsConfirmed()
);
}
}

[Fact]
public async Task ThatMessageIsDead()
{
string messageBody = Fixture.Create<string>();

var sender = Client.CreateSender(Consts.TestQueueName);
await sender.SendMessageAsync(new ServiceBusMessage(messageBody));

var receiver = Client.CreateReceiver(Consts.TestQueueName, new ServiceBusReceiverOptions
{
ReceiveMode = ServiceBusReceiveMode.PeekLock
});

var receiverDead = Client.CreateReceiver(Consts.TestQueueDlqName, new ServiceBusReceiverOptions
{
ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete
});

var receivedMessage = await receiver.ReceiveMessageAsync();
await receiver.DeadLetterMessageAsync(receivedMessage);
var receivedPeekMessage = await receiver.PeekMessageAsync();
var receivedDeadMessage = await receiverDead.ReceiveMessageAsync();

Assert.Multiple(
() => Assert.Equal(messageBody, receivedMessage.Body.ToString()),
() => Assert.Equal(messageBody, receivedDeadMessage.Body.ToString()),
() => Assert.Null(receivedPeekMessage)
);
}
}
}
16 changes: 9 additions & 7 deletions test/ServiceBusEmulator.RabbitMq.Tests/RabbitMqUtilitiesTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ namespace ServiceBusEmulator.RabbitMq.Tests
public class RabbitMqUtilitiesTest : Base<RabbitMqUtilities>
{
[Theory]
[InlineData("dummyQueue", "dummyQueue", "", "dummyQueue")]
[InlineData("topicName", "topicName", "", "topicName")]
[InlineData("topicName/Subscriptions/subName", "topicName", "", "topicName-sub-subName")]
[InlineData("dummyQueue", "dummyQueue", "dummyQueue", "dummyQueue")]
[InlineData("dummyQueue/$deadletterqueue", "dummyQueue", "dummyQueue-dlq", "dummyQueue-dlq")]
[InlineData("topicName", "topicName", "topicName", "topicName")]
[InlineData("topicName/Subscriptions/subName", "topicName", "topicName", "topicName-sub-subName")]
[InlineData("topicName/Subscriptions/subName/$deadletterqueue", "topicName", "topicName-sub-subName-dlq", "topicName-sub-subName-dlq")]
public void ThatAddressIsTranslated(string address, string expectedExchange, string expectedRoutingKey, string expectedQueue)
{
(string exchange, string queue, string routingKey) = Sut.GetExachangeAndQueue(address);
(string exchange, string queue, string routingKey) = Sut.GetExachangeAndQueueNames(address);

Assert.Multiple(
() => Assert.Equal(expectedQueue, queue),
Expand All @@ -31,9 +33,9 @@ public void ThatRabbitObjectAreCreated(bool isSender)

Sut.EnsureExists(channel, address, isSender);

channel.Received(1).ExchangeDeclare(Arg.Any<string>(), "fanout", true, false);
channel.Received(isSender ? 0 : 1).QueueDeclare(Arg.Any<string>(), true, false, false);
channel.Received(isSender ? 0 : 1).QueueBind(Arg.Any<string>(), Arg.Any<string>(), Arg.Any<string>());
channel.Received(1).ExchangeDeclare(Arg.Any<string>(), "topic", true, false);
channel.Received(isSender ? 0 : 2).QueueDeclare(Arg.Any<string>(), true, false, false, Arg.Any<IDictionary<string, object>>());
channel.Received(isSender ? 0 : 2).QueueBind(Arg.Any<string>(), Arg.Any<string>(), Arg.Any<string>());
}
}
}

0 comments on commit e22dcbe

Please sign in to comment.