Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/rabbitmq adapters #87

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
# AWS User-specific
.idea/**/aws.xml

# Visual Studio 2015/2017 cache/options directory
.vs/

# Generated files
.idea/**/contentModel.xml

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
using System.Text;
using Common;
using Common.Extensions;
using Infrastructure.Persistence.Interfaces;
using Infrastructure.Persistence.RabbitMq.Extensions;
using JetBrains.Annotations;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;

namespace Infrastructure.Persistence.RabbitMq.ApplicationServices;

[UsedImplicitly]
public class RabbitMqQueueStore : IQueueStore, IAsyncDisposable
{
private readonly RabbitMqStoreOptions _options;
private readonly Dictionary<string, bool> _queueExistenceChecks = new();
private readonly IRecorder _recorder;

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove whitespace, has no meaning

private IConnection? _connection;

public static RabbitMqQueueStore Create(IRecorder recorder, RabbitMqStoreOptions options)
{
return new RabbitMqQueueStore(recorder, options);
}

private RabbitMqQueueStore(IRecorder recorder, RabbitMqStoreOptions options)
{
_recorder = recorder;
_options = options;
}

public ValueTask DisposeAsync()
{
_connection?.Dispose();
return ValueTask.CompletedTask;
}

#if TESTINGONLY
public async Task<Result<long, Error>> CountAsync(string queueName, CancellationToken cancellationToken)
{
queueName.ThrowIfNotValuedParameter(nameof(queueName), Resources.AnyStore_MissingQueueName);

var channel = await ConnectToQueueAsync(queueName, cancellationToken);

try
{
var declareOk = channel.QueueDeclarePassive(queueName);
return declareOk.MessageCount;
}
catch (OperationInterruptedException ex)
{
_recorder.TraceError(null, ex,
"Failed to retrieve message count for queue: {Queue}. Reason: {ErrorMessage}",
queueName, ex.Message);
return 0;
}
finally
{
channel.Dispose();
}
}
#endif

#if TESTINGONLY
public async Task<Result<Error>> DestroyAllAsync(string queueName, CancellationToken cancellationToken)
{
#if TESTINGONLY
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could remove this since the whole function is TESTINGONLY

queueName.ThrowIfNotValuedParameter(nameof(queueName),
Resources.AnyStore_MissingQueueName);

var channel = await ConnectToQueueAsync(queueName, cancellationToken);
try
{
channel.QueueDelete(queue: queueName, ifUnused: false, ifEmpty: false);
_queueExistenceChecks.Remove(queueName);
}
catch (Exception ex)
{
_recorder.TraceError(null, ex, "Failed to delete queue: {Queue}", queueName);
return ex.ToError(ErrorCode.Unexpected);
}
finally
{
channel.Dispose();
}
#else
await Task.CompletedTask;
#endif
return Result.Ok;
}
#endif

public async Task<Result<bool, Error>> PopSingleAsync(
string queueName,
Func<string, CancellationToken, Task<Result<Error>>> messageHandlerAsync,
CancellationToken cancellationToken)
{
queueName.ThrowIfNotValuedParameter(nameof(queueName), Resources.AnyStore_MissingQueueName);
ArgumentNullException.ThrowIfNull(messageHandlerAsync);

var channel = await ConnectToQueueAsync(queueName, cancellationToken);

BasicGetResult? messageResult = null;
try
{
messageResult = channel.BasicGet(queueName, autoAck: false);
}
catch (Exception ex)
{
_recorder.TraceError(null, ex,
"Failed to retrieve message from queue: {Queue}", queueName);
channel.Dispose();
return ex.ToError(ErrorCode.Unexpected);
}

if (messageResult == null)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets use .NotExists() and .Exists() instead of comparing with null like this.

{
_recorder.TraceInformation(null, "No message on queue: {Queue}", queueName);
channel.Dispose();
return false;
}

var body = Encoding.UTF8.GetString(messageResult.Body.ToArray());
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Encoding.UTF8 move this to a static readonly field, and replace in this calss, to make sure that all code that encodes/decodes uses the same encoding, both directions.


try
{
var handled = await messageHandlerAsync(body, cancellationToken);
if (handled.IsFailure)
{
channel.BasicNack(messageResult.DeliveryTag, multiple: false, requeue: true);
channel.Dispose();
return handled.Error;
}
}
catch (Exception ex)
{
channel.BasicNack(messageResult.DeliveryTag, multiple: false, requeue: true);

_recorder.TraceError(null, ex,
"Failed to handle last message: {MessageId} from queue: {Queue}",
messageResult.DeliveryTag, queueName);
channel.Dispose();
return ex.ToError(ErrorCode.Unexpected);
}

try
{
channel.BasicAck(messageResult.DeliveryTag, multiple: false);
}
catch (Exception ex)
{
_recorder.TraceError(null, ex,
"Failed to acknowledge message: {DeliveryTag} from queue: {Queue}",
messageResult.DeliveryTag, queueName);
}

channel.Dispose();
return true;
}

public async Task<Result<Error>> PushAsync(string queueName, string message, CancellationToken cancellationToken)
{
queueName.ThrowIfNotValuedParameter(nameof(queueName), Resources.AnyStore_MissingQueueName);
message.ThrowIfNotValuedParameter(nameof(message), Resources.AnyStore_MissingSentMessage);

var channel = await ConnectToQueueAsync(queueName, cancellationToken);

try
{
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: queueName,
basicProperties: null,
body: body);

_recorder.TraceInformation(null, "Added message to queue: {Queue}", queueName);
}
catch (Exception ex)
{
_recorder.TraceError(null, ex,
"Failed to push message: {Message} to queue: {Queue}",
message, queueName);
channel.Dispose();
return ex.ToError(ErrorCode.Unexpected);
}

channel.Dispose();
return Result.Ok;
}

