Skip to content

Commit

Permalink
Refactor outbox inner workings
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken committed Apr 2, 2024
1 parent 7613445 commit a28befd
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public async Task DispatchAndClear()
return;

foreach (var dispatcher in _dispatchers)
await dispatcher.Dispatch(domainEvents ?? new List<IDomainEvent>(0).AsReadOnly());
await dispatcher.Dispatch(domainEvents);
}

public void Dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public sealed class OutboxRecord

public OutboxRecord(DateTimeOffset enqueuedAt, byte[] messageData)
{
Id = Guid.NewGuid();
EnqueuedAt = enqueuedAt;
MessageData = messageData;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public MongoDbOutboxTransaction(IClientSessionHandle clientSessionHandle)

public Task StartTransaction(CancellationToken cancellationToken = default)
{
ClientSessionHandle.StartTransaction();
ClientSessionHandle.StartTransaction();
return Task.CompletedTask;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public Task Initialize(CancellationToken cancellationToken)
ReadPreference = ReadPreference.Primary,
WriteConcern = WriteConcern.WMajority
};

var collection = Provide(collectionSettings);

//TODO - Create indexes (recreate if exists with same name (because of versioning))
Expand Down
21 changes: 18 additions & 3 deletions src/MinimalDomainEvents.Outbox.Worker/BackgroundDispatchWorker.cs
Original file line number Diff line number Diff line change
@@ -1,37 +1,52 @@
using Microsoft.Extensions.Hosting;
using MinimalDomainEvents.Contract;
using MinimalDomainEvents.Dispatcher.Abstractions;
using MinimalDomainEvents.Outbox.Abstractions;
using MinimalDomainEvents.Outbox.Worker.Abstractions;

namespace MinimalDomainEvents.Outbox.Worker;
internal sealed class BackgroundDispatchWorker : BackgroundService
{
private const int Delay = 2 * 1000;

private readonly IOutboxRecordCollectionInitializer _outboxRecordCollectionInitializer;
private readonly ITransactionProvider _transactionFactory;
private readonly IDomainEventRetriever _domainEventRetriever;
private readonly ICleanupOutboxRecords _cleanupOutboxRecords;
private readonly IEnumerable<IDispatchDomainEvents> _dispatchers;

public BackgroundDispatchWorker(IOutboxRecordCollectionInitializer outboxRecordCollectionInitializer,
ITransactionProvider transactionFactory,
IDomainEventRetriever domainEventRetriever,
ICleanupOutboxRecords cleanupOutboxRecords)
ICleanupOutboxRecords cleanupOutboxRecords,
IEnumerable<IDispatchDomainEvents> dispatchers)
{
_domainEventRetriever = domainEventRetriever;
_outboxRecordCollectionInitializer = outboxRecordCollectionInitializer;
_transactionFactory = transactionFactory;
_cleanupOutboxRecords = cleanupOutboxRecords;
_dispatchers = dispatchers;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _outboxRecordCollectionInitializer.Initialize(stoppingToken);

while (true)
while (!stoppingToken.IsCancellationRequested)
{
using var transaction = await _transactionFactory.NewTransaction(stoppingToken);
var domainEvents = await _domainEventRetriever.GetAndMarkAsDispatched(stoppingToken);
//TODO - Dispatch domain events

if (domainEvents is not null && domainEvents.Count > 0)
{
foreach (var dispatcher in _dispatchers)
await dispatcher.Dispatch(domainEvents);
}

await _cleanupOutboxRecords.CleanupExpiredOutboxRecords(stoppingToken);
await transaction.Commit(stoppingToken);

await Task.Delay(Delay, stoppingToken);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using MinimalDomainEvents.Dispatcher.Abstractions;
using MinimalDomainEvents.Outbox.Abstractions;

Expand All @@ -8,18 +9,17 @@ public static class IDomainEventDispatcherBuilderExtensions
{
public static IDomainEventDispatcherBuilder AddOutbox(this IDomainEventDispatcherBuilder builder)
{
builder.Services.RemoveAll<IScopedDomainEventDispatcher>();
builder.Services
.AddScoped<IDispatchDomainEvents, OutboxDomainEventDispatcher>()
.AddScoped<IScopedDomainEventDispatcher, OutboxDomainEventDispatcher>()
;

return builder;
}

public static IDomainEventDispatcherBuilder AddOutbox(this IDomainEventDispatcherBuilder builder, Action<IOutboxDispatcherBuilder>? configure)
{
builder.Services
.AddScoped<IDispatchDomainEvents, OutboxDomainEventDispatcher>()
;
AddOutbox(builder);

if (configure is not null)
{
Expand Down
37 changes: 31 additions & 6 deletions src/MinimalDomainEvents.Outbox/OutboxDomainEventDispatcher.cs
Original file line number Diff line number Diff line change
@@ -1,24 +1,36 @@
using MessagePack;
using MinimalDomainEvents.Contract;
using MinimalDomainEvents.Core;
using MinimalDomainEvents.Dispatcher.Abstractions;
using MinimalDomainEvents.Outbox.Abstractions;

namespace MinimalDomainEvents.Outbox;
internal sealed class OutboxDomainEventDispatcher : IDispatchDomainEvents
internal sealed class OutboxDomainEventDispatcher : IScopedDomainEventDispatcher
{
public IDomainEventScope? Scope => _scope;

private IDomainEventScope? _scope;

private readonly OutboxSettings _settings;
private readonly IPersistOutboxRecords _domainEventPersister;

public OutboxDomainEventDispatcher(OutboxSettings settings, IPersistOutboxRecords domainEventPersister)
{
_scope = DomainEventTracker.CreateScope();
_domainEventPersister = domainEventPersister;
_settings = settings;
}

public async Task Dispatch(IReadOnlyCollection<IDomainEvent> domainEvents)
public void RaiseDomainEvent(IDomainEvent domainEvent)
{
ArgumentNullException.ThrowIfNull(domainEvents);
if (domainEvents.Count == 0)
_scope!.RaiseDomainEvent(domainEvent);
}

public async Task DispatchAndClear()
{
var domainEvents = _scope!.GetAndClearEvents();

if (domainEvents is null || domainEvents.Count == 0)
return;

if (_settings.SendBatched)
Expand Down Expand Up @@ -55,8 +67,21 @@ private static IReadOnlyCollection<OutboxRecord> CreateIndividualRecords(IReadOn
private static byte[] ToBinary<T>(T[] input)
{
var options = MessagePack.Resolvers.ContractlessStandardResolver.Options
.WithResolver(MessagePack.Resolvers.TypelessObjectResolver.Instance);

.WithResolver(MessagePack.Resolvers.TypelessObjectResolver.Instance)
.WithSecurity(MessagePackSecurity.UntrustedData)
;

return MessagePackSerializer.Typeless.Serialize(input, options);
}

public void Dispose()
{
GC.SuppressFinalize(this);

if (_scope is not null)
{
_scope.Dispose();
_scope = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ public async Task InitializeAsync()

public async Task DisposeAsync()
{
//await _container.DisposeAsync();
await _container.DisposeAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,39 @@ public class DomaineventDispatcherBuilderExtensionsTests
[Fact]
public void Given_Configure_Null_Does_Not_Configure_OutboxDispatcherBuilder()
{
var serviceCollectionMock = new Mock<IServiceCollection>();
var sut = new DomainEventDispatcherBuilder(serviceCollectionMock.Object);
var serviceCollection = new ServiceCollection();
var sut = new DomainEventDispatcherBuilder(serviceCollection);

sut.AddOutbox(null);

serviceCollectionMock.Verify(x =>
x.Add(It.Is<ServiceDescriptor>(sd =>
sd.ServiceType == typeof(IDispatchDomainEvents)
&& sd.ImplementationType == typeof(OutboxDomainEventDispatcher)
&& sd.Lifetime == ServiceLifetime.Scoped)
),
Times.Once);
serviceCollectionMock.VerifyNoOtherCalls();
serviceCollection.Should().ContainSingle(x => x.ServiceType == typeof(IScopedDomainEventDispatcher))
.Which.ImplementationType.Should().Be(typeof(OutboxDomainEventDispatcher));
}

[Fact]
public void Given_Other_Implementation_Already_Registered_Overrides()
{
var serviceCollection = new ServiceCollection();
serviceCollection.AddScoped<IScopedDomainEventDispatcher, ScopedDomainEventDispatcher>();
var sut = new DomainEventDispatcherBuilder(serviceCollection);

sut.AddOutbox(null);

serviceCollection.Single(sc => sc.ServiceType == typeof(IScopedDomainEventDispatcher))
.ImplementationType.Should().Be(typeof(OutboxDomainEventDispatcher));
}

[Fact]
public void Given_Configure_Not_Null_Does_Configure_OutboxDispatcherBuilder()
{
var serviceCollectionMock = new Mock<IServiceCollection>();
var sut = new DomainEventDispatcherBuilder(serviceCollectionMock.Object);
var serviceCollection = new ServiceCollection();
var sut = new DomainEventDispatcherBuilder(serviceCollection);
var serviceDescriptor = new ServiceDescriptor(typeof(object), typeof(object), ServiceLifetime.Transient);

sut.AddOutbox(b => b.Services.Add(serviceDescriptor));

serviceCollectionMock.Verify(x =>
x.Add(It.Is<ServiceDescriptor>(sd =>
sd.ServiceType == typeof(IDispatchDomainEvents)
&& sd.ImplementationType == typeof(OutboxDomainEventDispatcher)
&& sd.Lifetime == ServiceLifetime.Scoped)
),
Times.Once);
serviceCollectionMock.Verify(x =>
x.Add(serviceDescriptor),
Times.Once);
serviceCollectionMock.VerifyNoOtherCalls();
serviceCollection.Should().ContainSingle(x => x.ServiceType == typeof(IScopedDomainEventDispatcher))
.Which.ImplementationType.Should().Be(typeof(OutboxDomainEventDispatcher));
serviceCollection.Should().ContainSingle(x => x == serviceDescriptor);
}
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,19 @@
using MessagePack;
using MinimalDomainEvents.Contract;
using MinimalDomainEvents.Core;
using MinimalDomainEvents.Outbox.Abstractions;

namespace MinimalDomainEvents.Outbox.UnitTests;

public class OutboxDomainEventDispatcherTests
{
[Fact]
public async Task Given_DomainEvents_Null_Throws_ArgumentNullException()
{
var sut = new OutboxDomainEventDispatcher(OutboxSettings.Default, null!);

await FluentActions.Awaiting(() => sut.Dispatch(null!))
.Should()
.ThrowExactlyAsync<ArgumentNullException>();
}

[Fact]
public async Task Given_DomainEvents_Empty_Returns_Silently()
public async Task Given_DomainEvents_Null_DoesNotDoAnything()
{
var outboxRecordPersisterMock = new Mock<IPersistOutboxRecords>();
var sut = new OutboxDomainEventDispatcher(OutboxSettings.Default, outboxRecordPersisterMock.Object);

await sut.Dispatch([]);
await sut.DispatchAndClear();

outboxRecordPersisterMock.VerifyNoOtherCalls();
}
Expand All @@ -35,8 +26,8 @@ public async Task Given_SendBatched_True_Batches_DomainEvents_To_Single_OutboxRe
IReadOnlyCollection<IDomainEvent> domainEvents = [new TestEvent("A"), new TestEvent("B")];

var outboxRecordPersisterMock = new Mock<IPersistOutboxRecords>();
outboxRecordPersisterMock.Setup(x => x.PersistBatched(It.IsAny<OutboxRecord>()))
.Callback((OutboxRecord record) =>
outboxRecordPersisterMock.Setup(x => x.PersistBatched(It.IsAny<OutboxRecord>(), default))
.Callback((OutboxRecord record, CancellationToken _) =>
{
using var memoryStream = new MemoryStream(record.MessageData);
var domainEvents = MessagePackSerializer.Typeless.Deserialize(memoryStream, SerializerOptions) as IReadOnlyCollection<IDomainEvent>;
Expand All @@ -45,12 +36,16 @@ public async Task Given_SendBatched_True_Batches_DomainEvents_To_Single_OutboxRe
});

var sut = new OutboxDomainEventDispatcher(OutboxSettings.Default, outboxRecordPersisterMock.Object);
foreach (var domainEvent in domainEvents)
{
DomainEventTracker.RaiseDomainEvent(domainEvent);
}

// Act
await sut.Dispatch(domainEvents);
await sut.DispatchAndClear();

// Assert
outboxRecordPersisterMock.Verify(x => x.PersistBatched(It.IsAny<OutboxRecord>()), Times.Once);
outboxRecordPersisterMock.Verify(x => x.PersistBatched(It.IsAny<OutboxRecord>(), default), Times.Once);
callbackSucceeded.Should().BeTrue();
}

Expand All @@ -62,32 +57,39 @@ public async Task Given_SendBatched_False_Persists_OutboxRecords_Individually_An
IReadOnlyCollection<TestEvent> domainEvents = [new TestEvent("A"), new TestEvent("B")];

var outboxRecordPersisterMock = new Mock<IPersistOutboxRecords>();
outboxRecordPersisterMock.Setup(x => x.PersistIndividually(It.IsAny<IReadOnlyCollection<OutboxRecord>>()))
.Callback((IReadOnlyCollection<OutboxRecord> records) =>
outboxRecordPersisterMock.Setup(x => x.PersistIndividually(It.IsAny<IReadOnlyCollection<OutboxRecord>>(), default))
.Callback((IReadOnlyCollection<OutboxRecord> records, CancellationToken _) =>
{
for (var i = 0; i < records.Count; i++)
{
using var memoryStream = new MemoryStream(records.ElementAt(i).MessageData);
var domainEvent = MessagePackSerializer.Typeless.Deserialize(memoryStream, SerializerOptions) as TestEvent;
domainEvent.Should().NotBeNull();
domainEvent!.PropA.Should().Be(domainEvents.ElementAt(i).PropA);
}
callbackSucceeded = true;
});

var outboxSettings = new OutboxSettings { SendBatched = false };
var sut = new OutboxDomainEventDispatcher(outboxSettings, outboxRecordPersisterMock.Object);
foreach (var domainEvent in domainEvents)
{
DomainEventTracker.RaiseDomainEvent(domainEvent);
}

// Act
await sut.Dispatch(domainEvents);
await sut.DispatchAndClear();

// Assert
outboxRecordPersisterMock.Verify(x => x.PersistIndividually(It.IsAny<IReadOnlyCollection<OutboxRecord>>()), Times.Once);
outboxRecordPersisterMock.Verify(x => x.PersistIndividually(It.IsAny<IReadOnlyCollection<OutboxRecord>>(), default), Times.Once);
callbackSucceeded.Should().BeTrue();
}

private static MessagePackSerializerOptions SerializerOptions =>
MessagePack.Resolvers.ContractlessStandardResolver.Options
.WithResolver(MessagePack.Resolvers.TypelessObjectResolver.Instance);
.WithResolver(MessagePack.Resolvers.TypelessObjectResolver.Instance)
.WithSecurity(MessagePackSecurity.UntrustedData)
;

public sealed record class TestEvent(string PropA) : IDomainEvent;
}

0 comments on commit a28befd

Please sign in to comment.