Skip to content

Commit

Permalink
Add queue whitelisting
Browse files Browse the repository at this point in the history
  • Loading branch information
onlyann committed Sep 16, 2017
1 parent c6003b4 commit e1fbb4f
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 36 deletions.
41 changes: 30 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ This is inspired from the [Rabbit MQ implementation](http://docs.servicestack.ne
- OneWay MQ and HTTP Service Clients are Substitutable


>> ServiceStack has added MQ support for Azure Service Bus as part of their v4.5.14 release maintained >> at https://github.com/ServiceStack/ServiceStack.Azure.
>>
>> I would recommend using the official implementation instead of this one if it covers your needs.
>>
>> One reason you may want to give this non-official implementation a try is that you are not targeting
>> .NET Core and you need some feature that is not part of the official MQ Server.
>>
>> Ideally, the official package eventually offers all features (and likely more) and this repository can enjoy
>> an early retirement.
> ServiceStack has added MQ support for Azure Service Bus as part of their v4.5.14 release maintained at https://github.com/ServiceStack/ServiceStack.Azure.
>
> I would recommend using the official implementation instead of this one if it covers your needs.
>
> One reason you may want to give this non-official implementation a try is that you are not targeting
> .NET Core and you need some feature that is not part of the official MQ Server.
>
> Ideally, the official package eventually offers all features (and likely more) and this repository can enjoy
> an early retirement.
## Adding Azure Service Bus MQ support to ServiceStack

Expand Down Expand Up @@ -64,6 +64,10 @@ The [AzureBusServer](src\ServiceStack.AzureServiceBus\AzureBusServer.cs) has the
- `Action<QueueDescription>` **CreateQueueFilter** - A filter to customize the options Azure Queues are created/updated with.
- `Action<string, BrokeredMessage>` **GetMessageFilter** - Called every time a message is received.
- `Action<string, BrokeredMessage, IMessage>` **PublishMessageFilter** - Called every time a message gets published.
- `string[]` **PriorityQueuesWhitelist** - If you only want to enable priority queue handlers (and threads) for specific message types. All message types have priority queues by default.
- `bool` **DisablePriorityQueues** - No priority queue will be created or listened to.
- `string[]` **PublishResponsesWhitelist** - Opt-in to only publish responses on this whitelist. All responses are published by default.
- `bool` **DisablePublishingResponses** - No response will be published.

As an alternative to a connection string, you can pass an instance of `AzureBusMessageFactory` to the `AzureBusServer` constructor and provide your own [NamespaceManager](https://docs.microsoft.com/en-us/dotnet/api/microsoft.servicebus.namespacemanager?redirectedfrom=MSDN&view=azureservicebus-4.1.1#microsoft_servicebus_namespacemanager) and [MessagingFactory](https://docs.microsoft.com/en-us/dotnet/api/microsoft.servicebus.messaging.messagingfactory?view=azureservicebus-4.1.1).

Expand Down Expand Up @@ -118,9 +122,24 @@ using (var mqClient = mqServer.CreateMessageQueueClient())
}
```

Note that `brokeredMsg` parameter of `GetMessageFilter` when explicitly retrieving a message results in a timeout.
Note that the `brokeredMsg` parameter of `GetMessageFilter` can be null when explicitly retrieving a message results in a timeout.

## Whitelisting priority messages and publishing responses

By default, all registered handlers will result in listening to a normal priority queue and a high priority queue. As well, all message responses get published to their respective queues.

Priority messages and publishing responses can be entirely disabled by setting `DisablePriorityQueues` and `DisablePublishingResponses` respectively to true.

It is also possible to whitelist the priority queues and responses to publish by message type.

```
// only use a priority queue for Hello messages
mqServer.PriorityQueuesWhitelist = new[] { nameof(Hello) };
// only publish HelloResponse responses
mqServer.PublishResponsesWhitelist = new[] { nameof(HelloResponse) };
```

## Upcoming Features

- [ ] queue whitelisting
- [ ] error handler
13 changes: 9 additions & 4 deletions src/ServiceStack.AzureServiceBus/AzureBusExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,18 @@ public static Task RegisterQueuesAsync(
QueueNames queueNames,
Action<QueueDescription> createQueueFilter = null)
{
return Task.WhenAll(
namespaceMgr.RegisterQueueAsync(queueNames.In, createQueueFilter),
namespaceMgr.RegisterQueueAsync(queueNames.Priority, createQueueFilter),
namespaceMgr.RegisterQueueAsync(queueNames.Out, createQueueFilter));
return RegisterQueuesAsync(namespaceMgr, new[] { queueNames.In, queueNames.Out, queueNames.Priority }, createQueueFilter);
// queueNames.Dlq is created by Azure Service Bus
}

public static Task RegisterQueuesAsync(
this NamespaceManager namespaceMgr,
IEnumerable<string> queueNames,
Action<QueueDescription> createQueueFilter = null)
{
return Task.WhenAll(queueNames.Select(x => namespaceMgr.RegisterQueueAsync(x, createQueueFilter)));
}

public static Task RegisterQueuesAsync<T>(
this NamespaceManager namespaceMgr,
Action<QueueDescription> createQueueFilter = null)
Expand Down
44 changes: 42 additions & 2 deletions src/ServiceStack.AzureServiceBus/AzureBusServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using ServiceStack.Logging;
using ServiceStack.Messaging;
using ServiceStack.Text;
using ServiceStack;
using System;
using System.Collections.Generic;
using System.Linq;
Expand Down Expand Up @@ -84,6 +85,36 @@ public Action<string, BrokeredMessage, IMessage> PublishMessageFilter
/// </summary>
public Func<object, object> ResponseFilter { get; set; }

/// <summary>
/// If you only want to enable priority queue handlers (and threads) for specific msg types
/// </summary>
public string[] PriorityQueuesWhitelist { get; set; }

/// <summary>
/// Don't listen on any Priority Queues
/// </summary>
public bool DisablePriorityQueues
{
set
{
PriorityQueuesWhitelist = TypeConstants.EmptyStringArray;
}
}

/// <summary>
/// Opt-in to only publish responses on this white list.
/// Publishes all responses by default.
/// </summary>
public string[] PublishResponsesWhitelist { get; set; }

/// <summary>
/// Don't publish any response messages
/// </summary>
public bool DisablePublishingResponses
{
set { PublishResponsesWhitelist = value ? TypeConstants.EmptyStringArray : null; }
}

private int status;

public AzureBusServer(string connectionString): this(new AzureBusMessageFactory(connectionString))
Expand Down Expand Up @@ -157,6 +188,7 @@ protected IMessageHandlerFactory CreateMessageHandlerFactory<T>(Func<IMessage<T>
RequestFilter = RequestFilter,
ResponseFilter = ResponseFilter,
RetryCount = RetryCount,
PublishResponsesWhitelist = PublishResponsesWhitelist
};
}

Expand Down Expand Up @@ -204,19 +236,27 @@ public virtual async Task Init()
var queueNames = new QueueNames(msgType);
var noOfThreads = handlerThreadCountMap[msgType];

msgPumpsBuilder.Add(new AzureMessageReceiverPump(
var queuesToRegister = new List<string> { queueNames.In, queueNames.Out };

if (PriorityQueuesWhitelist == null
|| PriorityQueuesWhitelist.Any(x => x == msgType.Name))
{
msgPumpsBuilder.Add(new AzureMessageReceiverPump(
messageFactory,
handlerFactory,
queueNames.Priority,
noOfThreads));

queuesToRegister.Add(queueNames.Priority);
}

msgPumpsBuilder.Add(new AzureMessageReceiverPump(
messageFactory,
handlerFactory,
queueNames.In,
noOfThreads));

await messageFactory.NamespaceManager.RegisterQueuesAsync(queueNames, createQueueFilter).ConfigureAwait(false);
await messageFactory.NamespaceManager.RegisterQueuesAsync(queuesToRegister, createQueueFilter).ConfigureAwait(false);
}

messagePumps = msgPumpsBuilder.ToArray();
Expand Down
183 changes: 164 additions & 19 deletions tests/ServiceStack.AzureServiceBus.Tests/AzureBusServerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,25 @@ public class AlwaysThrows
public string Value { get; set; }
}

public class Hello : IReturn<HelloResponse>
{
public string Name { get; set; }
}
public class HelloNull1 : IReturn<HelloResponse>
{
public string Name { get; set; }
}

public class HelloNull2 : IReturn<HelloResponse>
{
public string Name { get; set; }
}

public class HelloResponse
{
public string Result { get; set; }
}

[TestFixture, Category("Integration")]
[NonParallelizable]
public class AzureBusServerTests
Expand Down Expand Up @@ -151,25 +170,6 @@ public async Task Can_receive_and_process_same_reply_responses()
}
}

public class Hello : IReturn<HelloResponse>
{
public string Name { get; set; }
}
public class HelloNull1 : IReturn<HelloResponse>
{
public string Name { get; set; }
}

public class HelloNull2 : IReturn<HelloResponse>
{
public string Name { get; set; }
}

public class HelloResponse
{
public string Result { get; set; }
}

[Test]
public async Task Can_receive_and_process_standard_request_reply_combo()
{
Expand Down Expand Up @@ -369,6 +369,7 @@ await mqServer.MessageFactory.PurgeQueuesAsync(
});

var msg = mqClient.Get<HelloNull2>(replyMq, TimeSpan.FromSeconds(10));
mqClient.Ack(msg);

await Task.Delay(100);

Expand Down Expand Up @@ -439,5 +440,149 @@ await nsMgr.RegisterQueuesAsync<Hello>(desc =>
}
}
}

[Test]
public async Task Can_disable_priority_queues()
{
var msgsReceived = 0;

using (var mqServer = CreateMqServer())
{
await mqServer.MessageFactory.PurgeQueuesAsync(
QueueNames<Hello>.In,
QueueNames<HelloResponse>.In,
QueueNames<Hello>.Priority);

mqServer.DisablePriorityQueues = true;

mqServer.RegisterHandler<Hello>(message =>
{
Interlocked.Increment(ref msgsReceived);
return new HelloResponse() { Result = $"{message.GetBody().Name} world" };
});

mqServer.Start();

using (var mqClient = mqServer.CreateMessageQueueClient())
{
mqClient.Publish(new Message<Hello>(new Hello { Name = "Hello" }));
mqClient.Publish(new Message<Hello>(new Hello { Name = "Hello in priority" }) { Priority = 1 });

var msg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In, Config.ServerWaitTime);
mqClient.Ack(msg);
Assert.That(msg.GetBody().Result, Is.EqualTo("Hello world"));
}

Assert.That(msgsReceived, Is.EqualTo(1));
}
}

[Test]
public async Task Can_whitelist_priority_queue_by_message_type()
{
var msgsReceived = 0;

using (var mqServer = CreateMqServer())
{
await mqServer.MessageFactory.PurgeQueuesAsync(
QueueNames<Hello>.In,
QueueNames<HelloResponse>.In,
QueueNames<Hello>.Priority);

mqServer.PriorityQueuesWhitelist = new[] { nameof(Hello) };

mqServer.RegisterHandler<Hello>(message =>
{
Interlocked.Increment(ref msgsReceived);
return new HelloResponse() { Result = $"{message.GetBody().Name} world" };
});

mqServer.Start();

using (var mqClient = mqServer.CreateMessageQueueClient())
{
mqClient.Publish(new Message<Hello>(new Hello { Name = "Hello" }));
mqClient.Publish(new Message<Hello>(new Hello { Name = "Hello in priority" }) { Priority = 1 });

var msg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In, Config.ServerWaitTime);
mqClient.Ack(msg);
msg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In, Config.ServerWaitTime);
mqClient.Ack(msg);
}

Assert.That(msgsReceived, Is.EqualTo(2));
}
}

[Test]
public async Task Can_disable_publishing_responses()
{
var msgsReceived = 0;

using (var mqServer = CreateMqServer())
{
await mqServer.MessageFactory.PurgeQueuesAsync(
QueueNames<Hello>.In,
QueueNames<HelloResponse>.In,
QueueNames<Hello>.Priority);

mqServer.DisablePublishingResponses = true;

mqServer.RegisterHandler<Hello>(message =>
{
Interlocked.Increment(ref msgsReceived);
return new HelloResponse() { Result = $"{message.GetBody().Name} world" };
});

mqServer.Start();

using (var mqClient = mqServer.CreateMessageQueueClient())
{
mqClient.Publish(new Hello { Name = "Hello" });

var msg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In, TimeSpan.FromSeconds(1));
Assert.Null(msg);
}

Assert.That(msgsReceived, Is.EqualTo(1));
}
}

[Test]
public async Task Can_whitelist_publishing_responses_by_message_type()
{
var msgsReceived = 0;

using (var mqServer = CreateMqServer())
{
await mqServer.MessageFactory.PurgeQueuesAsync(
QueueNames<Hello>.In,
QueueNames<HelloResponse>.In,
QueueNames<Hello>.Priority);

mqServer.PublishResponsesWhitelist = new[] { nameof(HelloResponse) };

mqServer.RegisterHandler<Hello>(message =>
{
Interlocked.Increment(ref msgsReceived);
return new HelloResponse() { Result = $"{message.GetBody().Name} world" };
});

mqServer.Start();

using (var mqClient = mqServer.CreateMessageQueueClient())
{
mqClient.Publish(new Hello { Name = "Hello" });

var msg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In, Config.ServerWaitTime);
if (msg != null)
{
mqClient.Ack(msg);
}
}

Assert.That(msgsReceived, Is.EqualTo(1));
}
}
}
}

0 comments on commit e1fbb4f

Please sign in to comment.