Skip to content

Commit

Permalink
Upgrade to support RabbitMQ.Client 7.0. #1645
Browse files Browse the repository at this point in the history
  • Loading branch information
yang-xiaodong committed Feb 4, 2025
1 parent 86b5f95 commit d3e90d8
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 125 deletions.
2 changes: 1 addition & 1 deletion src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class PostgreSqlOptions : EFOptions
/// Gets or sets the database's connection string that will be used to store database entities.
/// </summary>
[Obsolete("Use .DataSource = NpgsqlDataSource.Create(<connectionString>) for same behavior.")]
public string ConnectionString { get; set; } = default!;
public string? ConnectionString { get; set; }

/// <summary>
/// Gets or sets the Npgsql data source that will be used to store database entities.
Expand Down
4 changes: 3 additions & 1 deletion src/DotNetCore.CAP.RabbitMQ/CAP.Options.Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ namespace Microsoft.Extensions.DependencyInjection;

public static class CapOptionsExtensions
{
// ReSharper disable once InconsistentNaming
public static CapOptions UseRabbitMQ(this CapOptions options, string hostName)
{
return options.UseRabbitMQ(opt => { opt.HostName = hostName; });
}

// ReSharper disable once InconsistentNaming
public static CapOptions UseRabbitMQ(this CapOptions options, Action<RabbitMQOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));

options.RegisterExtension(new RabbitMQCapOptionsExtension(configure));
options.RegisterExtension(new RabbitMqCapOptionsExtension(configure));

return options;
}
Expand Down
8 changes: 4 additions & 4 deletions src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class RabbitMQOptions

/// <summary>
/// The host to connect to.
/// If you want connect to the cluster, you can assign like “192.168.1.111,192.168.1.112”
/// If you want to connect to the cluster, you can assign like “192.168.1.111,192.168.1.112”
/// </summary>
public string HostName { get; set; } = "localhost";

Expand Down Expand Up @@ -104,7 +104,7 @@ public Func<BasicDeliverEventArgs, IServiceProvider, List<KeyValuePair<string, s
/// <summary>
/// Specify quality of service.
/// <br /><br />
/// This settings requests a specific quality of service.The QoS can be specified for the current channel or for all
/// These settings request a specific quality of service.The QoS can be specified for the current channel or for all
/// channels on the connection.<br />
/// The particular properties and semantics of a qos method always depend on the content class semantics.<br />
/// Though the qos method could in principle apply to both peers, it is currently meaningful only for the server.<br />
Expand All @@ -121,7 +121,7 @@ public class QueueArgumentsOptions
/// <summary>
/// Gets or sets queue mode by supplying the 'x-queue-mode' declaration argument with a string specifying the desired mode.
/// </summary>
public string QueueMode { get; set; } = default!;
public string QueueMode { get; set; } = null!;

/// <summary>
/// Gets or sets queue message automatic deletion time (in milliseconds) "x-message-ttl", Default 864000000 ms (10 days).
Expand All @@ -132,7 +132,7 @@ public class QueueArgumentsOptions
/// <summary>
/// Gets or sets queue type by supplying the 'x-queue-type' declaration argument with a string specifying the desired type.
/// </summary>
public string QueueType { get; set; } = default!;
public string QueueType { get; set; } = null!;
}

public class BasicQos
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP;

internal sealed class RabbitMQCapOptionsExtension : ICapOptionsExtension
internal sealed class RabbitMqCapOptionsExtension : ICapOptionsExtension
{
private readonly Action<RabbitMQOptions> _configure;

public RabbitMQCapOptionsExtension(Action<RabbitMQOptions> configure)
public RabbitMqCapOptionsExtension(Action<RabbitMQOptions> configure)
{
_configure = configure;
}
Expand All @@ -23,8 +23,8 @@ public void AddServices(IServiceCollection services)
services.AddSingleton(new CapMessageQueueMakerService("RabbitMQ"));

services.Configure(_configure);
services.AddSingleton<ITransport, RabbitMQTransport>();
services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>();
services.AddSingleton<ITransport, RabbitMqTransport>();
services.AddSingleton<IConsumerClientFactory, RabbitMqConsumerClientFactory>();
services.AddSingleton<IConnectionChannelPool, ConnectionChannelPool>();
}
}
2 changes: 1 addition & 1 deletion src/DotNetCore.CAP.RabbitMQ/DotNetCore.CAP.RabbitMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
29 changes: 14 additions & 15 deletions src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Diagnostics;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
Expand All @@ -17,10 +18,10 @@ public class ConnectionChannelPool : IConnectionChannelPool, IDisposable
private const int DefaultPoolSize = 15;
private static readonly object SLock = new();

private readonly Func<IConnection> _connectionActivator;
private readonly Func<Task<IConnection>> _connectionActivator;
private readonly bool _isPublishConfirms;
private readonly ILogger<ConnectionChannelPool> _logger;
private readonly ConcurrentQueue<IModel> _pool;
private readonly ConcurrentQueue<IChannel> _pool;
private IConnection? _connection;

