Skip to content

Commit

Permalink
Fix and test mongo outbox implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken committed Apr 4, 2024
1 parent a41e35e commit 6d38988
Show file tree
Hide file tree
Showing 20 changed files with 181 additions and 83 deletions.
9 changes: 8 additions & 1 deletion MinimalDomainEvents.sln
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MinimalDomainEvents.Outbox.
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MinimalDomainEvents.Outbox.Worker.Abstractions", "src\MinimalDomainEvents.Outbox.Worker.Abstractions\MinimalDomainEvents.Outbox.Worker.Abstractions.csproj", "{03BBC09D-9E45-487F-AB2A-C0ADA46B58A1}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MinimalDomainEvents.Outbox.Worker.UnitTests", "test\MinimalDomainEvents.Outbox.Worker.UnitTests\MinimalDomainEvents.Outbox.Worker.UnitTests.csproj", "{F3C36E6D-6A1A-4407-87A6-D77B24C0576D}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MinimalDomainEvents.Outbox.Worker.UnitTests", "test\MinimalDomainEvents.Outbox.Worker.UnitTests\MinimalDomainEvents.Outbox.Worker.UnitTests.csproj", "{F3C36E6D-6A1A-4407-87A6-D77B24C0576D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MongoTestContainer", "test\MongoTestContainer\MongoTestContainer.csproj", "{99C8CE04-4BC6-4206-B86C-1CE649C7FC19}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -131,6 +133,10 @@ Global
{F3C36E6D-6A1A-4407-87A6-D77B24C0576D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F3C36E6D-6A1A-4407-87A6-D77B24C0576D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F3C36E6D-6A1A-4407-87A6-D77B24C0576D}.Release|Any CPU.Build.0 = Release|Any CPU
{99C8CE04-4BC6-4206-B86C-1CE649C7FC19}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{99C8CE04-4BC6-4206-B86C-1CE649C7FC19}.Debug|Any CPU.Build.0 = Debug|Any CPU
{99C8CE04-4BC6-4206-B86C-1CE649C7FC19}.Release|Any CPU.ActiveCfg = Release|Any CPU
{99C8CE04-4BC6-4206-B86C-1CE649C7FC19}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -156,6 +162,7 @@ Global
{2DEA751C-8CE8-4452-A365-68F1AAE76551} = {AB124D7A-09CE-4168-B706-F319426E6383}
{03BBC09D-9E45-487F-AB2A-C0ADA46B58A1} = {AB124D7A-09CE-4168-B706-F319426E6383}
{F3C36E6D-6A1A-4407-87A6-D77B24C0576D} = {A37BECD1-4461-4E64-BF71-B48ACEBB8447}
{99C8CE04-4BC6-4206-B86C-1CE649C7FC19} = {A37BECD1-4461-4E64-BF71-B48ACEBB8447}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {5C04EBF0-7B5F-4CCE-8D3C-55C5B17C2BEB}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ namespace MinimalDomainEvents.Dispatcher.MediatR;

public static class IDomainEventDispatcherBuilderExtensions
{
public static IServiceCollection AddMediatorDispatcher(this IDomainEventDispatcherBuilder builder)
public static IDomainEventDispatcherBuilder AddMediatorDispatcher(this IDomainEventDispatcherBuilder builder)
{
return builder.Services
builder.Services
.AddScoped<IDispatchDomainEvents, MediatorDispatcher>()
.AddTransient(typeof(IPipelineBehavior<,>), typeof(DomainEventDispatchBehavior<,>))
;

return builder;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace MinimalDomainEvents.Outbox.Abstractions;
public interface IOutboxTransaction : IDisposable
{
Task StartTransaction(CancellationToken cancellationToken = default);
Task StartTransaction(Action? onCommit = null, CancellationToken cancellationToken = default);
Task Commit(CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ private static void RegisterDefaultServices(IOutboxDispatcherBuilder builder)
builder.Services.TryAddScoped<MongoSessionProvider>();
builder.Services.TryAddScoped<IMongoSessionProvider>(sp => sp.GetRequiredService<MongoSessionProvider>());
builder.Services.TryAddScoped<IMongoSessionProviderInitializer>(sp => sp.GetRequiredService<MongoSessionProvider>());
builder.Services.TryAddScoped<IOutboxRecordCollectionProvider, OutboxRecordCollectionProvider>();
builder.Services.TryAddScoped<OutboxRecordCollectionProvider>();
builder.Services.TryAddScoped<IOutboxRecordCollectionProvider>(sp => sp.GetRequiredService<OutboxRecordCollectionProvider>());
builder.Services.TryAddScoped<IOutboxRecordCollectionInitializer>(sp => sp.GetRequiredService<OutboxRecordCollectionProvider>());
builder.Services.TryAddScoped<ITransactionProvider, MongoDbTransactionProvider>();
builder.Services.TryAddScoped<IRetrieveOutboxRecords, MongoDbOutboxRecordRetriever>();
builder.Services.TryAddScoped<ICleanupOutboxRecords, MongoDbOutboxRecordCleaner>();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,24 @@ internal sealed class MongoDbOutboxTransaction : IOutboxTransaction
{
public IClientSessionHandle ClientSessionHandle { get; }

private Action? _onCommit;

public MongoDbOutboxTransaction(IClientSessionHandle clientSessionHandle)
{
ClientSessionHandle = clientSessionHandle;
}

public Task StartTransaction(CancellationToken cancellationToken = default)
public Task StartTransaction(Action? onCommit = null, CancellationToken cancellationToken = default)
{
ClientSessionHandle.StartTransaction();
_onCommit = onCommit;
return Task.CompletedTask;
}

public Task Commit(CancellationToken cancellationToken = default)
public async Task Commit(CancellationToken cancellationToken = default)
{
return ClientSessionHandle.CommitTransactionAsync(cancellationToken);
await ClientSessionHandle.CommitTransactionAsync(cancellationToken);
_onCommit?.Invoke();
}

public void Dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@ public async Task<IOutboxTransaction> NewTransaction(CancellationToken cancellat

var session = await _mongoClient.StartSessionAsync(null, cancellationToken);
_currentTransaction = new MongoDbOutboxTransaction(session);
await _currentTransaction.StartTransaction(cancellationToken);
await _currentTransaction.StartTransaction(ClearTransaction, cancellationToken);
return _currentTransaction;
}

private void ClearTransaction()
{
_currentTransaction = null;
}

public bool TryGetCurrentTransaction(out IOutboxTransaction? outboxTransaction)
{
outboxTransaction = _currentTransaction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal sealed class OutboxRecordCollectionProvider : IOutboxRecordCollectionPr
private readonly MongoClient _mongoClient;

private const string EnqueuedAtIndexName = "EnqueuedAt_asc";
private const string DispatchedAtIndexName = "ExpiresAt_asc";
private const string DispatchedAtIndexName = "OutboxCleanup";

public OutboxRecordCollectionProvider(OutboxSettings outboxSettings, MongoClient mongoClient)
{
Expand Down Expand Up @@ -77,7 +77,7 @@ private static async Task CreateEnqueuedAtIndex(IMongoCollection<OutboxRecord> c

private static async Task CreateDispatchedAtIndex(IMongoCollection<OutboxRecord> collection, CancellationToken cancellationToken)
{
var indexKeysDefinition = Builders<OutboxRecord>.IndexKeys.Ascending(or => or.ExpiresAt);
var indexKeysDefinition = Builders<OutboxRecord>.IndexKeys.Ascending(or => or.DispatchedAt);
var createIndexModel = new CreateIndexModel<OutboxRecord>(indexKeysDefinition, new()
{
ExpireAfter = TimeSpan.FromDays(7),
Expand All @@ -92,6 +92,7 @@ private static bool TryRegisterOutboxRecordClassMap()
return BsonClassMap.TryRegisterClassMap<OutboxRecord>(cm =>
{
cm.MapIdProperty(or => or.Id);
cm.AutoMap();
cm.UnmapProperty(or => or.ExpiresAt);
cm.SetIgnoreExtraElements(true);
});
Expand Down

This file was deleted.

35 changes: 19 additions & 16 deletions src/MinimalDomainEvents.Outbox.Worker/BackgroundDispatchWorker.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using Microsoft.Extensions.Hosting;
using MinimalDomainEvents.Dispatcher.Abstractions;
using MinimalDomainEvents.Outbox.Abstractions;
using MinimalDomainEvents.Outbox.Worker.Abstractions;
using System.Diagnostics;

namespace MinimalDomainEvents.Outbox.Worker;
internal sealed class BackgroundDispatchWorker : BackgroundService
Expand All @@ -11,41 +11,44 @@ internal sealed class BackgroundDispatchWorker : BackgroundService
private readonly IOutboxRecordCollectionInitializer _outboxRecordCollectionInitializer;
private readonly ITransactionProvider _transactionFactory;
private readonly IDomainEventRetriever _domainEventRetriever;
private readonly ICleanupOutboxRecords _cleanupOutboxRecords;
private readonly IEnumerable<IDispatchDomainEvents> _dispatchers;

public BackgroundDispatchWorker(IOutboxRecordCollectionInitializer outboxRecordCollectionInitializer,
ITransactionProvider transactionFactory,
IDomainEventRetriever domainEventRetriever,
ICleanupOutboxRecords cleanupOutboxRecords,
IEnumerable<IDispatchDomainEvents> dispatchers)
{
_domainEventRetriever = domainEventRetriever;
_outboxRecordCollectionInitializer = outboxRecordCollectionInitializer;
_transactionFactory = transactionFactory;
_cleanupOutboxRecords = cleanupOutboxRecords;
_dispatchers = dispatchers;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _outboxRecordCollectionInitializer.Initialize(stoppingToken);

while (!stoppingToken.IsCancellationRequested)
try
{
using var transaction = await _transactionFactory.NewTransaction(stoppingToken);
var domainEvents = await _domainEventRetriever.GetAndMarkAsDispatched(stoppingToken);
await _outboxRecordCollectionInitializer.Initialize(stoppingToken);

if (domainEvents is not null && domainEvents.Count > 0)
while (!stoppingToken.IsCancellationRequested)
{
foreach (var dispatcher in _dispatchers)
await dispatcher.Dispatch(domainEvents);
}
using var transaction = await _transactionFactory.NewTransaction(stoppingToken);
var domainEvents = await _domainEventRetriever.GetAndMarkAsDispatched(stoppingToken);

if (domainEvents is not null && domainEvents.Count > 0)
{
foreach (var dispatcher in _dispatchers)
await dispatcher.Dispatch(domainEvents);
}

await _cleanupOutboxRecords.CleanupExpiredOutboxRecords(stoppingToken);
await transaction.Commit(stoppingToken);
await transaction.Commit(stoppingToken);

await Task.Delay(Delay, stoppingToken);
await Task.Delay(Delay, stoppingToken);
}
}
catch (TaskCanceledException)
{
Debug.WriteLine("BackgroundDispatchWorker stopped. Reason: TaskCanceledException.");
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using MinimalDomainEvents.Outbox.Abstractions;

namespace MinimalDomainEvents.Outbox.Worker;
Expand All @@ -8,6 +9,7 @@ public static class IOutboxDispatcherBuilderExtensions
public static IOutboxDispatcherBuilder WithHostingDispatcher(this IOutboxDispatcherBuilder builder)
{
builder.Services.TryAddScoped<IDomainEventRetriever, DomainEventRetriever>();
builder.Services.AddHostedService<BackgroundDispatchWorker>();
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

<ItemGroup>
<InternalsVisibleTo Include="MinimalDomainEvents.Outbox.UnitTests" />
<InternalsVisibleTo Include="MinimalDomainEvents.Outbox.Worker.UnitTests" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
<PackageReference Include="Moq" Version="4.20.70" />
<PackageReference Include="Testcontainers.MongoDb" Version="3.7.0" />
<PackageReference Include="xunit" Version="2.6.5" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.6">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand All @@ -30,6 +29,7 @@
<ProjectReference Include="..\..\src\MinimalDomainEvents.Dispatcher\MinimalDomainEvents.Dispatcher.csproj" />
<ProjectReference Include="..\..\src\MinimalDomainEvents.Outbox.MongoDb\MinimalDomainEvents.Outbox.MongoDb.csproj" />
<ProjectReference Include="..\..\src\MinimalDomainEvents.Outbox\MinimalDomainEvents.Outbox.csproj" />
<ProjectReference Include="..\MongoTestContainer\MongoTestContainer.csproj" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
using MongoTestContainer;

namespace MinimalDomainEvents.Outbox.MongoDb.UnitTests;
[CollectionDefinition("MongoDb Integration")]
public partial class MongoContainerCollectionDefinition : ICollectionFixture<MongoContainerFixture> { }
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
using MessagePack;
using Microsoft.Extensions.DependencyInjection;
using MinimalDomainEvents.Contract;
using MinimalDomainEvents.Core;
using MinimalDomainEvents.Dispatcher;
using MinimalDomainEvents.Dispatcher.Abstractions;
using MinimalDomainEvents.Outbox.Abstractions;
using MinimalDomainEvents.Outbox.Worker.Abstractions;
using MongoDB.Driver;
using MongoTestContainer;

namespace MinimalDomainEvents.Outbox.MongoDb.UnitTests;
[Collection("MongoDb Integration")]
Expand Down Expand Up @@ -47,7 +47,6 @@ internal async Task Using_Transactions_Persists_OutboxRecords_Within_Same_Transa

var transactionProvider = serviceProvider.GetRequiredService<ITransactionProvider>();
var retriever = serviceProvider.GetRequiredService<IRetrieveOutboxRecords>();
var cleaner = serviceProvider.GetRequiredService<ICleanupOutboxRecords>();
using var transaction = await transactionProvider.NewTransaction();

var outboxRecords = await retriever.GetAndMarkAsDispatched();
Expand Down
Loading

0 comments on commit 6d38988

Please sign in to comment.