Skip to content

Commit

Permalink
Support async transaction without use async method (#1493)
Browse files Browse the repository at this point in the history
* Fixes AsyncLocal issue to support  start async transaction. (#1376, #1317, #1266)

* Add support async transaction

* Remove unused line.

* Add support async transaction

* Fixes MySql start transaction dbtransaction not await bug.

* Mark CapTransactionHolder internal visiable.
  • Loading branch information
yang-xiaodong committed Feb 29, 2024
1 parent f077529 commit 2194ef5
Show file tree
Hide file tree
Showing 9 changed files with 336 additions and 137 deletions.
49 changes: 15 additions & 34 deletions samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,9 @@ public async Task<IActionResult> Stop([FromServices] IBootstrapper bootstrapper)
}

[Route("~/without/transaction")]
public async Task<IActionResult> WithoutTransaction()
public async Task<IActionResult> WithoutTransactionAsync()
{
await _capBus.PublishAsync("sample.rabbitmq.test", DateTime.Now);
await _capBus.PublishAsync("sample.rabbitmq.test2", DateTime.Now);
await _capBus.PublishAsync("sample.rabbitmq.mysql", DateTime.Now, cancellationToken: HttpContext.RequestAborted);

return Ok();
}
Expand All @@ -52,54 +51,36 @@ public async Task<IActionResult> Delay(int delaySeconds)
}

[Route("~/adonet/transaction")]
public IActionResult AdonetWithTransaction()
public async Task<IActionResult> AdonetWithTransaction()
{
using (var connection = new MySqlConnection(AppDbContext.ConnectionString))
{
using (var transaction = connection.BeginTransaction(_capBus, true))
{
connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction);

_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
}
using var transaction = await connection.BeginTransactionAsync(_capBus, true);
await connection.ExecuteAsync("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction);
await _capBus.PublishAsync("sample.rabbitmq.mysql", DateTime.Now);
}

return Ok();
}

[Route("~/ef/transaction")]
public IActionResult EntityFrameworkWithTransaction([FromServices] AppDbContext dbContext)
public async Task<IActionResult> EntityFrameworkWithTransaction([FromServices] AppDbContext dbContext)
{
using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: false))
using (var trans = await dbContext.Database.BeginTransactionAsync(_capBus, autoCommit: false))
{
dbContext.Persons.Add(new Person() { Name = "ef.transaction" });

for (int i = 0; i < 1; i++)
{
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
}

dbContext.SaveChanges();

trans.Commit();
await dbContext.Persons.AddAsync(new Person() { Name = "ef.transaction" });
await _capBus.PublishAsync("sample.rabbitmq.mysql", DateTime.Now);
await dbContext.SaveChangesAsync();
await trans.CommitAsync();
}
return Ok();
}

[NonAction]
[CapSubscribe("sample.rabbitmq.test")]
public void Subscriber(string content)
[CapSubscribe("sample.rabbitmq.mysql")]
public void Subscriber(DateTime time)
{
Thread.Sleep(2000);
Console.WriteLine($"Consume time: {DateTime.Now} \r\n --> " + content);
Console.WriteLine("Publishing time:" + time);
}

//[NonAction]
//[CapSubscribe("sample.rabbitmq.test2", Group = "test2")]
//public void Subscriber2(DateTime content)
//{
// Thread.Sleep(2000);
// Console.WriteLine($"Group2 Consume time: {DateTime.Now} \r\n --> " + content);
//}
}
}
29 changes: 23 additions & 6 deletions src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,30 @@ public static IClientSessionHandle StartTransaction(this IMongoClient client,
ICapPublisher publisher, bool autoCommit = false)
{
var clientSessionHandle = client.StartSession();
publisher.Transaction.Value =
ActivatorUtilities.CreateInstance<MongoDBCapTransaction>(publisher.ServiceProvider);
var capTrans = publisher.Transaction.Value.Begin(clientSessionHandle, autoCommit);
publisher.Transaction = ActivatorUtilities.CreateInstance<MongoDBCapTransaction>(publisher.ServiceProvider);
var capTrans = publisher.Transaction.Begin(clientSessionHandle, autoCommit);
return new CapMongoDbClientSessionHandle(capTrans);
}