private int _count;
Expand All @@ -33,7 +34,7 @@ public ConnectionChannelPool(
{
_logger = logger;
_maxSize = DefaultPoolSize;
_pool = new ConcurrentQueue<IModel>();
_pool = new ConcurrentQueue<IChannel>();

var capOptions = capOptionsAccessor.Value;
var options = optionsAccessor.Value;
Expand All @@ -48,7 +49,7 @@ public ConnectionChannelPool(
$"RabbitMQ configuration:'HostName:{options.HostName}, Port:{options.Port}, UserName:{options.UserName}, VirtualHost:{options.VirtualHost}, ExchangeName:{options.ExchangeName}'");
}

IModel IConnectionChannelPool.Rent()
Task<IChannel> IConnectionChannelPool.Rent()
{
lock (SLock)
{
Expand All @@ -61,7 +62,7 @@ IModel IConnectionChannelPool.Rent()
}
}

bool IConnectionChannelPool.Return(IModel connection)
bool IConnectionChannelPool.Return(IChannel connection)
{
return Return(connection);
}
Expand All @@ -77,7 +78,7 @@ public IConnection GetConnection()
if (_connection != null && _connection.IsOpen) return _connection;

_connection?.Dispose();
_connection = _connectionActivator();
_connection = _connectionActivator().GetAwaiter().GetResult();
return _connection;
}
}
Expand All @@ -94,31 +95,30 @@ public void Dispose()
_connection?.Dispose();
}

private static Func<IConnection> CreateConnection(RabbitMQOptions options)
private static Func<Task<IConnection>> CreateConnection(RabbitMQOptions options)
{
var factory = new ConnectionFactory
{
UserName = options.UserName,
Port = options.Port,
Password = options.Password,
VirtualHost = options.VirtualHost,
DispatchConsumersAsync = true,
ClientProvidedName = Assembly.GetEntryAssembly()?.GetName().Name!.ToLower()
};

if (options.HostName.Contains(","))
{
options.ConnectionFactoryOptions?.Invoke(factory);

return () => factory.CreateConnection(AmqpTcpEndpoint.ParseMultiple(options.HostName));
return () => factory.CreateConnectionAsync(AmqpTcpEndpoint.ParseMultiple(options.HostName));
}

factory.HostName = options.HostName;
options.ConnectionFactoryOptions?.Invoke(factory);
return () => factory.CreateConnection();
return () => factory.CreateConnectionAsync();
}

public virtual IModel Rent()
public virtual async Task<IChannel> Rent()
{
if (_pool.TryDequeue(out var model))
{
Expand All @@ -131,9 +131,8 @@ public virtual IModel Rent()

try
{
model = GetConnection().CreateModel();
model.ExchangeDeclare(Exchange, RabbitMQOptions.ExchangeType, true);
if (_isPublishConfirms) model.ConfirmSelect();
model = await GetConnection().CreateChannelAsync(new CreateChannelOptions(_isPublishConfirms, false));
await model.ExchangeDeclareAsync(Exchange, RabbitMQOptions.ExchangeType, true);
}
catch (Exception e)
{
Expand All @@ -145,7 +144,7 @@ public virtual IModel Rent()
return model;
}

public virtual bool Return(IModel channel)
public virtual bool Return(IChannel channel)
{
if (Interlocked.Increment(ref _count) <= _maxSize && channel.IsOpen)
{
Expand Down
5 changes: 3 additions & 2 deletions src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Threading.Tasks;
using RabbitMQ.Client;

namespace DotNetCore.CAP.RabbitMQ;
Expand All @@ -13,7 +14,7 @@ public interface IConnectionChannelPool

IConnection GetConnection();

IModel Rent();
Task<IChannel> Rent();

bool Return(IModel context);
bool Return(IChannel context);
}
32 changes: 14 additions & 18 deletions src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@

namespace DotNetCore.CAP.RabbitMQ;

internal sealed class RabbitMQTransport : ITransport
internal sealed class RabbitMqTransport : ITransport
{
private readonly IConnectionChannelPool _connectionChannelPool;
private readonly string _exchange;
private readonly ILogger _logger;

public RabbitMQTransport(
ILogger<RabbitMQTransport> logger,
IConnectionChannelPool connectionChannelPool)
public RabbitMqTransport(ILogger<RabbitMqTransport> logger, IConnectionChannelPool connectionChannelPool)
{
_logger = logger;
_connectionChannelPool = connectionChannelPool;
Expand All @@ -29,26 +27,24 @@ public RabbitMQTransport(

public BrokerAddress BrokerAddress => new("RabbitMQ", _connectionChannelPool.HostAddress);

public Task<OperateResult> SendAsync(TransportMessage message)
public async Task<OperateResult> SendAsync(TransportMessage message)
{
IModel? channel = null;
IChannel? channel = null;
try
{
channel = _connectionChannelPool.Rent();
channel = await _connectionChannelPool.Rent();

var props = channel.CreateBasicProperties();
props.DeliveryMode = 2;
props.Headers = message.Headers.ToDictionary(x => x.Key, x => (object?)x.Value);

channel.BasicPublish(_exchange, message.GetName(), props, message.Body);
var props = new BasicProperties
{
DeliveryMode = DeliveryModes.Persistent,
Headers = message.Headers.ToDictionary(x => x.Key, object? (x) => x.Value)
};

// Enable publish confirms
if (channel.NextPublishSeqNo > 0) channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
await channel.BasicPublishAsync(_exchange, message.GetName(), false, props, message.Body);

_logger.LogInformation("CAP message '{0}' published, internal id '{1}'", message.GetName(),
message.GetId());
_logger.LogInformation("CAP message '{0}' published, internal id '{1}'", message.GetName(), message.GetId());

return Task.FromResult(OperateResult.Success);
return OperateResult.Success;
}
catch (Exception ex)
{
Expand All @@ -59,7 +55,7 @@ public Task<OperateResult> SendAsync(TransportMessage message)
Description = ex.Message
};

return Task.FromResult(OperateResult.Failed(wrapperEx, errors));
return OperateResult.Failed(wrapperEx, errors);
}
finally
{
Expand Down
Loading

0 comments on commit d3e90d8

Please sign in to comment.