Skip to content

Commit

Permalink
Added FlushAsync function which is called from CommitAsync with a…
Browse files Browse the repository at this point in the history
…wait keyword (#1629)

* do not throw exception for existing cap-msg-group message header. overwrite it instead. tolerate kafka message duplicate headers

* added FlushAsync to possibly fix incorrect order of produced messages inside transaction

* call flushasync from flush
  • Loading branch information
PoteRii authored Jan 2, 2025
1 parent d582120 commit 8575109
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public override async Task CommitAsync(CancellationToken cancellationToken = def
if (DbTransaction is IClientSessionHandle session)
await session.CommitTransactionAsync(cancellationToken).ConfigureAwait(false);

Flush();
await FlushAsync();
}

public override void Rollback()
Expand Down
2 changes: 1 addition & 1 deletion src/DotNetCore.CAP.MySql/ICapTransaction.MySql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public override async Task CommitAsync(CancellationToken cancellationToken = def
break;
}

Flush();
await FlushAsync();
}

public override void Rollback()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public override async Task CommitAsync(CancellationToken cancellationToken = def
break;
}

Flush();
await FlushAsync();
}

public override void Rollback()
Expand Down
2 changes: 1 addition & 1 deletion src/DotNetCore.CAP.SqlServer/ICapTransaction.SqlServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public override async Task CommitAsync(CancellationToken cancellationToken = def
switch (DbTransaction)
{
case NoopTransaction _:
Flush();
await FlushAsync();
break;
case DbTransaction dbTransaction:
await dbTransaction.CommitAsync(cancellationToken).ConfigureAwait(false);
Expand Down
11 changes: 7 additions & 4 deletions src/DotNetCore.CAP/ICapTransaction.Base.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,27 @@ protected internal virtual void AddToSent(MediumMessage msg)
}

protected virtual void Flush()
{
FlushAsync().GetAwaiter().GetResult();
}

protected virtual async Task FlushAsync()
{
while (!_bufferList.IsEmpty)
{
#pragma warning disable CA2012 // Use ValueTasks correctly
if (_bufferList.TryDequeue(out var message))
{
var isDelayMessage = message.Origin.Headers.ContainsKey(Headers.DelayTime);
if (isDelayMessage)
{

_dispatcher.EnqueueToScheduler(message, DateTime.Parse(message.Origin.Headers[Headers.SentTime]!, CultureInfo.InvariantCulture)).ConfigureAwait(false);
await _dispatcher.EnqueueToScheduler(message, DateTime.Parse(message.Origin.Headers[Headers.SentTime]!, CultureInfo.InvariantCulture)).ConfigureAwait(false);

}
else
{
_dispatcher.EnqueueToPublish(message).ConfigureAwait(false);
await _dispatcher.EnqueueToPublish(message).ConfigureAwait(false);
}
#pragma warning restore CA2012 // Use ValueTasks correctly
}
}
}
Expand Down

0 comments on commit 8575109

Please sign in to comment.