/// <summary>
/// Start the CAP transaction
/// </summary>
/// <param name="client">The <see cref="IMongoClient" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <param name="options">The <see cref="ClientSessionOptions"/>.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/>.</param>
/// <returns>The <see cref="IClientSessionHandle" /> of MongoDB transaction session object.</returns>
public static Task<IClientSessionHandle> StartTransactionAsync(this IMongoClient client,
ICapPublisher publisher, bool autoCommit = false, ClientSessionOptions? options = null, CancellationToken cancellationToken = default)
{
var clientSessionHandle = client.StartSessionAsync(options, cancellationToken).GetAwaiter().GetResult();
publisher.Transaction = ActivatorUtilities.CreateInstance<MongoDBCapTransaction>(publisher.ServiceProvider);

var capTrans = publisher.Transaction.Begin(clientSessionHandle, autoCommit);
return Task.FromResult<IClientSessionHandle>(new CapMongoDbClientSessionHandle(capTrans));
}

/// <summary>
/// Start the CAP transaction with a custom session handle
/// </summary>
Expand All @@ -102,9 +120,8 @@ public static IClientSessionHandle StartTransaction(this IMongoClient client,
public static IClientSessionHandle StartTransaction(this IMongoClient _, IClientSessionHandle clientSessionHandle,
ICapPublisher publisher, bool autoCommit = false)
{
publisher.Transaction.Value =
ActivatorUtilities.CreateInstance<MongoDBCapTransaction>(publisher.ServiceProvider);
var capTrans = publisher.Transaction.Value.Begin(clientSessionHandle, autoCommit);
publisher.Transaction = ActivatorUtilities.CreateInstance<MongoDBCapTransaction>(publisher.ServiceProvider);
var capTrans = publisher.Transaction.Begin(clientSessionHandle, autoCommit);
return new CapMongoDbClientSessionHandle(capTrans);
}
}
122 changes: 93 additions & 29 deletions src/DotNetCore.CAP.MySql/ICapTransaction.MySql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,29 +91,12 @@ public override void Dispose()
{
(DbTransaction as IDisposable)?.Dispose();
DbTransaction = null;
GC.SuppressFinalize(this);
}
}

