From 6d389886d76ca659a80cfd7eff68ad58a10ed7ef Mon Sep 17 00:00:00 2001 From: Ken Date: Thu, 4 Apr 2024 21:26:45 +0200 Subject: [PATCH] Fix and test mongo outbox implementation. --- MinimalDomainEvents.sln | 9 +- ...IDomainEventDispatcherBuilderExtensions.cs | 6 +- .../IOutboxTransaction.cs | 2 +- .../IOutboxDispatcherBuilderExtensions.cs | 5 +- .../MongoDbOutboxRecordCleaner.cs | 30 ------ .../MongoDbOutboxTransaction.cs | 10 +- .../MongoDbTransactionProvider.cs | 7 +- .../OutboxRecordCollectionProvider.cs | 5 +- .../ICleanupOutboxRecords.cs | 5 - .../BackgroundDispatchWorker.cs | 35 +++---- .../IOutboxDispatcherBuilderExtensions.cs | 4 +- .../MinimalDomainEvents.Outbox.csproj | 1 + ...mainEvents.Outbox.MongoDb.UnitTests.csproj | 2 +- .../MongoContainerCollectionDefinition.cs | 5 + .../MongoWorkerTests.cs | 3 +- .../BackgroundDispatchWorkerTests.cs | 95 +++++++++++++++++-- ...omainEvents.Outbox.Worker.UnitTests.csproj | 15 ++- .../MongoContainerCollectionDefinition.cs | 5 + .../MongoContainerFixture.cs | 6 +- .../MongoTestContainer.csproj | 14 +++ 20 files changed, 181 insertions(+), 83 deletions(-) delete mode 100644 src/MinimalDomainEvents.Outbox.MongoDb/MongoDbOutboxRecordCleaner.cs delete mode 100644 src/MinimalDomainEvents.Outbox.Worker.Abstractions/ICleanupOutboxRecords.cs create mode 100644 test/MinimalDomainEvents.Outbox.MongoDb.UnitTests/MongoContainerCollectionDefinition.cs create mode 100644 test/MinimalDomainEvents.Outbox.Worker.UnitTests/MongoContainerCollectionDefinition.cs rename test/{MinimalDomainEvents.Outbox.MongoDb.UnitTests => MongoTestContainer}/MongoContainerFixture.cs (80%) create mode 100644 test/MongoTestContainer/MongoTestContainer.csproj diff --git a/MinimalDomainEvents.sln b/MinimalDomainEvents.sln index c1bc3c0..18813ee 100644 --- a/MinimalDomainEvents.sln +++ b/MinimalDomainEvents.sln @@ -55,7 +55,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MinimalDomainEvents.Outbox. EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MinimalDomainEvents.Outbox.Worker.Abstractions", "src\MinimalDomainEvents.Outbox.Worker.Abstractions\MinimalDomainEvents.Outbox.Worker.Abstractions.csproj", "{03BBC09D-9E45-487F-AB2A-C0ADA46B58A1}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MinimalDomainEvents.Outbox.Worker.UnitTests", "test\MinimalDomainEvents.Outbox.Worker.UnitTests\MinimalDomainEvents.Outbox.Worker.UnitTests.csproj", "{F3C36E6D-6A1A-4407-87A6-D77B24C0576D}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MinimalDomainEvents.Outbox.Worker.UnitTests", "test\MinimalDomainEvents.Outbox.Worker.UnitTests\MinimalDomainEvents.Outbox.Worker.UnitTests.csproj", "{F3C36E6D-6A1A-4407-87A6-D77B24C0576D}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MongoTestContainer", "test\MongoTestContainer\MongoTestContainer.csproj", "{99C8CE04-4BC6-4206-B86C-1CE649C7FC19}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -131,6 +133,10 @@ Global {F3C36E6D-6A1A-4407-87A6-D77B24C0576D}.Debug|Any CPU.Build.0 = Debug|Any CPU {F3C36E6D-6A1A-4407-87A6-D77B24C0576D}.Release|Any CPU.ActiveCfg = Release|Any CPU {F3C36E6D-6A1A-4407-87A6-D77B24C0576D}.Release|Any CPU.Build.0 = Release|Any CPU + {99C8CE04-4BC6-4206-B86C-1CE649C7FC19}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {99C8CE04-4BC6-4206-B86C-1CE649C7FC19}.Debug|Any CPU.Build.0 = Debug|Any CPU + {99C8CE04-4BC6-4206-B86C-1CE649C7FC19}.Release|Any CPU.ActiveCfg = Release|Any CPU + {99C8CE04-4BC6-4206-B86C-1CE649C7FC19}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -156,6 +162,7 @@ Global {2DEA751C-8CE8-4452-A365-68F1AAE76551} = {AB124D7A-09CE-4168-B706-F319426E6383} {03BBC09D-9E45-487F-AB2A-C0ADA46B58A1} = {AB124D7A-09CE-4168-B706-F319426E6383} {F3C36E6D-6A1A-4407-87A6-D77B24C0576D} = {A37BECD1-4461-4E64-BF71-B48ACEBB8447} + {99C8CE04-4BC6-4206-B86C-1CE649C7FC19} = {A37BECD1-4461-4E64-BF71-B48ACEBB8447} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {5C04EBF0-7B5F-4CCE-8D3C-55C5B17C2BEB} diff --git a/src/MinimalDomainEvents.Dispatcher.MediatR/IDomainEventDispatcherBuilderExtensions.cs b/src/MinimalDomainEvents.Dispatcher.MediatR/IDomainEventDispatcherBuilderExtensions.cs index a1f9e1a..a8aeb9c 100644 --- a/src/MinimalDomainEvents.Dispatcher.MediatR/IDomainEventDispatcherBuilderExtensions.cs +++ b/src/MinimalDomainEvents.Dispatcher.MediatR/IDomainEventDispatcherBuilderExtensions.cs @@ -6,11 +6,13 @@ namespace MinimalDomainEvents.Dispatcher.MediatR; public static class IDomainEventDispatcherBuilderExtensions { - public static IServiceCollection AddMediatorDispatcher(this IDomainEventDispatcherBuilder builder) + public static IDomainEventDispatcherBuilder AddMediatorDispatcher(this IDomainEventDispatcherBuilder builder) { - return builder.Services + builder.Services .AddScoped() .AddTransient(typeof(IPipelineBehavior<,>), typeof(DomainEventDispatchBehavior<,>)) ; + + return builder; } } diff --git a/src/MinimalDomainEvents.Outbox.Abstractions/IOutboxTransaction.cs b/src/MinimalDomainEvents.Outbox.Abstractions/IOutboxTransaction.cs index 9fecc2f..5058eb7 100644 --- a/src/MinimalDomainEvents.Outbox.Abstractions/IOutboxTransaction.cs +++ b/src/MinimalDomainEvents.Outbox.Abstractions/IOutboxTransaction.cs @@ -1,6 +1,6 @@ namespace MinimalDomainEvents.Outbox.Abstractions; public interface IOutboxTransaction : IDisposable { - Task StartTransaction(CancellationToken cancellationToken = default); + Task StartTransaction(Action? onCommit = null, CancellationToken cancellationToken = default); Task Commit(CancellationToken cancellationToken = default); } \ No newline at end of file diff --git a/src/MinimalDomainEvents.Outbox.MongoDb/IOutboxDispatcherBuilderExtensions.cs b/src/MinimalDomainEvents.Outbox.MongoDb/IOutboxDispatcherBuilderExtensions.cs index 8d652c2..69a259a 100644 --- a/src/MinimalDomainEvents.Outbox.MongoDb/IOutboxDispatcherBuilderExtensions.cs +++ b/src/MinimalDomainEvents.Outbox.MongoDb/IOutboxDispatcherBuilderExtensions.cs @@ -29,9 +29,10 @@ private static void RegisterDefaultServices(IOutboxDispatcherBuilder builder) builder.Services.TryAddScoped(); builder.Services.TryAddScoped(sp => sp.GetRequiredService()); builder.Services.TryAddScoped(sp => sp.GetRequiredService()); - builder.Services.TryAddScoped(); + builder.Services.TryAddScoped(); + builder.Services.TryAddScoped(sp => sp.GetRequiredService()); + builder.Services.TryAddScoped(sp => sp.GetRequiredService()); builder.Services.TryAddScoped(); builder.Services.TryAddScoped(); - builder.Services.TryAddScoped(); } } diff --git a/src/MinimalDomainEvents.Outbox.MongoDb/MongoDbOutboxRecordCleaner.cs b/src/MinimalDomainEvents.Outbox.MongoDb/MongoDbOutboxRecordCleaner.cs deleted file mode 100644 index 9a8c9a8..0000000 --- a/src/MinimalDomainEvents.Outbox.MongoDb/MongoDbOutboxRecordCleaner.cs +++ /dev/null @@ -1,30 +0,0 @@ -using MinimalDomainEvents.Outbox.Abstractions; -using MinimalDomainEvents.Outbox.Worker.Abstractions; -using MongoDB.Driver; - -namespace MinimalDomainEvents.Outbox.MongoDb; -internal sealed class MongoDbOutboxRecordCleaner : ICleanupOutboxRecords -{ - private readonly IOutboxRecordCollectionProvider _outboxRecordCollectionProvider; - private readonly IMongoSessionProvider _sessionProvider; - - public MongoDbOutboxRecordCleaner(IOutboxRecordCollectionProvider outboxRecordCollectionProvider, IMongoSessionProvider sessionProvider) - { - _outboxRecordCollectionProvider = outboxRecordCollectionProvider; - _sessionProvider = sessionProvider; - } - - public async Task CleanupExpiredOutboxRecords(CancellationToken cancellationToken = default) - { - var collection = _outboxRecordCollectionProvider.Provide(); - - if (_sessionProvider.Session is not null) - { - await collection.DeleteManyAsync(_sessionProvider.Session!, Builders.Filter.Lte(or => or.ExpiresAt, DateTimeOffset.UtcNow), null, cancellationToken); - } - else - { - await collection.DeleteManyAsync(Builders.Filter.Lte(or => or.ExpiresAt, DateTimeOffset.UtcNow), null, cancellationToken); - } - } -} diff --git a/src/MinimalDomainEvents.Outbox.MongoDb/MongoDbOutboxTransaction.cs b/src/MinimalDomainEvents.Outbox.MongoDb/MongoDbOutboxTransaction.cs index 1543862..8b092cf 100644 --- a/src/MinimalDomainEvents.Outbox.MongoDb/MongoDbOutboxTransaction.cs +++ b/src/MinimalDomainEvents.Outbox.MongoDb/MongoDbOutboxTransaction.cs @@ -7,20 +7,24 @@ internal sealed class MongoDbOutboxTransaction : IOutboxTransaction { public IClientSessionHandle ClientSessionHandle { get; } + private Action? _onCommit; + public MongoDbOutboxTransaction(IClientSessionHandle clientSessionHandle) { ClientSessionHandle = clientSessionHandle; } - public Task StartTransaction(CancellationToken cancellationToken = default) + public Task StartTransaction(Action? onCommit = null, CancellationToken cancellationToken = default) { ClientSessionHandle.StartTransaction(); + _onCommit = onCommit; return Task.CompletedTask; } - public Task Commit(CancellationToken cancellationToken = default) + public async Task Commit(CancellationToken cancellationToken = default) { - return ClientSessionHandle.CommitTransactionAsync(cancellationToken); + await ClientSessionHandle.CommitTransactionAsync(cancellationToken); + _onCommit?.Invoke(); } public void Dispose() diff --git a/src/MinimalDomainEvents.Outbox.MongoDb/MongoDbTransactionProvider.cs b/src/MinimalDomainEvents.Outbox.MongoDb/MongoDbTransactionProvider.cs index 984cf25..cb8dac8 100644 --- a/src/MinimalDomainEvents.Outbox.MongoDb/MongoDbTransactionProvider.cs +++ b/src/MinimalDomainEvents.Outbox.MongoDb/MongoDbTransactionProvider.cs @@ -21,10 +21,15 @@ public async Task NewTransaction(CancellationToken cancellat var session = await _mongoClient.StartSessionAsync(null, cancellationToken); _currentTransaction = new MongoDbOutboxTransaction(session); - await _currentTransaction.StartTransaction(cancellationToken); + await _currentTransaction.StartTransaction(ClearTransaction, cancellationToken); return _currentTransaction; } + private void ClearTransaction() + { + _currentTransaction = null; + } + public bool TryGetCurrentTransaction(out IOutboxTransaction? outboxTransaction) { outboxTransaction = _currentTransaction; diff --git a/src/MinimalDomainEvents.Outbox.MongoDb/OutboxRecordCollectionProvider.cs b/src/MinimalDomainEvents.Outbox.MongoDb/OutboxRecordCollectionProvider.cs index edcd11b..d6056d4 100644 --- a/src/MinimalDomainEvents.Outbox.MongoDb/OutboxRecordCollectionProvider.cs +++ b/src/MinimalDomainEvents.Outbox.MongoDb/OutboxRecordCollectionProvider.cs @@ -17,7 +17,7 @@ internal sealed class OutboxRecordCollectionProvider : IOutboxRecordCollectionPr private readonly MongoClient _mongoClient; private const string EnqueuedAtIndexName = "EnqueuedAt_asc"; - private const string DispatchedAtIndexName = "ExpiresAt_asc"; + private const string DispatchedAtIndexName = "OutboxCleanup"; public OutboxRecordCollectionProvider(OutboxSettings outboxSettings, MongoClient mongoClient) { @@ -77,7 +77,7 @@ private static async Task CreateEnqueuedAtIndex(IMongoCollection c private static async Task CreateDispatchedAtIndex(IMongoCollection collection, CancellationToken cancellationToken) { - var indexKeysDefinition = Builders.IndexKeys.Ascending(or => or.ExpiresAt); + var indexKeysDefinition = Builders.IndexKeys.Ascending(or => or.DispatchedAt); var createIndexModel = new CreateIndexModel(indexKeysDefinition, new() { ExpireAfter = TimeSpan.FromDays(7), @@ -92,6 +92,7 @@ private static bool TryRegisterOutboxRecordClassMap() return BsonClassMap.TryRegisterClassMap(cm => { cm.MapIdProperty(or => or.Id); + cm.AutoMap(); cm.UnmapProperty(or => or.ExpiresAt); cm.SetIgnoreExtraElements(true); }); diff --git a/src/MinimalDomainEvents.Outbox.Worker.Abstractions/ICleanupOutboxRecords.cs b/src/MinimalDomainEvents.Outbox.Worker.Abstractions/ICleanupOutboxRecords.cs deleted file mode 100644 index 8eb21b5..0000000 --- a/src/MinimalDomainEvents.Outbox.Worker.Abstractions/ICleanupOutboxRecords.cs +++ /dev/null @@ -1,5 +0,0 @@ -namespace MinimalDomainEvents.Outbox.Worker.Abstractions; -public interface ICleanupOutboxRecords -{ - Task CleanupExpiredOutboxRecords(CancellationToken cancellationToken = default); -} \ No newline at end of file diff --git a/src/MinimalDomainEvents.Outbox.Worker/BackgroundDispatchWorker.cs b/src/MinimalDomainEvents.Outbox.Worker/BackgroundDispatchWorker.cs index 613be02..509e320 100644 --- a/src/MinimalDomainEvents.Outbox.Worker/BackgroundDispatchWorker.cs +++ b/src/MinimalDomainEvents.Outbox.Worker/BackgroundDispatchWorker.cs @@ -1,7 +1,7 @@ using Microsoft.Extensions.Hosting; using MinimalDomainEvents.Dispatcher.Abstractions; using MinimalDomainEvents.Outbox.Abstractions; -using MinimalDomainEvents.Outbox.Worker.Abstractions; +using System.Diagnostics; namespace MinimalDomainEvents.Outbox.Worker; internal sealed class BackgroundDispatchWorker : BackgroundService @@ -11,41 +11,44 @@ internal sealed class BackgroundDispatchWorker : BackgroundService private readonly IOutboxRecordCollectionInitializer _outboxRecordCollectionInitializer; private readonly ITransactionProvider _transactionFactory; private readonly IDomainEventRetriever _domainEventRetriever; - private readonly ICleanupOutboxRecords _cleanupOutboxRecords; private readonly IEnumerable _dispatchers; public BackgroundDispatchWorker(IOutboxRecordCollectionInitializer outboxRecordCollectionInitializer, ITransactionProvider transactionFactory, IDomainEventRetriever domainEventRetriever, - ICleanupOutboxRecords cleanupOutboxRecords, IEnumerable dispatchers) { _domainEventRetriever = domainEventRetriever; _outboxRecordCollectionInitializer = outboxRecordCollectionInitializer; _transactionFactory = transactionFactory; - _cleanupOutboxRecords = cleanupOutboxRecords; _dispatchers = dispatchers; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - await _outboxRecordCollectionInitializer.Initialize(stoppingToken); - - while (!stoppingToken.IsCancellationRequested) + try { - using var transaction = await _transactionFactory.NewTransaction(stoppingToken); - var domainEvents = await _domainEventRetriever.GetAndMarkAsDispatched(stoppingToken); + await _outboxRecordCollectionInitializer.Initialize(stoppingToken); - if (domainEvents is not null && domainEvents.Count > 0) + while (!stoppingToken.IsCancellationRequested) { - foreach (var dispatcher in _dispatchers) - await dispatcher.Dispatch(domainEvents); - } + using var transaction = await _transactionFactory.NewTransaction(stoppingToken); + var domainEvents = await _domainEventRetriever.GetAndMarkAsDispatched(stoppingToken); + + if (domainEvents is not null && domainEvents.Count > 0) + { + foreach (var dispatcher in _dispatchers) + await dispatcher.Dispatch(domainEvents); + } - await _cleanupOutboxRecords.CleanupExpiredOutboxRecords(stoppingToken); - await transaction.Commit(stoppingToken); + await transaction.Commit(stoppingToken); - await Task.Delay(Delay, stoppingToken); + await Task.Delay(Delay, stoppingToken); + } + } + catch (TaskCanceledException) + { + Debug.WriteLine("BackgroundDispatchWorker stopped. Reason: TaskCanceledException."); } } } \ No newline at end of file diff --git a/src/MinimalDomainEvents.Outbox.Worker/IOutboxDispatcherBuilderExtensions.cs b/src/MinimalDomainEvents.Outbox.Worker/IOutboxDispatcherBuilderExtensions.cs index 9bf5dc3..1755b1e 100644 --- a/src/MinimalDomainEvents.Outbox.Worker/IOutboxDispatcherBuilderExtensions.cs +++ b/src/MinimalDomainEvents.Outbox.Worker/IOutboxDispatcherBuilderExtensions.cs @@ -1,4 +1,5 @@ -using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; using MinimalDomainEvents.Outbox.Abstractions; namespace MinimalDomainEvents.Outbox.Worker; @@ -8,6 +9,7 @@ public static class IOutboxDispatcherBuilderExtensions public static IOutboxDispatcherBuilder WithHostingDispatcher(this IOutboxDispatcherBuilder builder) { builder.Services.TryAddScoped(); + builder.Services.AddHostedService(); return builder; } } \ No newline at end of file diff --git a/src/MinimalDomainEvents.Outbox/MinimalDomainEvents.Outbox.csproj b/src/MinimalDomainEvents.Outbox/MinimalDomainEvents.Outbox.csproj index 41a5459..e2eaef4 100644 --- a/src/MinimalDomainEvents.Outbox/MinimalDomainEvents.Outbox.csproj +++ b/src/MinimalDomainEvents.Outbox/MinimalDomainEvents.Outbox.csproj @@ -32,6 +32,7 @@ + diff --git a/test/MinimalDomainEvents.Outbox.MongoDb.UnitTests/MinimalDomainEvents.Outbox.MongoDb.UnitTests.csproj b/test/MinimalDomainEvents.Outbox.MongoDb.UnitTests/MinimalDomainEvents.Outbox.MongoDb.UnitTests.csproj index 31889bc..ab74a4c 100644 --- a/test/MinimalDomainEvents.Outbox.MongoDb.UnitTests/MinimalDomainEvents.Outbox.MongoDb.UnitTests.csproj +++ b/test/MinimalDomainEvents.Outbox.MongoDb.UnitTests/MinimalDomainEvents.Outbox.MongoDb.UnitTests.csproj @@ -14,7 +14,6 @@ - runtime; build; native; contentfiles; analyzers; buildtransitive @@ -30,6 +29,7 @@ + diff --git a/test/MinimalDomainEvents.Outbox.MongoDb.UnitTests/MongoContainerCollectionDefinition.cs b/test/MinimalDomainEvents.Outbox.MongoDb.UnitTests/MongoContainerCollectionDefinition.cs new file mode 100644 index 0000000..6b91add --- /dev/null +++ b/test/MinimalDomainEvents.Outbox.MongoDb.UnitTests/MongoContainerCollectionDefinition.cs @@ -0,0 +1,5 @@ +using MongoTestContainer; + +namespace MinimalDomainEvents.Outbox.MongoDb.UnitTests; +[CollectionDefinition("MongoDb Integration")] +public partial class MongoContainerCollectionDefinition : ICollectionFixture { } diff --git a/test/MinimalDomainEvents.Outbox.MongoDb.UnitTests/MongoWorkerTests.cs b/test/MinimalDomainEvents.Outbox.MongoDb.UnitTests/MongoWorkerTests.cs index 57fc1fc..5c7a8e8 100644 --- a/test/MinimalDomainEvents.Outbox.MongoDb.UnitTests/MongoWorkerTests.cs +++ b/test/MinimalDomainEvents.Outbox.MongoDb.UnitTests/MongoWorkerTests.cs @@ -1,12 +1,12 @@ using MessagePack; using Microsoft.Extensions.DependencyInjection; using MinimalDomainEvents.Contract; -using MinimalDomainEvents.Core; using MinimalDomainEvents.Dispatcher; using MinimalDomainEvents.Dispatcher.Abstractions; using MinimalDomainEvents.Outbox.Abstractions; using MinimalDomainEvents.Outbox.Worker.Abstractions; using MongoDB.Driver; +using MongoTestContainer; namespace MinimalDomainEvents.Outbox.MongoDb.UnitTests; [Collection("MongoDb Integration")] @@ -47,7 +47,6 @@ internal async Task Using_Transactions_Persists_OutboxRecords_Within_Same_Transa var transactionProvider = serviceProvider.GetRequiredService(); var retriever = serviceProvider.GetRequiredService(); - var cleaner = serviceProvider.GetRequiredService(); using var transaction = await transactionProvider.NewTransaction(); var outboxRecords = await retriever.GetAndMarkAsDispatched(); diff --git a/test/MinimalDomainEvents.Outbox.Worker.UnitTests/BackgroundDispatchWorkerTests.cs b/test/MinimalDomainEvents.Outbox.Worker.UnitTests/BackgroundDispatchWorkerTests.cs index 33d7ae9..827c8a7 100644 --- a/test/MinimalDomainEvents.Outbox.Worker.UnitTests/BackgroundDispatchWorkerTests.cs +++ b/test/MinimalDomainEvents.Outbox.Worker.UnitTests/BackgroundDispatchWorkerTests.cs @@ -1,12 +1,33 @@ +using MediatR; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Hosting; using MinimalDomainEvents.Contract; +using MinimalDomainEvents.Dispatcher; using MinimalDomainEvents.Dispatcher.Abstractions; +using MinimalDomainEvents.Dispatcher.MediatR; using MinimalDomainEvents.Outbox.Abstractions; -using MinimalDomainEvents.Outbox.Worker.Abstractions; +using MinimalDomainEvents.Outbox.MongoDb; +using MongoDB.Driver; +using MongoTestContainer; namespace MinimalDomainEvents.Outbox.Worker.UnitTests; - -public class BackgroundDispatchWorkerTests +[Collection("MongoDb Integration")] +public class BackgroundDispatchWorkerTests(MongoContainerFixture fixture) : IAsyncLifetime { + private const string DatabaseName = "testdatabase"; + + public Task InitializeAsync() + { + return Task.CompletedTask; + } + + public async Task DisposeAsync() + { + var mongoClient = CreateClient(); + await mongoClient.DropDatabaseAsync(DatabaseName); + } + [Fact] public async Task Worker_RetrievesDomainEventsFromOutbox_AndDispatchesThem_AndCommitsTransaction() { @@ -15,7 +36,6 @@ public async Task Worker_RetrievesDomainEventsFromOutbox_AndDispatchesThem_AndCo var collectionInitializer = new Mock(); var transactionProviderMock = new Mock(); var domainEventRetrieverMock = new Mock(); - var outboxCleanerMock = new Mock(); var domainEventDispatcherMock = new Mock(); var outboxTransactionMock = new Mock(); var domainEvent = Mock.Of(); @@ -32,10 +52,8 @@ public async Task Worker_RetrievesDomainEventsFromOutbox_AndDispatchesThem_AndCo var sut = new BackgroundDispatchWorker(collectionInitializer.Object, transactionProviderMock.Object, domainEventRetrieverMock.Object, - outboxCleanerMock.Object, [domainEventDispatcherMock.Object]); - try { await sut.StartAsync(cancellationToken); @@ -44,8 +62,69 @@ public async Task Worker_RetrievesDomainEventsFromOutbox_AndDispatchesThem_AndCo collectionInitializer.Verify(x => x.Initialize(It.IsAny()), Times.Once); domainEventRetrieverMock.Verify(x => x.GetAndMarkAsDispatched(It.IsAny()), Times.Once); - domainEventDispatcherMock.Verify(x => x.Dispatch(It.Is>(c => c.Single() == domainEvent)), Times.Once); - outboxCleanerMock.Verify(x => x.CleanupExpiredOutboxRecords(It.IsAny()), Times.Once); + domainEventDispatcherMock.Verify(x => x.Dispatch(It.Is>(c => c.Single() == domainEvent)), Times.Once); outboxTransactionMock.Verify(x => x.Commit(It.IsAny()), Times.Once); } + + [Fact] + public async Task Given_MongoPersistence_Outbox_CreatesIndexes_And_DispatchesItems() + { + var mongoClient = CreateClient(); + IServiceCollection serviceCollection = new ServiceCollection(); + serviceCollection.AddMediatR(cfg => + { + cfg.RegisterServicesFromAssemblyContaining(GetType()); + }); + serviceCollection.RemoveAll>(); + serviceCollection.AddSingleton, TestHandler>(); + + serviceCollection.AddDomainEventDispatcher(dispatcherBuilder => + { + dispatcherBuilder.AddMediatorDispatcher(); + dispatcherBuilder.AddOutbox(outboxBuilder => + { + outboxBuilder.AddMongo(mongoClient); + outboxBuilder.WithDatabase(DatabaseName); + outboxBuilder.WithHostingDispatcher(); + }); + }); + + using var serviceProvider = serviceCollection.BuildServiceProvider(); + + var outboxDispatcher = serviceProvider.GetRequiredService(); + outboxDispatcher.Should().BeOfType(); + outboxDispatcher.RaiseDomainEvent(new TestDomainEvent("TestValue")); + + var backgroundWorker = serviceProvider.GetRequiredService() as BackgroundDispatchWorker; + var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromHours(5)); + + await backgroundWorker!.StartAsync(cancellationTokenSource.Token); + await Task.Delay(1000); //Wait for the collection to initialize. + await outboxDispatcher.DispatchAndClear(); //Persist the domain events in the OutboxRecords collection. + await Task.Delay(2000); //Wait for the worker to dispatch the OutboxRecords. + + var mongoDatabase = mongoClient.GetDatabase(DatabaseName); + var outboxRecordCollection = mongoDatabase.GetCollection("OutboxRecords"); + var indexes = await (await outboxRecordCollection.Indexes.ListAsync()).ToListAsync(); + indexes.Should().HaveCount(3); //TODO - Assert them all. + outboxRecordCollection.AsQueryable().All(or => or.DispatchedAt != null).Should().BeTrue(); + + var testDomainEventHandler = serviceProvider.GetRequiredService>() as TestHandler; + testDomainEventHandler!.Value.Should().Be("TestValue"); + } + + public sealed record class TestDomainEvent(string Value) : IDomainEvent, INotification; + + public class TestHandler : INotificationHandler + { + public string Value { get; private set; } = string.Empty; + + public Task Handle(TestDomainEvent notification, CancellationToken cancellationToken) + { + Value = notification.Value; + return Task.CompletedTask; + } + } + + private MongoClient CreateClient() => new(fixture.ConnectionString); } \ No newline at end of file diff --git a/test/MinimalDomainEvents.Outbox.Worker.UnitTests/MinimalDomainEvents.Outbox.Worker.UnitTests.csproj b/test/MinimalDomainEvents.Outbox.Worker.UnitTests/MinimalDomainEvents.Outbox.Worker.UnitTests.csproj index 67c1f75..d88c77d 100644 --- a/test/MinimalDomainEvents.Outbox.Worker.UnitTests/MinimalDomainEvents.Outbox.Worker.UnitTests.csproj +++ b/test/MinimalDomainEvents.Outbox.Worker.UnitTests/MinimalDomainEvents.Outbox.Worker.UnitTests.csproj @@ -11,21 +11,28 @@ - + + + - - + + runtime; build; native; contentfiles; analyzers; buildtransitive all - + runtime; build; native; contentfiles; analyzers; buildtransitive all + + + + + diff --git a/test/MinimalDomainEvents.Outbox.Worker.UnitTests/MongoContainerCollectionDefinition.cs b/test/MinimalDomainEvents.Outbox.Worker.UnitTests/MongoContainerCollectionDefinition.cs new file mode 100644 index 0000000..a0db13b --- /dev/null +++ b/test/MinimalDomainEvents.Outbox.Worker.UnitTests/MongoContainerCollectionDefinition.cs @@ -0,0 +1,5 @@ +using MongoTestContainer; + +namespace MinimalDomainEvents.Outbox.Worker.UnitTests; +[CollectionDefinition("MongoDb Integration")] +public partial class MongoContainerCollectionDefinition : ICollectionFixture { } \ No newline at end of file diff --git a/test/MinimalDomainEvents.Outbox.MongoDb.UnitTests/MongoContainerFixture.cs b/test/MongoTestContainer/MongoContainerFixture.cs similarity index 80% rename from test/MinimalDomainEvents.Outbox.MongoDb.UnitTests/MongoContainerFixture.cs rename to test/MongoTestContainer/MongoContainerFixture.cs index 9e069f5..cc86b19 100644 --- a/test/MinimalDomainEvents.Outbox.MongoDb.UnitTests/MongoContainerFixture.cs +++ b/test/MongoTestContainer/MongoContainerFixture.cs @@ -1,10 +1,8 @@ using DotNet.Testcontainers.Builders; using Testcontainers.MongoDb; +using Xunit; -namespace MinimalDomainEvents.Outbox.MongoDb.UnitTests; -[CollectionDefinition("MongoDb Integration")] -public sealed class MognoContainerCollectionFixture : ICollectionFixture { } - +namespace MongoTestContainer; public sealed class MongoContainerFixture : IAsyncLifetime { public string? ConnectionString => _container?.GetConnectionString(); diff --git a/test/MongoTestContainer/MongoTestContainer.csproj b/test/MongoTestContainer/MongoTestContainer.csproj new file mode 100644 index 0000000..d74c829 --- /dev/null +++ b/test/MongoTestContainer/MongoTestContainer.csproj @@ -0,0 +1,14 @@ + + + + net8.0 + enable + enable + + + + + + + +