Skip to content

Commit

Permalink
Clear event predicates after GetEventsRecords method.
Browse files Browse the repository at this point in the history
Resolve #22054
  • Loading branch information
maliming committed Feb 1, 2025
1 parent 69fde71 commit 5aaf555
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 3 deletions.
8 changes: 5 additions & 3 deletions framework/src/Volo.Abp.Uow/Volo/Abp/Uow/UnitOfWork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,16 @@ public virtual async Task CompleteAsync(CancellationToken cancellationToken = de
_isCompleting = true;
await SaveChangesAsync(cancellationToken);

DistributedEvents.AddRange(GetEventsRecords(DistributedEventWithPredicates));
LocalEvents.AddRange(GetEventsRecords(LocalEventWithPredicates));
LocalEventWithPredicates.Clear();
DistributedEvents.AddRange(GetEventsRecords(DistributedEventWithPredicates));
DistributedEventWithPredicates.Clear();

while (LocalEvents.Any() || DistributedEvents.Any())
{
if (LocalEvents.Any())
{
var localEventsToBePublished = LocalEvents.OrderBy(e => e.EventOrder).ToArray();
LocalEventWithPredicates.Clear();
LocalEvents.Clear();
await UnitOfWorkEventPublisher.PublishLocalEventsAsync(
localEventsToBePublished
Expand All @@ -157,7 +158,6 @@ await UnitOfWorkEventPublisher.PublishLocalEventsAsync(
if (DistributedEvents.Any())
{
var distributedEventsToBePublished = DistributedEvents.OrderBy(e => e.EventOrder).ToArray();
DistributedEventWithPredicates.Clear();
DistributedEvents.Clear();
await UnitOfWorkEventPublisher.PublishDistributedEventsAsync(
distributedEventsToBePublished
Expand All @@ -167,7 +167,9 @@ await UnitOfWorkEventPublisher.PublishDistributedEventsAsync(
await SaveChangesAsync(cancellationToken);

LocalEvents.AddRange(GetEventsRecords(LocalEventWithPredicates));
LocalEventWithPredicates.Clear();
DistributedEvents.AddRange(GetEventsRecords(DistributedEventWithPredicates));
DistributedEventWithPredicates.Clear();
}

await CommitTransactionsAsync(cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ public abstract class DomainEvents_Tests<TStartupModule> : TestAppTestBase<TStar
protected readonly IRepository<AppEntityWithNavigations, Guid> AppEntityWithNavigationsRepository;
protected readonly ILocalEventBus LocalEventBus;
protected readonly IDistributedEventBus DistributedEventBus;
protected readonly IUnitOfWorkManager UnitOfWorkManager;

protected DomainEvents_Tests()
{
PersonRepository = GetRequiredService<IRepository<Person, Guid>>();
AppEntityWithNavigationsRepository = GetRequiredService<IRepository<AppEntityWithNavigations, Guid>>();
LocalEventBus = GetRequiredService<ILocalEventBus>();
DistributedEventBus = GetRequiredService<IDistributedEventBus>();
UnitOfWorkManager = GetRequiredService<IUnitOfWorkManager>();
}

[Fact]
Expand Down Expand Up @@ -176,6 +178,52 @@ await WithUnitOfWorkAsync(async () =>
isDistributedEventTriggered.ShouldBeTrue();
}

[Fact]
public async Task Should_AddOrReplace_Event_Records_In_Uow_Test()
{
//Arrange
var event1Triggered = false;
var event2Triggered = false;
var event3Triggered = false;
var event4Triggered = false;

LocalEventBus.Subscribe<MyCustomEventData>(async data =>
{
event1Triggered = true;
UnitOfWorkManager.Current!.AddOrReplaceDistributedEvent(new UnitOfWorkEventRecord(typeof(MyCustomEventData3), new MyCustomEventData3 { Value = "42" }, 2));
});

DistributedEventBus.Subscribe<MyCustomEventData2>(async data =>
{
event2Triggered = true;
UnitOfWorkManager.Current!.AddOrReplaceLocalEvent(new UnitOfWorkEventRecord(typeof(MyCustomEventData4), new MyCustomEventData4 { Value = "42" }, 2));
});

LocalEventBus.Subscribe<MyCustomEventData3>(async data =>
{
event3Triggered = true;
});

DistributedEventBus.Subscribe<MyCustomEventData4>(async data =>
{
event4Triggered = true;
});

//Act
using (var uow = UnitOfWorkManager.Begin(requiresNew: true))
{
UnitOfWorkManager.Current!.AddOrReplaceLocalEvent(new UnitOfWorkEventRecord(typeof(MyCustomEventData), new MyCustomEventData { Value = "42" }, 1));
UnitOfWorkManager.Current!.AddOrReplaceDistributedEvent(new UnitOfWorkEventRecord(typeof(MyCustomEventData2), new MyCustomEventData2 { Value = "42" }, 1));
await uow.CompleteAsync();
}

//Assert
event1Triggered.ShouldBeTrue();
event2Triggered.ShouldBeTrue();
event3Triggered.ShouldBeTrue();
event4Triggered.ShouldBeTrue();
}

private class MyCustomEventData
{
public string Value { get; set; }
Expand All @@ -185,6 +233,16 @@ private class MyCustomEventData2
{
public string Value { get; set; }
}

private class MyCustomEventData3
{
public string Value { get; set; }
}

private class MyCustomEventData4
{
public string Value { get; set; }
}
}

public abstract class AbpEntityChangeOptions_DomainEvents_Tests<TStartupModule> : TestAppTestBase<TStartupModule>
Expand Down

0 comments on commit 5aaf555

Please sign in to comment.