public static class CapTransactionExtensions
{
public static ICapTransaction Begin(this ICapTransaction transaction,
IDbContextTransaction dbTransaction, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;

return transaction;
}

public static ICapTransaction Begin(this ICapTransaction transaction,
IDbTransaction dbTransaction, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;

return transaction;
}

/// <summary>
/// Start the CAP transaction
/// </summary>
Expand All @@ -124,10 +107,7 @@ public static ICapTransaction Begin(this ICapTransaction transaction,
public static IDbContextTransaction BeginTransaction(this DatabaseFacade database,
ICapPublisher publisher, bool autoCommit = false)
{
var trans = database.BeginTransaction();
publisher.Transaction.Value = ActivatorUtilities.CreateInstance<MySqlCapTransaction>(publisher.ServiceProvider);
var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit);
return new CapEFDbTransaction(capTrans);
return BeginTransaction(database, IsolationLevel.Unspecified, publisher, autoCommit);
}

/// <summary>
Expand All @@ -141,10 +121,44 @@ public static IDbContextTransaction BeginTransaction(this DatabaseFacade databas
public static IDbContextTransaction BeginTransaction(this DatabaseFacade database,
IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false)
{
var trans = database.BeginTransaction(isolationLevel);
publisher.Transaction.Value = ActivatorUtilities.CreateInstance<MySqlCapTransaction>(publisher.ServiceProvider);
var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit);
return new CapEFDbTransaction(capTrans);
var dbTransaction = database.BeginTransaction(isolationLevel);
publisher.Transaction = ActivatorUtilities.CreateInstance<MySqlCapTransaction>(publisher.ServiceProvider);
publisher.Transaction.DbTransaction = dbTransaction;
publisher.Transaction.AutoCommit = autoCommit;
return new CapEFDbTransaction(publisher.Transaction);
}

/// <summary>
/// Start the CAP transaction async
/// </summary>
/// <param name="database">The <see cref="DatabaseFacade" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <param name="cancellationToken"></param>
/// <returns>The <see cref="IDbContextTransaction" /> of EF DbContext transaction object.</returns>
public static Task<IDbContextTransaction> BeginTransactionAsync(this DatabaseFacade database,
ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default)
{
return BeginTransactionAsync(database, IsolationLevel.Unspecified, publisher, autoCommit, cancellationToken);
}

/// <summary>
/// Start the CAP transaction async
/// </summary>
/// <param name="database">The <see cref="DatabaseFacade" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="isolationLevel">The <see cref="IsolationLevel" /> to use</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <param name="cancellationToken"></param>
/// <returns>The <see cref="IDbContextTransaction" /> of EF DbContext transaction object.</returns>
public static Task<IDbContextTransaction> BeginTransactionAsync(this DatabaseFacade database,
IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default)
{
var dbTransaction = database.BeginTransactionAsync(isolationLevel, cancellationToken).GetAwaiter().GetResult();
publisher.Transaction = ActivatorUtilities.CreateInstance<MySqlCapTransaction>(publisher.ServiceProvider);
publisher.Transaction.DbTransaction = dbTransaction;
publisher.Transaction.AutoCommit = autoCommit;
return Task.FromResult<IDbContextTransaction>(new CapEFDbTransaction(publisher.Transaction));
}

/// <summary>
Expand All @@ -156,11 +170,61 @@ public static IDbContextTransaction BeginTransaction(this DatabaseFacade databas
/// <returns>The <see cref="ICapTransaction" /> object.</returns>
public static ICapTransaction BeginTransaction(this IDbConnection dbConnection,
ICapPublisher publisher, bool autoCommit = false)
{
return BeginTransaction(dbConnection, IsolationLevel.Unspecified, publisher, autoCommit);
}

/// <summary>
/// Start the CAP transaction
/// </summary>
/// <param name="dbConnection">The <see cref="IDbConnection" />.</param>
/// <param name="isolationLevel">The <see cref="IsolationLevel" /> to use</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <returns>The <see cref="ICapTransaction" /> object.</returns>
public static ICapTransaction BeginTransaction(this IDbConnection dbConnection,
IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false)
{
if (dbConnection.State == ConnectionState.Closed) dbConnection.Open();

var dbTransaction = dbConnection.BeginTransaction();
publisher.Transaction.Value = ActivatorUtilities.CreateInstance<MySqlCapTransaction>(publisher.ServiceProvider);
return publisher.Transaction.Value.Begin(dbTransaction, autoCommit);
var dbTransaction = dbConnection.BeginTransaction(isolationLevel);
publisher.Transaction = ActivatorUtilities.CreateInstance<MySqlCapTransaction>(publisher.ServiceProvider);
publisher.Transaction.DbTransaction = dbTransaction;
publisher.Transaction.AutoCommit = autoCommit;
return publisher.Transaction;
}

/// <summary>
/// Start the CAP transaction
/// </summary>
/// <param name="dbConnection">The <see cref="IDbConnection" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <param name="cancellationToken"></param>
/// <returns>The <see cref="ICapTransaction" /> object.</returns>
public static ValueTask<ICapTransaction> BeginTransactionAsync(this IDbConnection dbConnection,
ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default)
{
return BeginTransactionAsync(dbConnection, IsolationLevel.Unspecified, publisher, autoCommit, cancellationToken);
}

/// <summary>
/// Start the CAP transaction
/// </summary>
/// <param name="dbConnection">The <see cref="IDbConnection" />.</param>
/// <param name="isolationLevel">The <see cref="IsolationLevel" /> to use</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <param name="cancellationToken"></param>
public static ValueTask<ICapTransaction> BeginTransactionAsync(this IDbConnection dbConnection,
IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default)
{
if (dbConnection.State == ConnectionState.Closed) ((DbConnection)dbConnection).OpenAsync(cancellationToken).GetAwaiter().GetResult();

var dbTransaction = ((DbConnection)dbConnection).BeginTransactionAsync(isolationLevel, cancellationToken).GetAwaiter().GetResult();
publisher.Transaction = ActivatorUtilities.CreateInstance<MySqlCapTransaction>(publisher.ServiceProvider);
publisher.Transaction.DbTransaction = dbTransaction;
publisher.Transaction.AutoCommit = autoCommit;
return ValueTask.FromResult(publisher.Transaction);
}
}
Loading

0 comments on commit 2194ef5

Please sign in to comment.