From 2194ef53f738f50a1167f301d334d23170fc1c4c Mon Sep 17 00:00:00 2001 From: Savorboard Date: Thu, 29 Feb 2024 15:57:13 +0800 Subject: [PATCH 1/6] Support async transaction without use async method (#1493) * Fixes AsyncLocal issue to support start async transaction. (#1376, #1317, #1266) * Add support async transaction * Remove unused line. * Add support async transaction * Fixes MySql start transaction dbtransaction not await bug. * Mark CapTransactionHolder internal visiable. --- .../Controllers/ValuesController.cs | 49 ++----- .../ICapTransaction.MongoDB.cs | 29 +++- .../ICapTransaction.MySql.cs | 122 +++++++++++++---- .../ICapTransaction.PostgreSql.cs | 116 ++++++++++++---- .../ICapTransaction.SqlServer.cs | 129 +++++++++++++----- src/DotNetCore.CAP/ICapPublisher.cs | 2 +- src/DotNetCore.CAP/ICapTransaction.Base.cs | 5 + .../Internal/ICapPublisher.Default.cs | 18 ++- test/DotNetCore.CAP.Test/CAP.BuilderTest.cs | 3 +- 9 files changed, 336 insertions(+), 137 deletions(-) diff --git a/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs b/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs index 7b2defcbf..c6caeb838 100644 --- a/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs +++ b/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs @@ -35,10 +35,9 @@ public async Task Stop([FromServices] IBootstrapper bootstrapper) } [Route("~/without/transaction")] - public async Task WithoutTransaction() + public async Task WithoutTransactionAsync() { - await _capBus.PublishAsync("sample.rabbitmq.test", DateTime.Now); - await _capBus.PublishAsync("sample.rabbitmq.test2", DateTime.Now); + await _capBus.PublishAsync("sample.rabbitmq.mysql", DateTime.Now, cancellationToken: HttpContext.RequestAborted); return Ok(); } @@ -52,54 +51,36 @@ public async Task Delay(int delaySeconds) } [Route("~/adonet/transaction")] - public IActionResult AdonetWithTransaction() + public async Task AdonetWithTransaction() { using (var connection = new MySqlConnection(AppDbContext.ConnectionString)) { - using (var transaction = connection.BeginTransaction(_capBus, true)) - { - connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction); - - _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); - } + using var transaction = await connection.BeginTransactionAsync(_capBus, true); + await connection.ExecuteAsync("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction); + await _capBus.PublishAsync("sample.rabbitmq.mysql", DateTime.Now); } return Ok(); } [Route("~/ef/transaction")] - public IActionResult EntityFrameworkWithTransaction([FromServices] AppDbContext dbContext) + public async Task EntityFrameworkWithTransaction([FromServices] AppDbContext dbContext) { - using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: false)) + using (var trans = await dbContext.Database.BeginTransactionAsync(_capBus, autoCommit: false)) { - dbContext.Persons.Add(new Person() { Name = "ef.transaction" }); - - for (int i = 0; i < 1; i++) - { - _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); - } - - dbContext.SaveChanges(); - - trans.Commit(); + await dbContext.Persons.AddAsync(new Person() { Name = "ef.transaction" }); + await _capBus.PublishAsync("sample.rabbitmq.mysql", DateTime.Now); + await dbContext.SaveChangesAsync(); + await trans.CommitAsync(); } return Ok(); } [NonAction] - [CapSubscribe("sample.rabbitmq.test")] - public void Subscriber(string content) + [CapSubscribe("sample.rabbitmq.mysql")] + public void Subscriber(DateTime time) { - Thread.Sleep(2000); - Console.WriteLine($"Consume time: {DateTime.Now} \r\n --> " + content); + Console.WriteLine("Publishing time:" + time); } - - //[NonAction] - //[CapSubscribe("sample.rabbitmq.test2", Group = "test2")] - //public void Subscriber2(DateTime content) - //{ - // Thread.Sleep(2000); - // Console.WriteLine($"Group2 Consume time: {DateTime.Now} \r\n --> " + content); - //} } } diff --git a/src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs index c60de67da..fef7b528c 100644 --- a/src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs +++ b/src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs @@ -85,12 +85,30 @@ public static IClientSessionHandle StartTransaction(this IMongoClient client, ICapPublisher publisher, bool autoCommit = false) { var clientSessionHandle = client.StartSession(); - publisher.Transaction.Value = - ActivatorUtilities.CreateInstance(publisher.ServiceProvider); - var capTrans = publisher.Transaction.Value.Begin(clientSessionHandle, autoCommit); + publisher.Transaction = ActivatorUtilities.CreateInstance(publisher.ServiceProvider); + var capTrans = publisher.Transaction.Begin(clientSessionHandle, autoCommit); return new CapMongoDbClientSessionHandle(capTrans); } + /// + /// Start the CAP transaction + /// + /// The . + /// The . + /// Whether the transaction is automatically committed when the message is published + /// The . + /// The . + /// The of MongoDB transaction session object. + public static Task StartTransactionAsync(this IMongoClient client, + ICapPublisher publisher, bool autoCommit = false, ClientSessionOptions? options = null, CancellationToken cancellationToken = default) + { + var clientSessionHandle = client.StartSessionAsync(options, cancellationToken).GetAwaiter().GetResult(); + publisher.Transaction = ActivatorUtilities.CreateInstance(publisher.ServiceProvider); + + var capTrans = publisher.Transaction.Begin(clientSessionHandle, autoCommit); + return Task.FromResult(new CapMongoDbClientSessionHandle(capTrans)); + } + /// /// Start the CAP transaction with a custom session handle /// @@ -102,9 +120,8 @@ public static IClientSessionHandle StartTransaction(this IMongoClient client, public static IClientSessionHandle StartTransaction(this IMongoClient _, IClientSessionHandle clientSessionHandle, ICapPublisher publisher, bool autoCommit = false) { - publisher.Transaction.Value = - ActivatorUtilities.CreateInstance(publisher.ServiceProvider); - var capTrans = publisher.Transaction.Value.Begin(clientSessionHandle, autoCommit); + publisher.Transaction = ActivatorUtilities.CreateInstance(publisher.ServiceProvider); + var capTrans = publisher.Transaction.Begin(clientSessionHandle, autoCommit); return new CapMongoDbClientSessionHandle(capTrans); } } diff --git a/src/DotNetCore.CAP.MySql/ICapTransaction.MySql.cs b/src/DotNetCore.CAP.MySql/ICapTransaction.MySql.cs index eb2551f1b..19d093e24 100644 --- a/src/DotNetCore.CAP.MySql/ICapTransaction.MySql.cs +++ b/src/DotNetCore.CAP.MySql/ICapTransaction.MySql.cs @@ -91,29 +91,12 @@ public override void Dispose() { (DbTransaction as IDisposable)?.Dispose(); DbTransaction = null; + GC.SuppressFinalize(this); } } public static class CapTransactionExtensions { - public static ICapTransaction Begin(this ICapTransaction transaction, - IDbContextTransaction dbTransaction, bool autoCommit = false) - { - transaction.DbTransaction = dbTransaction; - transaction.AutoCommit = autoCommit; - - return transaction; - } - - public static ICapTransaction Begin(this ICapTransaction transaction, - IDbTransaction dbTransaction, bool autoCommit = false) - { - transaction.DbTransaction = dbTransaction; - transaction.AutoCommit = autoCommit; - - return transaction; - } - /// /// Start the CAP transaction /// @@ -124,10 +107,7 @@ public static ICapTransaction Begin(this ICapTransaction transaction, public static IDbContextTransaction BeginTransaction(this DatabaseFacade database, ICapPublisher publisher, bool autoCommit = false) { - var trans = database.BeginTransaction(); - publisher.Transaction.Value = ActivatorUtilities.CreateInstance(publisher.ServiceProvider); - var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit); - return new CapEFDbTransaction(capTrans); + return BeginTransaction(database, IsolationLevel.Unspecified, publisher, autoCommit); } /// @@ -141,10 +121,44 @@ public static IDbContextTransaction BeginTransaction(this DatabaseFacade databas public static IDbContextTransaction BeginTransaction(this DatabaseFacade database, IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false) { - var trans = database.BeginTransaction(isolationLevel); - publisher.Transaction.Value = ActivatorUtilities.CreateInstance(publisher.ServiceProvider); - var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit); - return new CapEFDbTransaction(capTrans); + var dbTransaction = database.BeginTransaction(isolationLevel); + publisher.Transaction = ActivatorUtilities.CreateInstance(publisher.ServiceProvider); + publisher.Transaction.DbTransaction = dbTransaction; + publisher.Transaction.AutoCommit = autoCommit; + return new CapEFDbTransaction(publisher.Transaction); + } + + /// + /// Start the CAP transaction async + /// + /// The . + /// The . + /// Whether the transaction is automatically committed when the message is published + /// + /// The of EF DbContext transaction object. + public static Task BeginTransactionAsync(this DatabaseFacade database, + ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default) + { + return BeginTransactionAsync(database, IsolationLevel.Unspecified, publisher, autoCommit, cancellationToken); + } + + /// + /// Start the CAP transaction async + /// + /// The . + /// The . + /// The to use + /// Whether the transaction is automatically committed when the message is published + /// + /// The of EF DbContext transaction object. + public static Task BeginTransactionAsync(this DatabaseFacade database, + IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default) + { + var dbTransaction = database.BeginTransactionAsync(isolationLevel, cancellationToken).GetAwaiter().GetResult(); + publisher.Transaction = ActivatorUtilities.CreateInstance(publisher.ServiceProvider); + publisher.Transaction.DbTransaction = dbTransaction; + publisher.Transaction.AutoCommit = autoCommit; + return Task.FromResult(new CapEFDbTransaction(publisher.Transaction)); } /// @@ -156,11 +170,61 @@ public static IDbContextTransaction BeginTransaction(this DatabaseFacade databas /// The object. public static ICapTransaction BeginTransaction(this IDbConnection dbConnection, ICapPublisher publisher, bool autoCommit = false) + { + return BeginTransaction(dbConnection, IsolationLevel.Unspecified, publisher, autoCommit); + } + + /// + /// Start the CAP transaction + /// + /// The . + /// The to use + /// The . + /// Whether the transaction is automatically committed when the message is published + /// The object. + public static ICapTransaction BeginTransaction(this IDbConnection dbConnection, + IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false) { if (dbConnection.State == ConnectionState.Closed) dbConnection.Open(); - var dbTransaction = dbConnection.BeginTransaction(); - publisher.Transaction.Value = ActivatorUtilities.CreateInstance(publisher.ServiceProvider); - return publisher.Transaction.Value.Begin(dbTransaction, autoCommit); + var dbTransaction = dbConnection.BeginTransaction(isolationLevel); + publisher.Transaction = ActivatorUtilities.CreateInstance(publisher.ServiceProvider); + publisher.Transaction.DbTransaction = dbTransaction; + publisher.Transaction.AutoCommit = autoCommit; + return publisher.Transaction; + } + + /// + /// Start the CAP transaction + /// + /// The . + /// The . + /// Whether the transaction is automatically committed when the message is published + /// + /// The object. + public static ValueTask BeginTransactionAsync(this IDbConnection dbConnection, + ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default) + { + return BeginTransactionAsync(dbConnection, IsolationLevel.Unspecified, publisher, autoCommit, cancellationToken); + } + + /// + /// Start the CAP transaction + /// + /// The . + /// The to use + /// The . + /// Whether the transaction is automatically committed when the message is published + /// + public static ValueTask BeginTransactionAsync(this IDbConnection dbConnection, + IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default) + { + if (dbConnection.State == ConnectionState.Closed) ((DbConnection)dbConnection).OpenAsync(cancellationToken).GetAwaiter().GetResult(); + + var dbTransaction = ((DbConnection)dbConnection).BeginTransactionAsync(isolationLevel, cancellationToken).GetAwaiter().GetResult(); + publisher.Transaction = ActivatorUtilities.CreateInstance(publisher.ServiceProvider); + publisher.Transaction.DbTransaction = dbTransaction; + publisher.Transaction.AutoCommit = autoCommit; + return ValueTask.FromResult(publisher.Transaction); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/ICapTransaction.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/ICapTransaction.PostgreSql.cs index bea3e2b1c..a00b51af2 100644 --- a/src/DotNetCore.CAP.PostgreSql/ICapTransaction.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/ICapTransaction.PostgreSql.cs @@ -90,27 +90,42 @@ public override void Dispose() { (DbTransaction as IDisposable)?.Dispose(); DbTransaction = null; + GC.SuppressFinalize(this); } } public static class CapTransactionExtensions { - public static ICapTransaction Begin(this ICapTransaction transaction, - IDbTransaction dbTransaction, bool autoCommit = false) + /// + /// Start the CAP transaction + /// + /// The . + /// The . + /// Whether the transaction is automatically committed when the message is published + public static ICapTransaction BeginTransaction(this IDbConnection dbConnection, + ICapPublisher publisher, bool autoCommit = false) { - transaction.DbTransaction = dbTransaction; - transaction.AutoCommit = autoCommit; - - return transaction; + return BeginTransaction(dbConnection, IsolationLevel.Unspecified, publisher, autoCommit); } - public static ICapTransaction Begin(this ICapTransaction transaction, - IDbContextTransaction dbTransaction, bool autoCommit = false) + /// + /// Start the CAP transaction + /// + /// The . + /// The . + /// Whether the transaction is automatically committed when the message is published + /// + public static ICapTransaction BeginTransaction(this IDbConnection dbConnection, + IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false) { - transaction.DbTransaction = dbTransaction; - transaction.AutoCommit = autoCommit; + if (dbConnection.State == ConnectionState.Closed) dbConnection.Open(); + var dbTransaction = dbConnection.BeginTransaction(isolationLevel); + + publisher.Transaction = ActivatorUtilities.CreateInstance(publisher.ServiceProvider); + publisher.Transaction.DbTransaction = dbTransaction; + publisher.Transaction.AutoCommit = autoCommit; - return transaction; + return publisher.Transaction; } /// @@ -119,16 +134,32 @@ public static ICapTransaction Begin(this ICapTransaction transaction, /// The . /// The . /// Whether the transaction is automatically committed when the message is published - /// The object. - public static ICapTransaction BeginTransaction(this IDbConnection dbConnection, - ICapPublisher publisher, bool autoCommit = false) + /// + public static ValueTask BeginTransactionAsync(this IDbConnection dbConnection, + ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default) { - if (dbConnection.State == ConnectionState.Closed) dbConnection.Open(); + return BeginTransactionAsync(dbConnection, IsolationLevel.Unspecified, publisher, autoCommit, cancellationToken); + } - var dbTransaction = dbConnection.BeginTransaction(); - publisher.Transaction.Value = - ActivatorUtilities.CreateInstance(publisher.ServiceProvider); - return publisher.Transaction.Value.Begin(dbTransaction, autoCommit); + /// + /// Start the CAP transaction + /// + /// The . + /// + /// The . + /// Whether the transaction is automatically committed when the message is published + /// + public static ValueTask BeginTransactionAsync(this IDbConnection dbConnection, + IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default) + { + if (dbConnection.State == ConnectionState.Closed) ((DbConnection)dbConnection).OpenAsync(cancellationToken).GetAwaiter().GetResult(); + var dbTransaction = ((DbConnection)dbConnection).BeginTransactionAsync(isolationLevel, cancellationToken).AsTask().GetAwaiter().GetResult(); + + publisher.Transaction = ActivatorUtilities.CreateInstance(publisher.ServiceProvider); + publisher.Transaction.DbTransaction = dbTransaction; + publisher.Transaction.AutoCommit = autoCommit; + + return ValueTask.FromResult(publisher.Transaction); } /// @@ -141,11 +172,7 @@ public static ICapTransaction BeginTransaction(this IDbConnection dbConnection, public static IDbContextTransaction BeginTransaction(this DatabaseFacade database, ICapPublisher publisher, bool autoCommit = false) { - var trans = database.BeginTransaction(); - publisher.Transaction.Value = - ActivatorUtilities.CreateInstance(publisher.ServiceProvider); - var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit); - return new CapEFDbTransaction(capTrans); + return BeginTransaction(database, IsolationLevel.Unspecified, publisher, autoCommit); } /// @@ -160,9 +187,42 @@ public static IDbContextTransaction BeginTransaction(this DatabaseFacade databas IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false) { var trans = database.BeginTransaction(isolationLevel); - publisher.Transaction.Value = - ActivatorUtilities.CreateInstance(publisher.ServiceProvider); - var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit); - return new CapEFDbTransaction(capTrans); + publisher.Transaction = ActivatorUtilities.CreateInstance(publisher.ServiceProvider); + publisher.Transaction.DbTransaction = trans; + publisher.Transaction.AutoCommit = autoCommit; + return new CapEFDbTransaction(publisher.Transaction); + } + + /// + /// Start the CAP transaction async + /// + /// The . + /// The . + /// Whether the transaction is automatically committed when the message is published + /// + /// The of EF DbContext transaction object. + public static Task BeginTransactionAsync(this DatabaseFacade database, + ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default) + { + return BeginTransactionAsync(database, IsolationLevel.Unspecified, publisher, autoCommit, cancellationToken); + } + + /// + /// Start the CAP transaction async + /// + /// The . + /// The . + /// The to use + /// Whether the transaction is automatically committed when the message is published + /// + /// The of EF DbContext transaction object. + public static Task BeginTransactionAsync(this DatabaseFacade database, + IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default) + { + var trans = database.BeginTransactionAsync(isolationLevel, cancellationToken).GetAwaiter().GetResult(); + publisher.Transaction = ActivatorUtilities.CreateInstance(publisher.ServiceProvider); + publisher.Transaction.DbTransaction = trans; + publisher.Transaction.AutoCommit = autoCommit; + return Task.FromResult(new CapEFDbTransaction(publisher.Transaction)); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/ICapTransaction.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/ICapTransaction.SqlServer.cs index 8f1a08a43..38e36ef18 100644 --- a/src/DotNetCore.CAP.SqlServer/ICapTransaction.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/ICapTransaction.SqlServer.cs @@ -137,22 +137,68 @@ public override void Dispose() public static class CapTransactionExtensions { - public static ICapTransaction Begin(this ICapTransaction transaction, - IDbTransaction dbTransaction, bool autoCommit = false) + /// + /// Start the CAP transaction + /// + /// The . + /// The . + /// Whether the transaction is automatically committed when the message is published + /// The of EF DbContext transaction object. + public static IDbContextTransaction BeginTransaction(this DatabaseFacade database, + ICapPublisher publisher, bool autoCommit = false) { - transaction.DbTransaction = dbTransaction; - transaction.AutoCommit = autoCommit; + return BeginTransaction(database, IsolationLevel.Unspecified, publisher, autoCommit); + } - return transaction; + /// + /// Start the CAP transaction + /// + /// The . + /// The . + /// The to use + /// Whether the transaction is automatically committed when the message is published + /// The of EF DbContext transaction object. + public static IDbContextTransaction BeginTransaction(this DatabaseFacade database, + IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false) + { + var dbTransaction = database.BeginTransaction(isolationLevel); + publisher.Transaction = ActivatorUtilities.CreateInstance(publisher.ServiceProvider); + publisher.Transaction.DbTransaction = dbTransaction; + publisher.Transaction.AutoCommit = autoCommit; + return new CapEFDbTransaction(publisher.Transaction); } - public static ICapTransaction Begin(this ICapTransaction transaction, - IDbContextTransaction dbTransaction, bool autoCommit = false) + /// + /// Start the CAP transaction async + /// + /// The . + /// The . + /// Whether the transaction is automatically committed when the message is published + /// + /// The of EF DbContext transaction object. + public static Task BeginTransactionAsync(this DatabaseFacade database, + ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default) { - transaction.DbTransaction = dbTransaction; - transaction.AutoCommit = autoCommit; + return BeginTransactionAsync(database, IsolationLevel.Unspecified, publisher, autoCommit, cancellationToken); + } - return transaction; + /// + /// Start the CAP transaction async + /// + /// The . + /// The . + /// The to use + /// Whether the transaction is automatically committed when the message is published + /// + /// The of EF DbContext transaction object. + public static Task BeginTransactionAsync(this DatabaseFacade database, + IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default) + { + var dbTransaction = database.BeginTransactionAsync(isolationLevel, cancellationToken).GetAwaiter().GetResult(); + publisher.Transaction = ActivatorUtilities.CreateInstance(publisher.ServiceProvider); + publisher.Transaction.DbTransaction = dbTransaction; + publisher.Transaction.AutoCommit = autoCommit; + return Task.FromResult(new CapEFDbTransaction(publisher.Transaction)); } /// @@ -165,46 +211,61 @@ public static ICapTransaction Begin(this ICapTransaction transaction, public static IDbTransaction BeginTransaction(this IDbConnection dbConnection, ICapPublisher publisher, bool autoCommit = false) { - if (dbConnection.State == ConnectionState.Closed) dbConnection.Open(); - var dbTransaction = dbConnection.BeginTransaction(); - publisher.Transaction.Value = - ActivatorUtilities.CreateInstance(publisher.ServiceProvider); - var capTransaction = publisher.Transaction.Value.Begin(dbTransaction, autoCommit); - return (IDbTransaction)capTransaction.DbTransaction!; + return BeginTransaction(dbConnection, IsolationLevel.Unspecified, publisher, autoCommit); } /// /// Start the CAP transaction /// - /// The . + /// The . + /// The to use /// The . /// Whether the transaction is automatically committed when the message is published - /// The of EF DbContext transaction object. - public static IDbContextTransaction BeginTransaction(this DatabaseFacade database, - ICapPublisher publisher, bool autoCommit = false) + /// The object. + public static IDbTransaction BeginTransaction(this IDbConnection dbConnection, + IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false) { - var trans = database.BeginTransaction(); - publisher.Transaction.Value = - ActivatorUtilities.CreateInstance(publisher.ServiceProvider); - var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit); - return new CapEFDbTransaction(capTrans); + if (dbConnection.State == ConnectionState.Closed) dbConnection.Open(); + + var dbTransaction = dbConnection.BeginTransaction(isolationLevel); + publisher.Transaction = ActivatorUtilities.CreateInstance(publisher.ServiceProvider); + publisher.Transaction.DbTransaction = dbTransaction; + publisher.Transaction.AutoCommit = autoCommit; + return dbTransaction; } /// /// Start the CAP transaction /// - /// The . + /// The . /// The . + /// Whether the transaction is automatically committed when the message is published + /// + /// The object. + public static Task BeginTransactionAsync(this IDbConnection dbConnection, + ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default) + { + return BeginTransactionAsync(dbConnection, IsolationLevel.Unspecified, publisher, autoCommit, cancellationToken); + } + + /// + /// Start the CAP transaction + /// + /// The . /// The to use + /// The . /// Whether the transaction is automatically committed when the message is published - /// The of EF DbContext transaction object. - public static IDbContextTransaction BeginTransaction(this DatabaseFacade database, - IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false) + /// + /// The object. + public static Task BeginTransactionAsync(this IDbConnection dbConnection, + IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default) { - var trans = database.BeginTransaction(isolationLevel); - publisher.Transaction.Value = - ActivatorUtilities.CreateInstance(publisher.ServiceProvider); - var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit); - return new CapEFDbTransaction(capTrans); + if (dbConnection.State == ConnectionState.Closed) ((DbConnection)dbConnection).OpenAsync(cancellationToken).GetAwaiter().GetResult(); + + var dbTransaction = ((DbConnection)dbConnection).BeginTransactionAsync(isolationLevel, cancellationToken).GetAwaiter().GetResult(); + publisher.Transaction = ActivatorUtilities.CreateInstance(publisher.ServiceProvider); + publisher.Transaction.DbTransaction = dbTransaction; + publisher.Transaction.AutoCommit = autoCommit; + return Task.FromResult(dbTransaction); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/ICapPublisher.cs b/src/DotNetCore.CAP/ICapPublisher.cs index e850128b1..db0bbd248 100644 --- a/src/DotNetCore.CAP/ICapPublisher.cs +++ b/src/DotNetCore.CAP/ICapPublisher.cs @@ -18,7 +18,7 @@ public interface ICapPublisher /// /// CAP transaction context object /// - AsyncLocal Transaction { get; } + ICapTransaction? Transaction { get; set; } /// /// Asynchronous publish an object message. diff --git a/src/DotNetCore.CAP/ICapTransaction.Base.cs b/src/DotNetCore.CAP/ICapTransaction.Base.cs index f13af004f..09359e678 100644 --- a/src/DotNetCore.CAP/ICapTransaction.Base.cs +++ b/src/DotNetCore.CAP/ICapTransaction.Base.cs @@ -11,6 +11,11 @@ namespace DotNetCore.CAP; +internal sealed class CapTransactionHolder +{ + public ICapTransaction? Transaction; +} + public abstract class CapTransactionBase : ICapTransaction { private readonly ConcurrentQueue _bufferList; diff --git a/src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs b/src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs index ad87ce9be..8a7ec5aa7 100644 --- a/src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs +++ b/src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs @@ -27,6 +27,8 @@ internal class CapPublisher : ICapPublisher private readonly IDataStorage _storage; private readonly IBootstrapper _bootstrapper; + private readonly AsyncLocal _asyncLocal; + public CapPublisher(IServiceProvider service) { ServiceProvider = service; @@ -35,12 +37,20 @@ public CapPublisher(IServiceProvider service) _storage = service.GetRequiredService(); _capOptions = service.GetRequiredService>().Value; _snowflakeId = service.GetRequiredService(); - Transaction = new AsyncLocal(); + _asyncLocal = new AsyncLocal(); } public IServiceProvider ServiceProvider { get; } - public AsyncLocal Transaction { get; } + public ICapTransaction? Transaction { + + get => _asyncLocal.Value?.Transaction; + set + { + _asyncLocal.Value ??= new CapTransactionHolder(); + _asyncLocal.Value.Transaction = value; + } + } public async Task PublishAsync(string name, T? value, IDictionary headers, CancellationToken cancellationToken = default) @@ -146,7 +156,7 @@ private async Task PublishInternalAsync(string name, T? value, IDictionary(string name, T? value, IDictionary Transaction { get; } + public ICapTransaction Transaction { get; set; } public Task PublishAsync(string name, T contentObj, string callbackName = null, CancellationToken cancellationToken = default(CancellationToken)) From dac69cf95f3543897bb497a256ad5c76e56a297c Mon Sep 17 00:00:00 2001 From: Dmytro Rakhmanov Date: Wed, 6 Mar 2024 15:44:04 +0200 Subject: [PATCH 2/6] Adding default correlation headers to configure additional message correlation for Azure Service Bus. (#1497) * Adding option to disable automation message correlation * Added correlation properties collection --- .../en/transport/azure-service-bus.md | 23 ++++++++++--------- .../AzureServiceBusConsumerClient.cs | 11 +++++++-- .../CAP.AzureServiceBusOptions.cs | 6 +++++ 3 files changed, 27 insertions(+), 13 deletions(-) diff --git a/docs/content/user-guide/en/transport/azure-service-bus.md b/docs/content/user-guide/en/transport/azure-service-bus.md index 5ff71baa7..4d1f16e0c 100644 --- a/docs/content/user-guide/en/transport/azure-service-bus.md +++ b/docs/content/user-guide/en/transport/azure-service-bus.md @@ -39,17 +39,18 @@ public void ConfigureServices(IServiceCollection services) The AzureServiceBus configuration options provided directly by the CAP: -| NAME | DESCRIPTION | TYPE | DEFAULT | -| :---------------------- | :------------------------------------------------------------------------------------------------------------------------------------------ | ---------------------------------------------------- | :---------------- | -| ConnectionString | Endpoint address | string | | -| EnableSessions | Enable [Service bus sessions](https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions) | bool | false | -| TopicPath | Topic entity path | string | cap | -| SubscriptionAutoDeleteOnIdle | Automatically delete subscription after a certain idle interval. | TimeSpan | TimeSpan.MaxValue | -| ManagementTokenProvider | Token provider | ITokenProvider | null | -| AutoCompleteMessages | Gets a value that indicates whether the processor should automatically complete messages after the message handler has completed processing | bool | false | -| CustomHeaders | Adds custom and/or mandatory Headers for incoming messages from heterogeneous systems. | `Func>>?` | null | -| Namespace | Namespace of Servicebus , Needs to be set when using with TokenCredential Property | string | null | -| SQLFilters | Custom SQL Filters by name and expression on Topic Subscribtion | List> | null | +| NAME | DESCRIPTION | TYPE | DEFAULT | +|:------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------|:----------------------------------| +| ConnectionString | Endpoint address | string | | +| EnableSessions | Enable [Service bus sessions](https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions) | bool | false | +| TopicPath | Topic entity path | string | cap | +| SubscriptionAutoDeleteOnIdle | Automatically delete subscription after a certain idle interval. | TimeSpan | TimeSpan.MaxValue | +| ManagementTokenProvider | Token provider | ITokenProvider | null | +| AutoCompleteMessages | Gets a value that indicates whether the processor should automatically complete messages after the message handler has completed processing | bool | false | +| CustomHeaders | Adds custom and/or mandatory Headers for incoming messages from heterogeneous systems. | `Func>>?` | null | +| Namespace | Namespace of Servicebus , Needs to be set when using with TokenCredential Property | string | null | +| DefaultCorrelationHeaders | Adds additional correlation properties to all [correlation filters](https://learn.microsoft.com/en-us/azure/service-bus-messaging/topic-filters#correlation-filters). | IDictionary | Dictionary.Empty | +| SQLFilters | Custom SQL Filters by name and expression on Topic Subscribtion | List> | null | #### Sessions diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs index 4875a5610..71b2cd4ba 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs +++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs @@ -54,10 +54,10 @@ public void Subscribe(IEnumerable topics) ConnectAsync().GetAwaiter().GetResult(); topics = topics.Concat(_asbOptions!.SQLFilters?.Select(o => o.Key) ?? Enumerable.Empty()); + var allRules = _administrationClient!.GetRulesAsync(_asbOptions!.TopicPath, _subscriptionName).ToEnumerable(); var allRuleNames = allRules.Select(o => o.Name); - foreach (var newRule in topics.Except(allRuleNames)) { var isSqlRule = _asbOptions.SQLFilters?.FirstOrDefault(o => o.Key == newRule).Value is not null; @@ -71,10 +71,17 @@ public void Subscribe(IEnumerable topics) } else { - currentRuleToAdd = new CorrelationRuleFilter + var correlationRule = new CorrelationRuleFilter { Subject = newRule }; + + foreach (var correlationHeader in _asbOptions.DefaultCorrelationHeaders) + { + correlationRule.ApplicationProperties.Add(correlationHeader.Key, correlationHeader.Value); + } + + currentRuleToAdd = correlationRule; } _administrationClient.CreateRuleAsync(_asbOptions.TopicPath, _subscriptionName, diff --git a/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs b/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs index 38aeabbd7..0e7eedd9d 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs +++ b/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs @@ -55,6 +55,12 @@ public class AzureServiceBusOptions /// public bool AutoCompleteMessages { get; set; } + /// + /// Adds additional correlation properties to all correlation filters. + /// https://learn.microsoft.com/en-us/azure/service-bus-messaging/topic-filters#correlation-filters + /// + public IDictionary DefaultCorrelationHeaders { get; } = new Dictionary(); + /// /// Gets the maximum number of concurrent calls to the ProcessMessageAsync message handler the processor should initiate. /// From f0646a3f1067939d4ddcd221e1c0e773be2da781 Mon Sep 17 00:00:00 2001 From: Jon Ekdahl Date: Thu, 7 Mar 2024 03:08:26 +0100 Subject: [PATCH 3/6] Allow configuring PostgreSQL using an Npgsql data source (#1496) * Allow configuring the Npgsql data source directly Add an optional DataSource property in the PostgreSqlOptions. Keep ConnectionString for backwards compatibility, removing it is a breaking change that would require a major version bump. * Add DataSource property in PostgreSQL documentation * Make CreateConnection helper internal * Mark PostgreSqlOptions.ConnectionString as obsolete --------- Co-authored-by: Jon Ekdahl --- .../user-guide/en/storage/postgresql.md | 9 +++++---- .../CAP.PostgreSqlOptions.cs | 15 ++++++++++++++ .../IDataStorage.PostgreSql.cs | 20 +++++++++---------- .../IMonitoringApi.PostgreSql.cs | 10 +++++----- .../IStorageInitializer.PostgreSql.cs | 2 +- 5 files changed, 36 insertions(+), 20 deletions(-) diff --git a/docs/content/user-guide/en/storage/postgresql.md b/docs/content/user-guide/en/storage/postgresql.md index 7835b7f3a..04a47fd18 100644 --- a/docs/content/user-guide/en/storage/postgresql.md +++ b/docs/content/user-guide/en/storage/postgresql.md @@ -32,10 +32,11 @@ public void ConfigureServices(IServiceCollection services) #### PostgreSqlOptions -NAME | DESCRIPTION | TYPE | DEFAULT -:---|:---|---|:--- -Schema | Database schema | string | cap -ConnectionString | Database connection string | string | +NAME | DESCRIPTION | TYPE | DEFAULT +:---|:---------------------------|----------------------|:--- +Schema | Database schema | string | cap +ConnectionString | Database connection string | string | +DataSource | [Data source](https://www.npgsql.org/doc/basic-usage.html#data-source) | [NpgsqlDataSource](https://www.npgsql.org/doc/api/Npgsql.NpgsqlDataSource.html) | ## Publish with transaction diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs index 225d2aa07..10b735f80 100644 --- a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs +++ b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs @@ -6,6 +6,7 @@ using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; +using Npgsql; // ReSharper disable once CheckNamespace namespace DotNetCore.CAP; @@ -15,7 +16,21 @@ public class PostgreSqlOptions : EFOptions /// /// Gets or sets the database's connection string that will be used to store database entities. /// + [Obsolete("Use .DataSource = NpgsqlDataSource.Create() for same behavior.")] public string ConnectionString { get; set; } = default!; + + /// + /// Gets or sets the Npgsql data source that will be used to store database entities. + /// + public NpgsqlDataSource? DataSource { get; set; } + + /// + /// Creates an Npgsql connection from the configured data source. + /// + internal NpgsqlConnection CreateConnection() + { + return DataSource != null ? DataSource.CreateConnection() : new NpgsqlConnection(ConnectionString); + } } internal class ConfigurePostgreSqlOptions : IConfigureOptions diff --git a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs index 43d0b879c..8a62956c1 100644 --- a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs @@ -50,7 +50,7 @@ public async Task AcquireLockAsync(string key, TimeSpan ttl, string instan { var sql = $"UPDATE {_lockName} SET \"Instance\"=@Instance,\"LastLockTime\"=@LastLockTime WHERE \"Key\"=@Key AND \"LastLockTime\" < @TTL;"; - var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); object[] sqlParams = { @@ -67,7 +67,7 @@ public async Task ReleaseLockAsync(string key, string instance, CancellationToke { var sql = $"UPDATE {_lockName} SET \"Instance\"='',\"LastLockTime\"=@LastLockTime WHERE \"Key\"=@Key AND \"Instance\"=@Instance;"; - var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); object[] sqlParams = { @@ -82,7 +82,7 @@ public async Task RenewLockAsync(string key, TimeSpan ttl, string instance, Canc { var sql = $"UPDATE {_lockName} SET \"LastLockTime\"=\"LastLockTime\"+interval '{ttl.TotalSeconds}' second WHERE \"Key\"=@Key AND \"Instance\"=@Instance;"; - var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); object[] sqlParams = { @@ -96,7 +96,7 @@ public async Task ChangePublishStateToDelayedAsync(string[] ids) { var sql = $"UPDATE {_pubName} SET \"StatusName\"='{StatusName.Delayed}' WHERE \"Id\" IN ({string.Join(',', ids)});"; - var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); await connection.ExecuteNonQueryAsync(sql).ConfigureAwait(false); } @@ -140,7 +140,7 @@ public async Task StoreMessageAsync(string name, Message content, if (transaction == null) { - var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false); } @@ -205,7 +205,7 @@ public async Task StoreReceivedMessageAsync(string name, string g public async Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default) { - var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); return await connection.ExecuteNonQueryAsync( $"DELETE FROM {table} WHERE \"Id\" IN (SELECT \"Id\" FROM {table} WHERE \"ExpiresAt\" < @timeout AND (\"StatusName\"='{StatusName.Succeeded}' OR \"StatusName\"='{StatusName.Failed}') LIMIT @batchCount);", @@ -238,7 +238,7 @@ public async Task ScheduleMessagesOfDelayedAsync(Func @@ -296,7 +296,7 @@ private async Task ChangeMessageStateAsync(string tableName, MediumMessage messa } else { - await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); + await using var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false); } @@ -308,7 +308,7 @@ private async Task StoreReceivedMessage(object[] sqlParams) $"INSERT INTO {_recName}(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" + $"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";"; - var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false); } @@ -327,7 +327,7 @@ private async Task> GetMessagesOfNeedRetryAsync(strin new NpgsqlParameter("@Added", fourMinAgo) }; - var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); var result = await connection.ExecuteReaderAsync(sql, async reader => { diff --git a/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs index 5ef6a9287..e7c994508 100644 --- a/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs @@ -61,7 +61,7 @@ SELECT COUNT(""Id"") FROM {_recName} WHERE ""StatusName"" = N'Failed' SELECT COUNT(""Id"") FROM {_pubName} WHERE ""StatusName"" = N'Delayed' ) AS ""PublishedDelayed"";"; - var connection = new NpgsqlConnection(_options.ConnectionString); + var connection = _options.CreateConnection(); await using var _ = connection.ConfigureAwait(false); var statistics = await connection.ExecuteReaderAsync(sql, async reader => { @@ -98,7 +98,7 @@ public async Task> GetMessagesAsync(MessageQueryDto var sqlQuery = $"select * from {tableName} where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit"; - var connection = new NpgsqlConnection(_options.ConnectionString); + var connection = _options.CreateConnection(); await using var _ = connection.ConfigureAwait(false); var count = await connection.ExecuteScalarAsync($"select count(1) from {tableName} where 1=1 {where}", @@ -182,7 +182,7 @@ private async ValueTask GetNumberOfMessage(string tableName, string statusN var sqlQuery = $"select count(\"Id\") from {tableName} where Lower(\"StatusName\") = Lower(@state)"; - var connection = new NpgsqlConnection(_options.ConnectionString); + var connection = _options.CreateConnection(); await using var _ = connection.ConfigureAwait(false); return await connection.ExecuteScalarAsync(sqlQuery, new NpgsqlParameter("@state", statusName)) .ConfigureAwait(false); @@ -227,7 +227,7 @@ group by to_char(""Added"", 'yyyy-MM-dd-HH') }; Dictionary valuesMap; - var connection = new NpgsqlConnection(_options.ConnectionString); + var connection = _options.CreateConnection(); await using (connection.ConfigureAwait(false)) { valuesMap = await connection.ExecuteReaderAsync(sqlQuery, async reader => @@ -264,7 +264,7 @@ group by to_char(""Added"", 'yyyy-MM-dd-HH') var sql = $@"SELECT ""Id"" AS ""DbId"", ""Content"", ""Added"", ""ExpiresAt"", ""Retries"" FROM {tableName} WHERE ""Id""={id} FOR UPDATE SKIP LOCKED"; - var connection = new NpgsqlConnection(_options.ConnectionString); + var connection = _options.CreateConnection(); await using var _ = connection.ConfigureAwait(false); var mediumMessage = await connection.ExecuteReaderAsync(sql, async reader => { diff --git a/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs index 2fd25aba9..580a26c37 100644 --- a/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs @@ -46,7 +46,7 @@ public async Task InitializeAsync(CancellationToken cancellationToken) if (cancellationToken.IsCancellationRequested) return; var sql = CreateDbTablesScript(_options.Value.Schema); - var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); object[] sqlParams = { From ed4a8410ec24615565e34163698d042466d2a7ee Mon Sep 17 00:00:00 2001 From: Savorboard Date: Thu, 14 Mar 2024 22:32:05 +0800 Subject: [PATCH 4/6] Add support NATS configure consumer options. (#1500) --- docs/content/user-guide/en/transport/nats.md | 4 +++- docs/content/user-guide/zh/transport/nats.md | 5 +++-- src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs | 6 +----- src/DotNetCore.CAP.NATS/NATSConsumerClient.cs | 15 +++++++++------ 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/docs/content/user-guide/en/transport/nats.md b/docs/content/user-guide/en/transport/nats.md index 0a837fb59..6d803f261 100644 --- a/docs/content/user-guide/en/transport/nats.md +++ b/docs/content/user-guide/en/transport/nats.md @@ -41,7 +41,9 @@ NAME | DESCRIPTION | TYPE | DEFAULT Options | NATS client configuration | Options | Options Servers | Server url/urls used to connect to the NATs server. | string | NULL ConnectionPoolSize | number of connections pool | uint | 10 -DeliverPolicy | The point in the stream to receive messages from | enum | DeliverPolicy.New +DeliverPolicy | The point in the stream to receive messages from (⚠️ Removed from version 8.1.0, use `ConsumerOptions` instead.) | enum | DeliverPolicy.New +StreamOptions | 🆕 Stream configuration | Action | NULL +ConsumerOptions | 🆕 Consumer configuration | Action | NULL #### NATS ConfigurationOptions diff --git a/docs/content/user-guide/zh/transport/nats.md b/docs/content/user-guide/zh/transport/nats.md index 9d549f075..0fdb95229 100644 --- a/docs/content/user-guide/zh/transport/nats.md +++ b/docs/content/user-guide/zh/transport/nats.md @@ -42,8 +42,9 @@ NAME | DESCRIPTION | TYPE | DEFAULT Options | NATS 客户端配置 | Options | Options Servers | 服务器Urls地址 | string | NULL ConnectionPoolSize | 连接池数量 | uint | 10 -DeliverPolicy | 消费消息的策略点 | enum | DeliverPolicy.New - +DeliverPolicy | 消费消息的策略点(⚠️在8.1.0版本移除,使用`ConsumerOptions`替代。) | enum | DeliverPolicy.New +StreamOptions | 🆕 Stream 配置项 | Action | NULL +ConsumerOptions | 🆕 Consumer 配置项 | Action | NULL #### NATS ConfigurationOptions diff --git a/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs b/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs index b69a0e48f..f96a7e689 100644 --- a/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs +++ b/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs @@ -31,11 +31,7 @@ public class NATSOptions public Action? StreamOptions { get; set; } - /// - /// The point in the stream to receive messages from, - /// either DeliverAll, DeliverLast, DeliverNew, DeliverByStartSequence, DeliverByStartTime, or DeliverLastPerSubject. - /// - public DeliverPolicy DeliverPolicy { get; set; } = DeliverPolicy.New; + public Action? ConsumerOptions { get; set; } public Func NormalizeStreamName { get; set; } = origin => origin.Split('.')[0]; } diff --git a/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs index 2f03684e5..e674278ab 100644 --- a/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs +++ b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs @@ -97,14 +97,17 @@ public void Subscribe(IEnumerable topics) { try { + var consumerConfig = ConsumerConfiguration.Builder() + .WithDurable(Helper.Normalized(groupName + "-" + subject)) + .WithDeliverPolicy(DeliverPolicy.New) + .WithAckWait(30000) + .WithAckPolicy(AckPolicy.Explicit); + + _natsOptions.ConsumerOptions?.Invoke(consumerConfig); + var pso = PushSubscribeOptions.Builder() .WithStream(subjectStream.Key) - .WithConfiguration(ConsumerConfiguration.Builder() - .WithDurable(Helper.Normalized(groupName + "-" + subject)) - .WithDeliverPolicy(_natsOptions.DeliverPolicy) - .WithAckWait(10000) - .WithAckPolicy(AckPolicy.Explicit) - .Build()) + .WithConfiguration(consumerConfig.Build()) .Build(); js.PushSubscribeAsync(subject, groupName, SubscriptionMessageHandler, false, pso); From 58449857082f70d63b0270f789e743a40aa3f2ed Mon Sep 17 00:00:00 2001 From: Savorboard Date: Thu, 14 Mar 2024 22:32:31 +0800 Subject: [PATCH 5/6] Update docs. --- docs/content/user-guide/en/cap/configuration.md | 9 ++++++++- docs/content/user-guide/en/cap/messaging.md | 6 +++--- docs/content/user-guide/en/monitoring/diagnostics.md | 4 ++-- docs/content/user-guide/en/transport/aws-sqs.md | 4 ++-- docs/content/user-guide/en/transport/kafka.md | 2 +- docs/content/user-guide/en/transport/rabbitmq.md | 2 +- docs/content/user-guide/zh/cap/configuration.md | 9 ++++++++- docs/content/user-guide/zh/cap/messaging.md | 6 +++--- docs/content/user-guide/zh/monitoring/diagnostics.md | 4 ++-- docs/content/user-guide/zh/storage/postgresql.md | 1 + docs/content/user-guide/zh/transport/aws-sqs.md | 4 ++-- docs/content/user-guide/zh/transport/rabbitmq.md | 2 +- docs/readme.md | 2 +- 13 files changed, 35 insertions(+), 20 deletions(-) diff --git a/docs/content/user-guide/en/cap/configuration.md b/docs/content/user-guide/en/cap/configuration.md index adf26a37b..37b10398b 100644 --- a/docs/content/user-guide/en/cap/configuration.md +++ b/docs/content/user-guide/en/cap/configuration.md @@ -152,4 +152,11 @@ By default, CAP will only read one message from the message queue, then execute If set to true, the consumer will prefetch some messages to the memory queue, and then distribute them to the .NET thread pool for execution. !!! note "Precautions" - Setting it to true may cause some problems. When the subscription method executes too slowly and takes too long, it will cause the retry thread to pick up messages that have not yet been executed. The retry thread picks up messages from 4 minutes (FallbackWindowLookbackSeconds) ago by default , that is to say, if the message backlog of more than 4 minutes (FallbackWindowLookbackSeconds) on the consumer side will be picked up again and executed again \ No newline at end of file + Setting it to true may cause some problems. When the subscription method executes too slowly and takes too long, it will cause the retry thread to pick up messages that have not yet been executed. The retry thread picks up messages from 4 minutes (FallbackWindowLookbackSeconds) ago by default , that is to say, if the message backlog of more than 4 minutes (FallbackWindowLookbackSeconds) on the consumer side will be picked up again and executed again + +#### EnablePublishParallelSend + +> Default: false, The (7.2 <= Version < 8.1) the default behavior is true + +By default, messages sent are first placed into the Channel in memory and then processed linearly. +If set to true, the task of sending messages will be processed in parallel by the .NET thread pool, which will greatly increase the speed of sending. \ No newline at end of file diff --git a/docs/content/user-guide/en/cap/messaging.md b/docs/content/user-guide/en/cap/messaging.md index ec49b9ef4..3ed251ccc 100644 --- a/docs/content/user-guide/en/cap/messaging.md +++ b/docs/content/user-guide/en/cap/messaging.md @@ -125,7 +125,7 @@ Retrying plays an important role in the overall CAP architecture design, CAP ret During the message sending process, when the broker crashes or the connection fails or an abnormality occurs, CAP will retry the sending. Retry 3 times for the first time, retry every minute after 4 minutes (FallbackWindowLookbackSeconds), and +1 retry. When the total number of retries reaches 50, CAP will stop retrying. -You can adjust the total number of retries by setting [FailedRetryCount](../configuration#failedretrycount) in CapOptions Or use [FailedThresholdCallback](../configuration#failedthresholdcallback) to receive notifications when the maximum retry count is reached. +You can adjust the total number of retries by setting [FailedRetryCount](configuration.md#failedretrycount) in CapOptions Or use [FailedThresholdCallback](configuration.md#failedthresholdcallback) to receive notifications when the maximum retry count is reached. It will stop when the maximum number of times is reached. You can see the reason for the failure in Dashboard and choose whether to manually retry. @@ -141,10 +141,10 @@ Whether sending fails or consumption fails, we will store the exception message There is an `ExpiresAt` field in the database message table indicating the expiration time of the message. When the message is sent successfully, status will be changed to `Successed`, and `ExpiresAt` will be set to **1 day** later. -Consuming failure will change the message status to `Failed` and `ExpiresAt` will be set to **15 days** later (You can use [FailedMessageExpiredAfter](../configuration#failedmessageexpiredafter) configuration items to custom). +Consuming failure will change the message status to `Failed` and `ExpiresAt` will be set to **15 days** later (You can use [FailedMessageExpiredAfter](configuration.md#failedmessageexpiredafter) configuration items to custom). By default, the data of the message in the table is deleted every **5 minutes** to avoid performance degradation caused by too much data. The cleanup strategy `ExpiresAt` is performed when field is not empty and is less than the current time. That is to say, the message with the status Failed (by default they have been retried 50 times), if you do not have manual intervention for 15 days, it will **also be** cleaned up. -You can use [CollectorCleaningInterval](../configuration#collectorcleaninginterval) configuration items to custom the interval time. +You can use [CollectorCleaningInterval](configuration.md#collectorcleaninginterval) configuration items to custom the interval time. diff --git a/docs/content/user-guide/en/monitoring/diagnostics.md b/docs/content/user-guide/en/monitoring/diagnostics.md index fc51b8f32..08f9d9d1c 100644 --- a/docs/content/user-guide/en/monitoring/diagnostics.md +++ b/docs/content/user-guide/en/monitoring/diagnostics.md @@ -71,13 +71,13 @@ dotnet-counters monitor --process-id=25496 --counters=DotNetCore.CAP.EventCounte process-id: The ID of the CAP process to collect counter data from. -![img](/img/dotnet-counters.gif) +![img](../../../img/dotnet-counters.gif) ### Monitor with dashboard You can configure `x.UseDashboard()` to open the dashboard to view Metrics graph charts. -![img](/img/dashboard-metrics.gif) +![img](../../../img/dashboard-metrics.gif) In the Realtime Metric Graph, the time axis will scroll in real time over time so that you can see the rate of publishing and consuming messages per second, And the consumer execution time is "dotted" on the Y1 axis (Y0 axis is the rates, and the Y1 axis is the execution elpsed time). diff --git a/docs/content/user-guide/en/transport/aws-sqs.md b/docs/content/user-guide/en/transport/aws-sqs.md index fbe158eda..a862d3ed2 100644 --- a/docs/content/user-guide/en/transport/aws-sqs.md +++ b/docs/content/user-guide/en/transport/aws-sqs.md @@ -32,7 +32,7 @@ public void TestBar(DateTime value) ``` After CAP startups, you will see in SNS management console: -![img](/img/aws-sns-demo.png) +![img](../../../img/aws-sns-demo.png) ### SQS @@ -40,7 +40,7 @@ For each consumer group, CAP will create a corresponding SQS queue, the name of The SQS queue will subscribe to Topic in SNS, as shown below: -![img](/img/aws-sns-demo.png) +![img](../../../img/aws-sns-demo.png) !!! warning "Precautions" Due to the limitation of AWS SNS, when you remove the subscription method, CAP will not delete topics or queues on AWS SNS or SQS, you need to delete them manually. diff --git a/docs/content/user-guide/en/transport/kafka.md b/docs/content/user-guide/en/transport/kafka.md index 08edecb4f..42f9bc289 100644 --- a/docs/content/user-guide/en/transport/kafka.md +++ b/docs/content/user-guide/en/transport/kafka.md @@ -46,7 +46,7 @@ CustomHeaders | Custom subscribe headers | Func<> | N/A When the message sent from a heterogeneous system, because of the CAP needs to define additional headers, so an exception will occur at this time. By providing this parameter to set the custom headersn to make the subscriber works. -You can find the description of heterogeneous system integration [here](../../cap/messaging#heterogeneous-system-integration). +You can find the description of heterogeneous system integration [here](../cap/messaging.md#heterogeneous-system-integration). Sometimes, if you want to get additional context information from Broker, you can also add it through this option. For example, add information such as Offset or Partition. diff --git a/docs/content/user-guide/en/transport/rabbitmq.md b/docs/content/user-guide/en/transport/rabbitmq.md index c6cb41e62..b66028ae7 100644 --- a/docs/content/user-guide/en/transport/rabbitmq.md +++ b/docs/content/user-guide/en/transport/rabbitmq.md @@ -74,7 +74,7 @@ services.AddCap(x => When the message sent from the RabbitMQ management console or a heterogeneous system, because of the CAP needs to define additional headers, so an exception will occur at this time. By providing this parameter to set the custom headersn to make the subscriber works. -You can find the description of [Header Information](../cap/messaging#heterogeneous-system-integration) here. +You can find the description of [Header Information](../cap/messaging.md#heterogeneous-system-integration) here. Example: diff --git a/docs/content/user-guide/zh/cap/configuration.md b/docs/content/user-guide/zh/cap/configuration.md index b7b50609c..280fb3b41 100644 --- a/docs/content/user-guide/zh/cap/configuration.md +++ b/docs/content/user-guide/zh/cap/configuration.md @@ -156,4 +156,11 @@ services.AddCap(config => 如果设置为 true, 消费端会将消息预取到内存队列,然后再放入.NET 线程池并行执行。 !!! note "注意事项" - 设置为 true 可能会产生一些问题,当订阅方法执行过慢耗时太久时,会导致重试线程拾取到还未执行的的消息。重试线程默认拾取4分钟前(FallbackWindowLookbackSeconds 配置项)的消息,也就是说如果消费端积压了超过4分钟(FallbackWindowLookbackSeconds 配置项)的消息就会被重新拾取到再次执行 \ No newline at end of file + 设置为 true 可能会产生一些问题,当订阅方法执行过慢耗时太久时,会导致重试线程拾取到还未执行的的消息。重试线程默认拾取4分钟前(FallbackWindowLookbackSeconds 配置项)的消息,也就是说如果消费端积压了超过4分钟(FallbackWindowLookbackSeconds 配置项)的消息就会被重新拾取到再次执行 + +#### EnablePublishParallelSend + +> 默认值: false + +默认情况下,发送的消息都先放置到内存同一个Channel中,然后线性处理。 +如果设置为 true,则发送消息的任务将由.NET线程池并行处理,这会大大提高发送的速度。 \ No newline at end of file diff --git a/docs/content/user-guide/zh/cap/messaging.md b/docs/content/user-guide/zh/cap/messaging.md index 1385be036..171c55abb 100644 --- a/docs/content/user-guide/zh/cap/messaging.md +++ b/docs/content/user-guide/zh/cap/messaging.md @@ -116,7 +116,7 @@ CAP 接收到消息之后会将消息进行 Persistent(持久化), 有关 在消息发送过程中,当出现 Broker 宕机或者连接失败的情况亦或者出现异常的情况下,这个时候 CAP 会对发送的重试,第一次重试次数为 3,4分钟后以后每分钟重试一次,进行次数 +1,当总次数达到50次后,CAP将不对其进行重试。 -你可以在 CapOptions 中设置 [FailedRetryCount](../configuration#failedretrycount) 来调整默认重试的总次数,或使用 [FailedThresholdCallback](../configuration#FailedThresholdCallback) 在达到最大重试次数时收到通知。 +你可以在 CapOptions 中设置 [FailedRetryCount](configuration.md#failedretrycount) 来调整默认重试的总次数,或使用 [FailedThresholdCallback](configuration.md#FailedThresholdCallback) 在达到最大重试次数时收到通知。 当失败总次数达到默认失败总次数后,就不会进行重试了,你可以在 Dashboard 中查看消息失败的原因,然后进行人工重试处理。 @@ -128,7 +128,7 @@ CAP 接收到消息之后会将消息进行 Persistent(持久化), 有关 ## 消息数据清理 -数据库消息表中具有一个 ExpiresAt 字段表示消息的过期时间,当消息发送成功或者消费成功后,CAP会将消息状态为 Successed 的 ExpiresAt 设置为 1天 后过期,会将消息状态为 Failed 的 ExpiresAt 设置为 15天 后过期(可通过 [FailedMessageExpiredAfter](../configuration#failedmessageexpiredafter) 配置)。 +数据库消息表中具有一个 ExpiresAt 字段表示消息的过期时间,当消息发送成功或者消费成功后,CAP会将消息状态为 Successed 的 ExpiresAt 设置为 1天 后过期,会将消息状态为 Failed 的 ExpiresAt 设置为 15天 后过期(可通过 [FailedMessageExpiredAfter](configuration.md#failedmessageexpiredafter) 配置)。 -CAP 默认情况下会每隔**5分钟**将消息表的数据进行清理删除,避免数据量过多导致性能的降低。清理规则为 ExpiresAt 不为空并且小于当前时间的数据。 也就是说状态为Failed的消息(正常情况他们已经被重试了 50 次),如果你15天没有人工介入处理,同样会被清理掉。你可以通过 [CollectorCleaningInterval](../configuration#collectorcleaninginterval) 配置项来自定义间隔时间。 +CAP 默认情况下会每隔**5分钟**将消息表的数据进行清理删除,避免数据量过多导致性能的降低。清理规则为 ExpiresAt 不为空并且小于当前时间的数据。 也就是说状态为Failed的消息(正常情况他们已经被重试了 50 次),如果你15天没有人工介入处理,同样会被清理掉。你可以通过 [CollectorCleaningInterval](configuration.md#collectorcleaninginterval) 配置项来自定义间隔时间。 diff --git a/docs/content/user-guide/zh/monitoring/diagnostics.md b/docs/content/user-guide/zh/monitoring/diagnostics.md index 80945796c..df5b8aada 100644 --- a/docs/content/user-guide/zh/monitoring/diagnostics.md +++ b/docs/content/user-guide/zh/monitoring/diagnostics.md @@ -65,13 +65,13 @@ dotnet-counters monitor --process-id=25496 --counters=DotNetCore.CAP.EventCounte 其中 process-id 为 CAP 所属的进程Id。 -![img](/img/dotnet-counters.gif) +![img](../../../img/dotnet-counters.gif) ### 在 Dashboard 中查看度量 你可以配置 `x.UseDashboard()` 来开启仪表盘以图表的形式查看 Metrics 指标。 如下图: -![img](/img/dashboard-metrics.gif) +![img](../../../img/dashboard-metrics.gif) 在 Realtime Metric Graph 中,时间轴会随着时间实时滚动从而可以看到发布和消费消息每秒的速率,同时我们可以看到消费者执行耗时以“打点”的方式在 Y1 轴上(Y0轴为速率,Y1轴为执行耗时)。 diff --git a/docs/content/user-guide/zh/storage/postgresql.md b/docs/content/user-guide/zh/storage/postgresql.md index 3a10416d0..ff891e55d 100644 --- a/docs/content/user-guide/zh/storage/postgresql.md +++ b/docs/content/user-guide/zh/storage/postgresql.md @@ -37,6 +37,7 @@ NAME | DESCRIPTION | TYPE | DEFAULT :---|:---|---|:--- Schema | 数据库架构 | string | cap ConnectionString | 数据库连接字符串 | string | +DataSource | [Data source](https://www.npgsql.org/doc/basic-usage.html#data-source) | [NpgsqlDataSource](https://www.npgsql.org/doc/api/Npgsql.NpgsqlDataSource.html) | ### 自定义表名称 diff --git a/docs/content/user-guide/zh/transport/aws-sqs.md b/docs/content/user-guide/zh/transport/aws-sqs.md index f60a0f185..88b1bd4e7 100644 --- a/docs/content/user-guide/zh/transport/aws-sqs.md +++ b/docs/content/user-guide/zh/transport/aws-sqs.md @@ -33,7 +33,7 @@ public void TestBar(DateTime value) 在 CAP 启动后,在 AWS SNS 中你将看到 -![img](/img/aws-sns-demo.png) +![img](../../../img/aws-sns-demo.png) ### SQS @@ -41,7 +41,7 @@ public void TestBar(DateTime value) 该 SQS 队列将订阅 SNS 中的 Topic ,如下图: -![img](/img/aws-sns-demo.png) +![img](../../../img/aws-sns-demo.png) !!! warning "注意事项" 由于 AWS SNS 的限制,当你减少订阅方法时,我们不会主动删除 AWS SNS 或者 SQS 上的相关 Topic 或 Queue,你需要手动删除他们。 diff --git a/docs/content/user-guide/zh/transport/rabbitmq.md b/docs/content/user-guide/zh/transport/rabbitmq.md index 5a8aeda8d..8b8500102 100644 --- a/docs/content/user-guide/zh/transport/rabbitmq.md +++ b/docs/content/user-guide/zh/transport/rabbitmq.md @@ -75,7 +75,7 @@ services.AddCap(x => 当需要从异构系统或者直接接收从RabbitMQ 控制台发送的消息时,由于 CAP 需要定义额外的头信息才能正常订阅,所以此时会出现异常。通过提供此参数来进行自定义头信息的设置来使订阅者正常工作。 -你可以在这里找到有关 [头信息](../cap/messaging#异构系统集成) 的说明。 +你可以在这里找到有关 [头信息](../cap/messaging.md#异构系统集成) 的说明。 用法如下: diff --git a/docs/readme.md b/docs/readme.md index 5ba32207c..4110fd23e 100644 --- a/docs/readme.md +++ b/docs/readme.md @@ -14,5 +14,5 @@ Web site made with [Material for MkDocs](https://squidfunk.github.io/mkdocs-mate ``` cd CAP/docs -docker run --rm -it -p 8000:8000 -v ${PWD}:/docs squidfunk/mkdocs-material +docker run --rm -it -p 8000:8000 -v ${PWD}:/docs squidfunk/mkdocs-material ``` \ No newline at end of file From 66470a69c374f017d3f858a41163c4ddbf120df4 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Thu, 14 Mar 2024 22:33:38 +0800 Subject: [PATCH 6/6] Update package reference to .net 8 --- samples/Sample.ConsoleApp/Sample.ConsoleApp.csproj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samples/Sample.ConsoleApp/Sample.ConsoleApp.csproj b/samples/Sample.ConsoleApp/Sample.ConsoleApp.csproj index e0f3df780..54ddd774c 100644 --- a/samples/Sample.ConsoleApp/Sample.ConsoleApp.csproj +++ b/samples/Sample.ConsoleApp/Sample.ConsoleApp.csproj @@ -6,8 +6,8 @@ - - + +