Skip to content

Improve exception handling for IReadModelPopulator #1087

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
### New in 1.2.1 (working version, not released yet)

* Fix: Prevent multiple calls of the same async subscribers when dispatching events (by @alexeyfv)
* Fix: Better exception handling and propagation in `ReadModelPopulator`

### New in 1.2.0 (released 2025-03-09)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// The MIT License (MIT)
//
// Copyright (c) 2015-2025 Rasmus Mikkelsen
// https://github.com/eventflow/EventFlow
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading;
using EventFlow.Aggregates;
using EventFlow.Core;
using EventFlow.EventStores;
using EventFlow.Extensions;
using EventFlow.ReadStores;
using Microsoft.Extensions.DependencyInjection;
using NUnit.Framework;

namespace EventFlow.Tests.Exploration
{
// Related https://github.com/eventflow/EventFlow/issues/1083
public class ReadModelRepopulateExplorationTest
{
private IServiceProvider _serviceProvider;

[SetUp]
public void SetUp()
{
_serviceProvider = EventFlowOptions.New()
.AddEvents(new[] { typeof(EventV1), typeof(EventV2) })
.AddEventUpgraders(typeof(BrokenUpgradeV1ToV2))
.UseInMemoryReadStoreFor<UpgradeReadModel>()
.ServiceCollection.BuildServiceProvider();
}

[TearDown]
public void TearDown()
{
(_serviceProvider as IDisposable)?.Dispose();
}

[Test]
public async Task ActuallyStops()
{
// Arrange
var id = BrokenId.New;
var aggregateStore = _serviceProvider.GetRequiredService<IAggregateStore>();
var readModelPopulator = _serviceProvider.GetRequiredService<IReadModelPopulator>();
await aggregateStore.UpdateAsync<BrokenAggregate, BrokenId>(
id,
SourceId.New,
(a, c) =>
{
a.EmitUpgradeEventV1();
return Task.CompletedTask;
},
CancellationToken.None);

// Act and Assert
using var timeoutSource = new CancellationTokenSource(TimeSpan.FromSeconds(2));
Assert.ThrowsAsync<Exception>(() => readModelPopulator.PopulateAsync(typeof(UpgradeReadModel), timeoutSource.Token));
}

public class UpgradeReadModel : IReadModel,
IAmReadModelFor<BrokenAggregate, BrokenId, EventV1>,
IAmReadModelFor<BrokenAggregate, BrokenId, EventV2>
{
public Task ApplyAsync(
IReadModelContext context,
IDomainEvent<BrokenAggregate, BrokenId, EventV1> domainEvent,
CancellationToken cancellationToken)
{
return Task.CompletedTask;
}

public Task ApplyAsync(
IReadModelContext context,
IDomainEvent<BrokenAggregate, BrokenId, EventV2> domainEvent,
CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}

public class BrokenId : Identity<BrokenId>
{
public BrokenId(string value) : base(value) { }
}

public class BrokenAggregate : AggregateRoot<BrokenAggregate, BrokenId>,
IEmit<EventV1>,
IEmit<EventV2>
{
public BrokenAggregate(BrokenId id) : base(id) { }

public bool V1Applied { get; private set; }
public bool V2Applied { get; private set; }

public void EmitUpgradeEventV1()
{
Emit(new EventV1());
}

public void Apply(EventV1 aggregateEvent)
{
V1Applied = true;
}

public void Apply(EventV2 aggregateEvent)
{
V2Applied = true;
}
}

public class EventV1 : IAggregateEvent<BrokenAggregate, BrokenId>
{
}

public class EventV2 : IAggregateEvent<BrokenAggregate, BrokenId>
{
}

public class BrokenUpgradeV1ToV2 : EventUpgraderNonAsync<BrokenAggregate, BrokenId>
{
protected override IEnumerable<IDomainEvent<BrokenAggregate, BrokenId>> Upgrade(
IDomainEvent<BrokenAggregate, BrokenId> domainEvent)
{
throw new Exception("Always broken!");
}
}
}
}
6 changes: 4 additions & 2 deletions Source/EventFlow/EventStores/EventUpgrader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using EventFlow.Aggregates;
using EventFlow.Core;

Expand All @@ -43,6 +42,9 @@ public override async IAsyncEnumerable<IDomainEvent<TAggregate, TIdentity>> Upgr
IEventUpgradeContext eventUpgradeContext,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
// We check as it now before calling as it is not passed to the legacy method
cancellationToken.ThrowIfCancellationRequested();

foreach (var upgradedDomainEvent in Upgrade(domainEvent))
{
yield return upgradedDomainEvent;
Expand All @@ -60,7 +62,7 @@ public virtual async IAsyncEnumerable<IDomainEvent> UpgradeAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
var castDomainEvent = (IDomainEvent<TAggregate, TIdentity>) domainEvent;
await foreach (var e in UpgradeAsync(castDomainEvent, eventUpgradeContext, cancellationToken).WithCancellation(cancellationToken))
await foreach (var e in UpgradeAsync(castDomainEvent, eventUpgradeContext, cancellationToken))
{
yield return e;
}
Expand Down
37 changes: 21 additions & 16 deletions Source/EventFlow/ReadStores/ReadModelPopulator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
Expand All @@ -48,7 +47,7 @@ public class ReadModelPopulator : IReadModelPopulator
private readonly IServiceProvider _serviceProvider;
private readonly IEventUpgradeContextFactory _eventUpgradeContextFactory;
private readonly IMemoryCache _memoryCache;
private ConcurrentQueue<AllEventsPage> _pipedEvents = new ConcurrentQueue<AllEventsPage>();
private readonly ConcurrentQueue<AllEventsPage> _pipedEvents = new ConcurrentQueue<AllEventsPage>();

public ReadModelPopulator(
ILogger<ReadModelPopulator> logger,
Expand Down Expand Up @@ -118,14 +117,15 @@ public async Task PopulateAsync(IReadOnlyCollection<Type> readModelTypes, Cancel
var combinedReadModelTypeString = string.Join(", ", readModelTypes.Select(type => type.PrettyPrint()));
_logger.LogInformation("Starting populating of {ReadModelTypes}", combinedReadModelTypeString);

var loadEventsTasks = LoadEvents(cancellationToken);
var processEventQueueTask = ProcessEventQueue(readModelTypes, cancellationToken);
var loadEventsTasks = LoadEventsAsync(cancellationToken);
var processEventQueueTask = ProcessEventQueueAsync(readModelTypes, cancellationToken);

await Task.WhenAll(loadEventsTasks, processEventQueueTask);

_logger.LogInformation("Population of readmodels completed");
_logger.LogInformation("Population of read models completed");
}

private async Task LoadEvents(CancellationToken cancellationToken)
private async Task LoadEventsAsync(CancellationToken cancellationToken)
{
long totalEvents = 0;
var currentPosition = GlobalPosition.Start;
Expand Down Expand Up @@ -162,22 +162,23 @@ private async Task LoadEvents(CancellationToken cancellationToken)
}
}

private async Task ProcessEventQueue(IReadOnlyCollection<Type> readModelTypes, CancellationToken cancellationToken)
private async Task ProcessEventQueueAsync(
IReadOnlyCollection<Type> readModelTypes,
CancellationToken cancellationToken)
{
var domainEventsToProcess = new List<IDomainEvent>();
AllEventsPage fetchedEvents;

var hasMoreEvents = true;
do
{
var noEventsToReady = !_pipedEvents.Any();
if (noEventsToReady)
{
await Task.Delay(100);
await Task.Delay(100, cancellationToken);
continue;
}

_pipedEvents.TryDequeue(out fetchedEvents);
_pipedEvents.TryDequeue(out var fetchedEvents);
if (fetchedEvents == null)
{
continue;
Expand All @@ -190,7 +191,7 @@ private async Task ProcessEventQueue(IReadOnlyCollection<Type> readModelTypes, C
var processEvents = !hasMoreEvents || batchExceedsThreshold;
if (processEvents)
{
var readModelUpdateTasks = readModelTypes.Select(readModelType => ProcessEvents(readModelType, domainEventsToProcess, cancellationToken));
var readModelUpdateTasks = readModelTypes.Select(readModelType => ProcessEventsAsync(readModelType, domainEventsToProcess, cancellationToken));
await Task.WhenAll(readModelUpdateTasks);

domainEventsToProcess.Clear();
Expand All @@ -199,7 +200,10 @@ private async Task ProcessEventQueue(IReadOnlyCollection<Type> readModelTypes, C
while (hasMoreEvents);
}

private async Task ProcessEvents(Type readModelType, IReadOnlyCollection<IDomainEvent> processEvents, CancellationToken cancellationToken)
private async Task ProcessEventsAsync(
Type readModelType,
IReadOnlyCollection<IDomainEvent> processEvents,
CancellationToken cancellationToken)
{
try
{
Expand All @@ -209,11 +213,12 @@ private async Task ProcessEvents(Type readModelType, IReadOnlyCollection<IDomain

var readModelTypes = new[]
{
typeof( IAmReadModelFor<,,> )
typeof(IAmReadModelFor<,,> )
};

var aggregateEventTypes = _memoryCache.GetOrCreate(CacheKey.With(GetType(), readModelType.ToString(), nameof(ProcessEvents)),
e => new HashSet<Type>(readModelType.GetTypeInfo()
var aggregateEventTypes = _memoryCache.GetOrCreate(
CacheKey.With(GetType(), readModelType.ToString(), nameof(ProcessEventsAsync)),
_ => new HashSet<Type>(readModelType.GetTypeInfo()
.GetInterfaces()
.Where(i => i.GetTypeInfo().IsGenericType && readModelTypes.Contains(i.GetGenericTypeDefinition()))
.Select(i => i.GetTypeInfo().GetGenericArguments()[2])));
Expand Down Expand Up @@ -249,7 +254,7 @@ private async Task ProcessEvents(Type readModelType, IReadOnlyCollection<IDomain
}
catch (Exception e)
{
_logger.LogWarning($"Exception when populating: {readModelType}. Details: {e}");
_logger.LogError(e, $"Exception when populating: {readModelType}");
}
}

Expand Down