-
-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/rabbitmq adapters #87
base: main
Are you sure you want to change the base?
Changes from all commits
1176e43
0e51b7c
0b8c676
8889189
613ae62
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,249 @@ | ||
using System.Text; | ||
using Common; | ||
using Common.Extensions; | ||
using Infrastructure.Persistence.Interfaces; | ||
using Infrastructure.Persistence.RabbitMq.Extensions; | ||
using JetBrains.Annotations; | ||
using RabbitMQ.Client; | ||
using RabbitMQ.Client.Exceptions; | ||
|
||
namespace Infrastructure.Persistence.RabbitMq.ApplicationServices; | ||
|
||
[UsedImplicitly] | ||
public class RabbitMqQueueStore : IQueueStore, IAsyncDisposable | ||
{ | ||
private readonly RabbitMqStoreOptions _options; | ||
private readonly Dictionary<string, bool> _queueExistenceChecks = new(); | ||
private readonly IRecorder _recorder; | ||
|
||
private IConnection? _connection; | ||
|
||
public static RabbitMqQueueStore Create(IRecorder recorder, RabbitMqStoreOptions options) | ||
{ | ||
return new RabbitMqQueueStore(recorder, options); | ||
} | ||
|
||
private RabbitMqQueueStore(IRecorder recorder, RabbitMqStoreOptions options) | ||
{ | ||
_recorder = recorder; | ||
_options = options; | ||
} | ||
|
||
public ValueTask DisposeAsync() | ||
{ | ||
_connection?.Dispose(); | ||
return ValueTask.CompletedTask; | ||
} | ||
|
||
#if TESTINGONLY | ||
public async Task<Result<long, Error>> CountAsync(string queueName, CancellationToken cancellationToken) | ||
{ | ||
queueName.ThrowIfNotValuedParameter(nameof(queueName), Resources.AnyStore_MissingQueueName); | ||
|
||
var channel = await ConnectToQueueAsync(queueName, cancellationToken); | ||
|
||
try | ||
{ | ||
var declareOk = channel.QueueDeclarePassive(queueName); | ||
return declareOk.MessageCount; | ||
} | ||
catch (OperationInterruptedException ex) | ||
{ | ||
_recorder.TraceError(null, ex, | ||
"Failed to retrieve message count for queue: {Queue}. Reason: {ErrorMessage}", | ||
queueName, ex.Message); | ||
return 0; | ||
} | ||
finally | ||
{ | ||
channel.Dispose(); | ||
} | ||
} | ||
#endif | ||
|
||
#if TESTINGONLY | ||
public async Task<Result<Error>> DestroyAllAsync(string queueName, CancellationToken cancellationToken) | ||
{ | ||
#if TESTINGONLY | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could remove this since the whole function is TESTINGONLY |
||
queueName.ThrowIfNotValuedParameter(nameof(queueName), | ||
Resources.AnyStore_MissingQueueName); | ||
|
||
var channel = await ConnectToQueueAsync(queueName, cancellationToken); | ||
try | ||
{ | ||
channel.QueueDelete(queue: queueName, ifUnused: false, ifEmpty: false); | ||
_queueExistenceChecks.Remove(queueName); | ||
} | ||
catch (Exception ex) | ||
{ | ||
_recorder.TraceError(null, ex, "Failed to delete queue: {Queue}", queueName); | ||
return ex.ToError(ErrorCode.Unexpected); | ||
} | ||
finally | ||
{ | ||
channel.Dispose(); | ||
} | ||
#else | ||
await Task.CompletedTask; | ||
#endif | ||
return Result.Ok; | ||
} | ||
#endif | ||
|
||
public async Task<Result<bool, Error>> PopSingleAsync( | ||
string queueName, | ||
Func<string, CancellationToken, Task<Result<Error>>> messageHandlerAsync, | ||
CancellationToken cancellationToken) | ||
{ | ||
queueName.ThrowIfNotValuedParameter(nameof(queueName), Resources.AnyStore_MissingQueueName); | ||
ArgumentNullException.ThrowIfNull(messageHandlerAsync); | ||
|
||
var channel = await ConnectToQueueAsync(queueName, cancellationToken); | ||
|
||
BasicGetResult? messageResult = null; | ||
try | ||
{ | ||
messageResult = channel.BasicGet(queueName, autoAck: false); | ||
} | ||
catch (Exception ex) | ||
{ | ||
_recorder.TraceError(null, ex, | ||
"Failed to retrieve message from queue: {Queue}", queueName); | ||
channel.Dispose(); | ||
return ex.ToError(ErrorCode.Unexpected); | ||
} | ||
|
||
if (messageResult == null) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lets use .NotExists() and .Exists() instead of comparing with null like this. |
||
{ | ||
_recorder.TraceInformation(null, "No message on queue: {Queue}", queueName); | ||
channel.Dispose(); | ||
return false; | ||
} | ||
|
||
var body = Encoding.UTF8.GetString(messageResult.Body.ToArray()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Encoding.UTF8 move this to a static readonly field, and replace in this calss, to make sure that all code that encodes/decodes uses the same encoding, both directions. |
||
|
||
try | ||
{ | ||
var handled = await messageHandlerAsync(body, cancellationToken); | ||
if (handled.IsFailure) | ||
{ | ||
channel.BasicNack(messageResult.DeliveryTag, multiple: false, requeue: true); | ||
channel.Dispose(); | ||
return handled.Error; | ||
} | ||
} | ||
catch (Exception ex) | ||
{ | ||
channel.BasicNack(messageResult.DeliveryTag, multiple: false, requeue: true); | ||
|
||
_recorder.TraceError(null, ex, | ||
"Failed to handle last message: {MessageId} from queue: {Queue}", | ||
messageResult.DeliveryTag, queueName); | ||
channel.Dispose(); | ||
return ex.ToError(ErrorCode.Unexpected); | ||
} | ||
|
||
try | ||
{ | ||
channel.BasicAck(messageResult.DeliveryTag, multiple: false); | ||
} | ||
catch (Exception ex) | ||
{ | ||
_recorder.TraceError(null, ex, | ||
"Failed to acknowledge message: {DeliveryTag} from queue: {Queue}", | ||
messageResult.DeliveryTag, queueName); | ||
} | ||
|
||
channel.Dispose(); | ||
return true; | ||
} | ||
|
||
public async Task<Result<Error>> PushAsync(string queueName, string message, CancellationToken cancellationToken) | ||
{ | ||
queueName.ThrowIfNotValuedParameter(nameof(queueName), Resources.AnyStore_MissingQueueName); | ||
message.ThrowIfNotValuedParameter(nameof(message), Resources.AnyStore_MissingSentMessage); | ||
|
||
var channel = await ConnectToQueueAsync(queueName, cancellationToken); | ||
|
||
try | ||
{ | ||
var body = Encoding.UTF8.GetBytes(message); | ||
channel.BasicPublish(exchange: "", | ||
routingKey: queueName, | ||
basicProperties: null, | ||
body: body); | ||
|
||
_recorder.TraceInformation(null, "Added message to queue: {Queue}", queueName); | ||
} | ||
catch (Exception ex) | ||
{ | ||
_recorder.TraceError(null, ex, | ||
"Failed to push message: {Message} to queue: {Queue}", | ||
message, queueName); | ||
channel.Dispose(); | ||
return ex.ToError(ErrorCode.Unexpected); | ||
} | ||
|
||
channel.Dispose(); | ||
return Result.Ok; | ||
} | ||
|
||
private async Task<IModel> ConnectToQueueAsync(string queueName, CancellationToken cancellationToken) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add |
||
{ | ||
var sanitizedQueueName = queueName.SanitizeAndValidateStorageAccountResourceName(); | ||
|
||
EnsureConnection(); | ||
|
||
var channel = _connection!.CreateModel(); | ||
|
||
if (IsQueueExistenceCheckPerformed(sanitizedQueueName)) | ||
{ | ||
return channel; | ||
} | ||
|
||
try | ||
{ | ||
channel.QueueDeclare( | ||
queue: sanitizedQueueName, | ||
durable: true, | ||
exclusive: false, | ||
autoDelete: false, | ||
arguments: null); | ||
} | ||
catch (Exception ex) | ||
{ | ||
_recorder.TraceError(null, ex, "Failed to declare queue: {Queue}", sanitizedQueueName); | ||
throw; | ||
} | ||
|
||
return channel; | ||
} | ||
|
||
private bool IsQueueExistenceCheckPerformed(string queueName) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Im not certain how Rabbit queues work, but this pattern of checking for existence is necessary when you want to save a HTTP call to check existence. (as is the case in Azure Storage queues), you may find that with RabbitMq it is redundant of queues can be accessed when they do not exist. |
||
{ | ||
_queueExistenceChecks.TryAdd(queueName, false); | ||
if (_queueExistenceChecks[queueName]) | ||
{ | ||
return true; | ||
} | ||
|
||
_queueExistenceChecks[queueName] = true; | ||
return false; | ||
} | ||
|
||
private void EnsureConnection() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you double-check the lifetime of the connection you are caching here. |
||
{ | ||
if (_connection == null || !_connection.IsOpen) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use .NotExists() |
||
{ | ||
var factory = new ConnectionFactory | ||
{ | ||
HostName = _options.HostName, | ||
Port = _options.Port ?? 5672, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make this a constant at top of the class |
||
UserName = _options.UserName, | ||
Password = _options.Password, | ||
VirtualHost = _options.VirtualHost | ||
}; | ||
_connection = factory.CreateConnection(); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
using Common.Configuration; | ||
|
||
namespace Infrastructure.Persistence.RabbitMq.ApplicationServices | ||
{ | ||
public class RabbitMqStoreOptions | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would make sense to have some simple unit tests for the intended methods of this class like the From* methods, to make sure the connectionstring provduced are correct. See |
||
{ | ||
internal const string HostNameSettingName = "ApplicationServices:Persistence:RabbitMQ:HostName"; | ||
internal const string UserNameSettingName = "ApplicationServices:Persistence:RabbitMQ:UserName"; | ||
internal const string PasswordSettingName = "ApplicationServices:Persistence:RabbitMQ:Password"; | ||
internal const string VirtualHostSettingName = "ApplicationServices:Persistence:RabbitMQ:VirtualHost"; | ||
|
||
public RabbitMqStoreOptions( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this be private, to stop consumers using it directly, and force them to use the From* methods below? |
||
string hostName, | ||
string? userName = null, | ||
string? password = null, | ||
string? virtualHost = null, | ||
int? port = null, | ||
bool useAsyncDispatcher = true) | ||
{ | ||
HostName = hostName; | ||
UserName = userName; | ||
Password = password; | ||
VirtualHost = virtualHost; | ||
Port = port; | ||
UseAsyncDispatcher = useAsyncDispatcher; | ||
} | ||
|
||
public string HostName { get; } | ||
public string? UserName { get; } | ||
public string? Password { get; } | ||
public string? VirtualHost { get; } | ||
public int? Port { get; } | ||
public bool UseAsyncDispatcher { get; } | ||
|
||
public static RabbitMqStoreOptions FromCredentials(IConfigurationSettings settings) | ||
{ | ||
return new RabbitMqStoreOptions( | ||
hostName: settings.GetString(HostNameSettingName), | ||
userName: settings.GetString(UserNameSettingName), | ||
password: settings.GetString(PasswordSettingName), | ||
virtualHost: settings.GetString(VirtualHostSettingName)); | ||
} | ||
|
||
public static RabbitMqStoreOptions FromConnectionString(string connectionString) | ||
{ | ||
var uri = new Uri(connectionString); | ||
|
||
var userInfo = uri.UserInfo.Split(':'); | ||
var userName = userInfo[0]; | ||
var password = userInfo.Length > 1 ? userInfo[1] : string.Empty; | ||
|
||
var hostName = uri.Host; | ||
var port = uri.Port; | ||
|
||
var virtualHost = uri.AbsolutePath == "/" ? "/" : uri.AbsolutePath.TrimStart('/'); | ||
|
||
return new RabbitMqStoreOptions( | ||
hostName: hostName, | ||
userName: userName, | ||
password: password, | ||
port: port, | ||
virtualHost: virtualHost | ||
); | ||
} | ||
|
||
public string ToConnectionString() | ||
{ | ||
var portPart = Port.HasValue ? $":{Port}" : string.Empty; | ||
var virtualHostPart = string.IsNullOrEmpty(VirtualHost) ? string.Empty : $"/{VirtualHost}"; | ||
return $"amqp://{UserName}:{Password}@{HostName}{portPart}{virtualHostPart}"; | ||
} | ||
|
||
public static string BuildConnectionString( | ||
string hostName, | ||
string userName, | ||
string password, | ||
string virtualHost, | ||
int port, | ||
bool useAsyncDispatcher = true) | ||
{ | ||
var options = new RabbitMqStoreOptions( | ||
hostName, | ||
userName, | ||
password, | ||
virtualHost, | ||
port, | ||
useAsyncDispatcher); | ||
return options.ToConnectionString(); | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is all this dead code? can we remove it all? |
||
public static string BuildConnectionString(RabbitMqStoreOptions options) | ||
{ | ||
return options.ToConnectionString(); | ||
} | ||
|
||
public static string BuildConnectionString(IConfigurationSettings configuration) | ||
{ | ||
var options = FromCredentials(configuration); | ||
return options.ToConnectionString(); | ||
} | ||
|
||
public static string BuildConnectionString(string connectionString) | ||
{ | ||
var options = FromConnectionString(connectionString); | ||
return options.ToConnectionString(); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove whitespace, has no meaning