private async Task<IModel> ConnectToQueueAsync(string queueName, CancellationToken cancellationToken)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add await Task.CompletedTask() at the start of this function, or don't declare it as async methoda nad rename it

{
var sanitizedQueueName = queueName.SanitizeAndValidateStorageAccountResourceName();

EnsureConnection();

var channel = _connection!.CreateModel();

if (IsQueueExistenceCheckPerformed(sanitizedQueueName))
{
return channel;
}

try
{
channel.QueueDeclare(
queue: sanitizedQueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
}
catch (Exception ex)
{
_recorder.TraceError(null, ex, "Failed to declare queue: {Queue}", sanitizedQueueName);
throw;
}

return channel;
}

private bool IsQueueExistenceCheckPerformed(string queueName)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Im not certain how Rabbit queues work, but this pattern of checking for existence is necessary when you want to save a HTTP call to check existence. (as is the case in Azure Storage queues), you may find that with RabbitMq it is redundant of queues can be accessed when they do not exist.

{
_queueExistenceChecks.TryAdd(queueName, false);
if (_queueExistenceChecks[queueName])
{
return true;
}

_queueExistenceChecks[queueName] = true;
return false;
}

private void EnsureConnection()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you double-check the lifetime of the connection you are caching here.
This class instance type may be in the DI as a Singleton (and live for days/weeks) or as Scoped (live for the lifetime of a HTTP thread) in either case, we need to be certain that sharing the cached connection is okay.
It is almost always better to create a new connection for the shortest lifetime possible, every method call, as long as that is not expensive (depends on the SDK and 3rd party you are using)

{
if (_connection == null || !_connection.IsOpen)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use .NotExists()

{
var factory = new ConnectionFactory
{
HostName = _options.HostName,
Port = _options.Port ?? 5672,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this a constant at top of the class

UserName = _options.UserName,
Password = _options.Password,
VirtualHost = _options.VirtualHost
};
_connection = factory.CreateConnection();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
using Common.Configuration;

namespace Infrastructure.Persistence.RabbitMq.ApplicationServices
{
public class RabbitMqStoreOptions
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would make sense to have some simple unit tests for the intended methods of this class like the From* methods, to make sure the connectionstring provduced are correct. See AzureStorageAccountStoreOptionsSpec.cs

{
internal const string HostNameSettingName = "ApplicationServices:Persistence:RabbitMQ:HostName";
internal const string UserNameSettingName = "ApplicationServices:Persistence:RabbitMQ:UserName";
internal const string PasswordSettingName = "ApplicationServices:Persistence:RabbitMQ:Password";
internal const string VirtualHostSettingName = "ApplicationServices:Persistence:RabbitMQ:VirtualHost";

public RabbitMqStoreOptions(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be private, to stop consumers using it directly, and force them to use the From* methods below?

string hostName,
string? userName = null,
string? password = null,
string? virtualHost = null,
int? port = null,
bool useAsyncDispatcher = true)
{
HostName = hostName;
UserName = userName;
Password = password;
VirtualHost = virtualHost;
Port = port;
UseAsyncDispatcher = useAsyncDispatcher;
}

public string HostName { get; }
public string? UserName { get; }
public string? Password { get; }
public string? VirtualHost { get; }
public int? Port { get; }
public bool UseAsyncDispatcher { get; }

public static RabbitMqStoreOptions FromCredentials(IConfigurationSettings settings)
{
return new RabbitMqStoreOptions(
hostName: settings.GetString(HostNameSettingName),
userName: settings.GetString(UserNameSettingName),
password: settings.GetString(PasswordSettingName),
virtualHost: settings.GetString(VirtualHostSettingName));
}

public static RabbitMqStoreOptions FromConnectionString(string connectionString)
{
var uri = new Uri(connectionString);

var userInfo = uri.UserInfo.Split(':');
var userName = userInfo[0];
var password = userInfo.Length > 1 ? userInfo[1] : string.Empty;

var hostName = uri.Host;
var port = uri.Port;

var virtualHost = uri.AbsolutePath == "/" ? "/" : uri.AbsolutePath.TrimStart('/');

return new RabbitMqStoreOptions(
hostName: hostName,
userName: userName,
password: password,
port: port,
virtualHost: virtualHost
);
}

public string ToConnectionString()
{
var portPart = Port.HasValue ? $":{Port}" : string.Empty;
var virtualHostPart = string.IsNullOrEmpty(VirtualHost) ? string.Empty : $"/{VirtualHost}";
return $"amqp://{UserName}:{Password}@{HostName}{portPart}{virtualHostPart}";
}

public static string BuildConnectionString(
string hostName,
string userName,
string password,
string virtualHost,
int port,
bool useAsyncDispatcher = true)
{
var options = new RabbitMqStoreOptions(
hostName,
userName,
password,
virtualHost,
port,
useAsyncDispatcher);
return options.ToConnectionString();
}

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is all this dead code? can we remove it all?

public static string BuildConnectionString(RabbitMqStoreOptions options)
{
return options.ToConnectionString();
}

public static string BuildConnectionString(IConfigurationSettings configuration)
{
var options = FromCredentials(configuration);
return options.ToConnectionString();
}

public static string BuildConnectionString(string connectionString)
{
var options = FromConnectionString(connectionString);
return options.ToConnectionString();
}
}
}
Loading