diff --git a/README.md b/README.md index 38d0057..9255915 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,11 @@ Emulator is really an AMQP 1.0 proxy that translates Azure Service Bus specifics into backend implementation specifics. Backend is an actual queue implementation that runs locally, not in cloud, and handles the messages. The main use case is to help with local development experience and with integration testing where application code cannot change driver easily. -See instructions for running from [Docker](docs/install-docker.md), running from [Package](docs/install-package.md), and [here](docs/examples.md) examples of client implementations. +See instructions for +* [Running from Docker](docs/install-docker.md), +* [Running from Package](docs/install-package.md), +* [Configuration](docs/configuration.md), +* [Examples of client implementations](docs/examples.md). ![](docs/example-azure-function.gif) @@ -34,11 +38,10 @@ Current implementation is considered experimental, but from initial tests it wor * Sending, Receiving, Peeking messages, including metadata * Message Annotations: TTL, ID, Header, ApplicationData, and more * Queues, Topics and Subscriptions +* Dead Letter Queues * Renew Locks (dummy) ## Not Supported Features - -* Dead Letter (not implemented yet) * Message Annotations: partitions, groups, reply-to and more (ignored) * Sessions, Transactions * Scheduled and Delayed messages diff --git a/docker-compose.integration.yml b/docker-compose.integration.yml index 5c1212c..52390c1 100644 --- a/docker-compose.integration.yml +++ b/docker-compose.integration.yml @@ -1,6 +1,6 @@ version: "3.9" services: - emulator: + sbemulator: environment: - Emulator__QueuesAndTopics=test-queue;test-topic/Subscriptions/test-sub1;test-topic/Subscriptions/test-sub2 @@ -10,4 +10,4 @@ services: context: . dockerfile: ./test/ServiceBusEmulator.IntegrationTests/Dockerfile depends_on: - - emulator \ No newline at end of file + - sbemulator \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 8a61a57..40127c7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,7 +11,7 @@ services: timeout: 30s retries: 60 - emulator: + sbemulator: image: devopsifyme/sbemu:latest build: context: . diff --git a/docs/configuration.md b/docs/configuration.md new file mode 100644 index 0000000..49d211b --- /dev/null +++ b/docs/configuration.md @@ -0,0 +1,28 @@ +# Configuration + +Environmental Variable | Docker Default | Description +-|-|- +Emulator__QueuesAndTopics | | Coma-separated list of queues and topics to be created on startup. See below +Emulator__Port | 5671 | Port on which service is listening +Emulator__ServerCertificatePath | /app/cert.pfx | Path to a PFX file containing emulator's TLS certificate +Emulator__ServerCertificatePassword | password | Password for the PFX file +Emulator__ServerCertificateThumbprint | | Server certificate in current user's certificate store +Emulator__RabbitMq__Username | guest | RabbitMQ backend - username +Emulator__RabbitMq__Password | guest | RabbitMQ backend - password +Emulator__RabbitMq__Host | localhost | RabbitMQ backend - hostname +Emulator__RabbitMq__Port | 5672 | RabbitMQ backend - port + +## Emulator.QueuesAndTopics + +You can use this to preinitialize queues and topics, if your tests require this. Otherwise some messages sent might be lost, if the receiver connects after they are sent. For example, integration tests in this repo rely on this setting (see docker-compose.integration.yml). + +Sample value +``` +queue-name;topic-name/Subscriptions/sub1;topic-name/Subscriptions/sub2 +``` + +## Bring your own certificate + +If you don't want to trust provided test CA, you can provide your own server certificate. Mount your PFX file in any directory, and update environmental variables `Emulator__ServerCertificatePath` and `Emulator__ServerCertificatePassword`. + +> Note that the provided server certificate is regenerated each release, so it is not convenient to trust it. Therefore by default it is required to trust the Test CA from this repository. Since this certificate is publicly available here, it could potentially allow attacker to generate any certificate that you would trust. \ No newline at end of file diff --git a/docs/install-docker.md b/docs/install-docker.md index ef6fb7d..22bc60b 100644 --- a/docs/install-docker.md +++ b/docs/install-docker.md @@ -6,9 +6,10 @@ Note that the SharedAccess values must be exactly as specified as this is curren > "Endpoint=sb://localhost/;SharedAccessKeyName=all;SharedAccessKey=CLwo3FQ3S39Z4pFOQDefaiUd1dSsli4XOAj3Y9Uh1E=;EnableAmqpLinkRedirect=false" -Emulator also supports following hostnames: +Emulator also supports following hostnames with the provided certificate: * sb://localhost (for local dev) * sb://sbemulator (for usage with docker compose, etc.) +* sb://emulator (for usage with docker compose, etc.) * sb://devopsifyme-local.servicebus.windows.net (for strict SDK requiring specific host, override in /etc/host) ## Run from DockerHub @@ -40,3 +41,18 @@ docker compose up --detach Import-Certificate -FilePath "docker\rabbitmq-amqp1\testca\cacert.cer" -CertStoreLocation cert:\CurrentUser\Root ``` +## Run automated integration tests + +We use docker compose for running integration tests that are written in XUnit. At the end technology choice is not that important, as the approach. + +1) docker-compose.yml to define RabbitMQ backend and Emulator services - used during development +2) docker-compose.integration.yml + * Additional emulator configuration specific to the test + * Build docker image for the test runner of your choosing +3) Run `docker compose up` targetting `integration` service, which runs our tests + * It also starts Rabbit and Emulator because of the dependencies + * If `integration` container exits with code <> 0 => our tests have failed + +``` + docker compose -f .\docker-compose.yml -f .\docker-compose.integration.yml up --build integration +``` \ No newline at end of file diff --git a/src/ServiceBusEmulator.Host/appsettings.Docker.json b/src/ServiceBusEmulator.Host/appsettings.Docker.json index 2c8b62b..4c2cd06 100644 --- a/src/ServiceBusEmulator.Host/appsettings.Docker.json +++ b/src/ServiceBusEmulator.Host/appsettings.Docker.json @@ -4,8 +4,8 @@ "ServerCertificatePassword": "password", "Port": 5671, "RabbitMq": { - "Username": "user", - "Password": "password", + "Username": "guest", + "Password": "guest", "Host": "localhost", "Port": 5672 } diff --git a/src/ServiceBusEmulator.RabbitMq/Commands/PeekMessageCommand.cs b/src/ServiceBusEmulator.RabbitMq/Commands/PeekMessageCommand.cs index 1f2b70b..909fccd 100644 --- a/src/ServiceBusEmulator.RabbitMq/Commands/PeekMessageCommand.cs +++ b/src/ServiceBusEmulator.RabbitMq/Commands/PeekMessageCommand.cs @@ -19,7 +19,7 @@ public PeekMessageCommand(IRabbitMqUtilities utilities, IRabbitMqMapper mapper) public (Message, AmqpResponseStatusCode) Handle(Message request, IModel channel, string address) { - (_, string? queueName, _) = _utilities.GetExachangeAndQueue(address); + (_, string? queueName, _) = _utilities.GetExachangeAndQueueNames(address); Map requestBody = (Map)request.Body; int fromSequence = Convert.ToInt32(requestBody[ManagementConstants.Properties.FromSequenceNumber]); diff --git a/src/ServiceBusEmulator.RabbitMq/Endpoints/RabbitMqReceiverEndpoint.cs b/src/ServiceBusEmulator.RabbitMq/Endpoints/RabbitMqReceiverEndpoint.cs index b7c4126..36bde31 100644 --- a/src/ServiceBusEmulator.RabbitMq/Endpoints/RabbitMqReceiverEndpoint.cs +++ b/src/ServiceBusEmulator.RabbitMq/Endpoints/RabbitMqReceiverEndpoint.cs @@ -29,7 +29,7 @@ public override void SetContext(IModel channel, Source source, ReceiverSettleMod { Channel = channel; ReceiveSettleMode = rcvSettleMode; - (_, QueueName, _) = _utilities.GetExachangeAndQueue(source.Address); + (_, QueueName, _) = _utilities.GetExachangeAndQueueNames(source.Address); } public override void OnLinkClosed(ListenerLink link, Error error) diff --git a/src/ServiceBusEmulator.RabbitMq/Endpoints/RabbitMqSenderEndpoint.cs b/src/ServiceBusEmulator.RabbitMq/Endpoints/RabbitMqSenderEndpoint.cs index aee0664..5523d78 100644 --- a/src/ServiceBusEmulator.RabbitMq/Endpoints/RabbitMqSenderEndpoint.cs +++ b/src/ServiceBusEmulator.RabbitMq/Endpoints/RabbitMqSenderEndpoint.cs @@ -30,7 +30,7 @@ public override void SetContext(IModel channel, Target target) { Channel = channel; Target = target; - (ExchangeName, QueueName, RoutingKey) = _utilities.GetExachangeAndQueue(Target.Address); + (ExchangeName, QueueName, RoutingKey) = _utilities.GetExachangeAndQueueNames(Target.Address); } public override void OnLinkClosed(ListenerLink link, Error error) diff --git a/src/ServiceBusEmulator.RabbitMq/IRabbitMqUtilities.cs b/src/ServiceBusEmulator.RabbitMq/IRabbitMqUtilities.cs index 8217e43..c1ba801 100644 --- a/src/ServiceBusEmulator.RabbitMq/IRabbitMqUtilities.cs +++ b/src/ServiceBusEmulator.RabbitMq/IRabbitMqUtilities.cs @@ -5,6 +5,6 @@ namespace ServiceBusEmulator.RabbitMq public interface IRabbitMqUtilities { void EnsureExists(IModel channel, string address, bool isSender = false); - (string exchange, string queue, string routingKey) GetExachangeAndQueue(string address); + (string exchange, string queue, string routingKey) GetExachangeAndQueueNames(string address); } } diff --git a/src/ServiceBusEmulator.RabbitMq/RabbitMqUtilities.cs b/src/ServiceBusEmulator.RabbitMq/RabbitMqUtilities.cs index 5319e6e..21d8d0b 100644 --- a/src/ServiceBusEmulator.RabbitMq/RabbitMqUtilities.cs +++ b/src/ServiceBusEmulator.RabbitMq/RabbitMqUtilities.cs @@ -1,11 +1,10 @@ -using Microsoft.Extensions.Options; -using RabbitMQ.Client; +using RabbitMQ.Client; namespace ServiceBusEmulator.RabbitMq { public class RabbitMqUtilities : IRabbitMqUtilities { - public (string exchange, string queue, string routingKey) GetExachangeAndQueue(string address) + public (string exchange, string queue, string routingKey) GetExachangeAndQueueNames(string address) { if (address.StartsWith("/")) { @@ -14,22 +13,51 @@ public class RabbitMqUtilities : IRabbitMqUtilities string[] parts = address.Split('/'); string exchangeName = parts[0]; - string routingKey = string.Empty; - + string routingKey = parts[0]; string queueName = address.Contains("/Subscriptions/") ? $"{parts[0]}-sub-{parts[2]}" : parts[0]; + if (address.EndsWith("$deadletterqueue")) + { + (string? dlqQueue, string? dlqRoutingKey) = GetDlqQueueNames(queueName); + queueName = dlqQueue ?? queueName; + routingKey = dlqRoutingKey ?? routingKey; + } + return (exchangeName, queueName, routingKey); } + protected (string? dlqQueue, string? dlqRoutingKey) GetDlqQueueNames(string queue) + { + if(queue.EndsWith("dlq")) + { + return (null, null); + } + + return ($"{queue}-dlq", $"{queue}-dlq"); + } + public void EnsureExists(IModel channel, string address, bool isSender = false) { - (string? exchange, string? queue, string? routingKey) = GetExachangeAndQueue(address); + (string exchange, string queue, string routingKey) = GetExachangeAndQueueNames(address); + (string? dlqName, string? dlqRoutingKey) = GetDlqQueueNames(queue); - channel.ExchangeDeclare(exchange, "fanout", true, false); + channel.ExchangeDeclare(exchange, "topic", true, false); - if(!isSender) + if (!isSender) { - _ = channel.QueueDeclare(queue, true, false, false); + var queueArguments = new Dictionary(); + + if (dlqName != null && dlqRoutingKey != null) + { + // create DLQ queues and configure routing + queueArguments["x-dead-letter-exchange"] = exchange; + queueArguments["x-dead-letter-routing-key"] = dlqRoutingKey; + + _ = channel.QueueDeclare(dlqName, true, false, false); + channel.QueueBind(dlqName, exchange, dlqRoutingKey); + } + + _ = channel.QueueDeclare(queue, true, false, false, queueArguments); channel.QueueBind(queue, exchange, routingKey); } } diff --git a/test/ServiceBusEmulator.IntegrationTests/Base.cs b/test/ServiceBusEmulator.IntegrationTests/Base.cs index ac0015a..4a23bbc 100644 --- a/test/ServiceBusEmulator.IntegrationTests/Base.cs +++ b/test/ServiceBusEmulator.IntegrationTests/Base.cs @@ -5,7 +5,7 @@ namespace ServiceBusEmulator.IntegrationTests { public class Base { - private const string ConnectionString = "Endpoint=sb://emulator/;SharedAccessKeyName=all;SharedAccessKey=CLwo3FQ3S39Z4pFOQDefaiUd1dSsli4XOAj3Y9Uh1E=;EnableAmqpLinkRedirect=false"; + private const string ConnectionString = "Endpoint=sb://sbemulator/;SharedAccessKeyName=all;SharedAccessKey=CLwo3FQ3S39Z4pFOQDefaiUd1dSsli4XOAj3Y9Uh1E=;EnableAmqpLinkRedirect=false"; public Base() { diff --git a/test/ServiceBusEmulator.IntegrationTests/Consts.cs b/test/ServiceBusEmulator.IntegrationTests/Consts.cs index bbe006e..654611e 100644 --- a/test/ServiceBusEmulator.IntegrationTests/Consts.cs +++ b/test/ServiceBusEmulator.IntegrationTests/Consts.cs @@ -3,6 +3,7 @@ public static class Consts { public const string TestQueueName = "test-queue"; + public const string TestQueueDlqName = "test-queue/$deadletterqueue"; public const string TestTopicName = "test-topic"; public const string TestSubsciption1Name = "test-topic/Subscriptions/test-sub1"; public const string TestSubsciption2Name = "test-topic/Subscriptions/test-sub2"; diff --git a/test/ServiceBusEmulator.IntegrationTests/QueueTest.cs b/test/ServiceBusEmulator.IntegrationTests/QueueTest.cs index 6be2f42..29013e9 100644 --- a/test/ServiceBusEmulator.IntegrationTests/QueueTest.cs +++ b/test/ServiceBusEmulator.IntegrationTests/QueueTest.cs @@ -1,6 +1,5 @@ using AutoFixture; using Azure.Messaging.ServiceBus; -using Microsoft.VisualStudio.TestPlatform.ObjectModel; namespace ServiceBusEmulator.IntegrationTests { @@ -63,5 +62,35 @@ public async Task ThatMessageIsConfirmed() ); } } + + [Fact] + public async Task ThatMessageIsDead() + { + string messageBody = Fixture.Create(); + + var sender = Client.CreateSender(Consts.TestQueueName); + await sender.SendMessageAsync(new ServiceBusMessage(messageBody)); + + var receiver = Client.CreateReceiver(Consts.TestQueueName, new ServiceBusReceiverOptions + { + ReceiveMode = ServiceBusReceiveMode.PeekLock + }); + + var receiverDead = Client.CreateReceiver(Consts.TestQueueDlqName, new ServiceBusReceiverOptions + { + ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete + }); + + var receivedMessage = await receiver.ReceiveMessageAsync(); + await receiver.DeadLetterMessageAsync(receivedMessage); + var receivedPeekMessage = await receiver.PeekMessageAsync(); + var receivedDeadMessage = await receiverDead.ReceiveMessageAsync(); + + Assert.Multiple( + () => Assert.Equal(messageBody, receivedMessage.Body.ToString()), + () => Assert.Equal(messageBody, receivedDeadMessage.Body.ToString()), + () => Assert.Null(receivedPeekMessage) + ); + } } } \ No newline at end of file diff --git a/test/ServiceBusEmulator.RabbitMq.Tests/RabbitMqUtilitiesTest.cs b/test/ServiceBusEmulator.RabbitMq.Tests/RabbitMqUtilitiesTest.cs index 0679c51..a57a877 100644 --- a/test/ServiceBusEmulator.RabbitMq.Tests/RabbitMqUtilitiesTest.cs +++ b/test/ServiceBusEmulator.RabbitMq.Tests/RabbitMqUtilitiesTest.cs @@ -7,12 +7,14 @@ namespace ServiceBusEmulator.RabbitMq.Tests public class RabbitMqUtilitiesTest : Base { [Theory] - [InlineData("dummyQueue", "dummyQueue", "", "dummyQueue")] - [InlineData("topicName", "topicName", "", "topicName")] - [InlineData("topicName/Subscriptions/subName", "topicName", "", "topicName-sub-subName")] + [InlineData("dummyQueue", "dummyQueue", "dummyQueue", "dummyQueue")] + [InlineData("dummyQueue/$deadletterqueue", "dummyQueue", "dummyQueue-dlq", "dummyQueue-dlq")] + [InlineData("topicName", "topicName", "topicName", "topicName")] + [InlineData("topicName/Subscriptions/subName", "topicName", "topicName", "topicName-sub-subName")] + [InlineData("topicName/Subscriptions/subName/$deadletterqueue", "topicName", "topicName-sub-subName-dlq", "topicName-sub-subName-dlq")] public void ThatAddressIsTranslated(string address, string expectedExchange, string expectedRoutingKey, string expectedQueue) { - (string exchange, string queue, string routingKey) = Sut.GetExachangeAndQueue(address); + (string exchange, string queue, string routingKey) = Sut.GetExachangeAndQueueNames(address); Assert.Multiple( () => Assert.Equal(expectedQueue, queue), @@ -31,9 +33,9 @@ public void ThatRabbitObjectAreCreated(bool isSender) Sut.EnsureExists(channel, address, isSender); - channel.Received(1).ExchangeDeclare(Arg.Any(), "fanout", true, false); - channel.Received(isSender ? 0 : 1).QueueDeclare(Arg.Any(), true, false, false); - channel.Received(isSender ? 0 : 1).QueueBind(Arg.Any(), Arg.Any(), Arg.Any()); + channel.Received(1).ExchangeDeclare(Arg.Any(), "topic", true, false); + channel.Received(isSender ? 0 : 2).QueueDeclare(Arg.Any(), true, false, false, Arg.Any>()); + channel.Received(isSender ? 0 : 2).QueueBind(Arg.Any(), Arg.Any(), Arg.Any()); } } } \ No newline at end of file