Skip to content

Commit

Permalink
Rename UnitOfWorkPipelineBehaviorConcurrentCounter to ConcurrentComma…
Browse files Browse the repository at this point in the history
…ndCounter, delay telemetry events until all commands are processed
  • Loading branch information
tjementum committed Dec 6, 2023
1 parent 3b79408 commit 980c5b6
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Assembly applicationAssembly
services.AddTransient(typeof(IPipelineBehavior<,>), typeof(UnitOfWorkPipelineBehavior<,>)); // Post
services.AddTransient(typeof(IPipelineBehavior<,>), typeof(PublishDomainEventsPipelineBehavior<,>)); // Post
services.AddScoped<ITelemetryEventsCollector, TelemetryEventsCollector>();
services.AddScoped<UnitOfWorkPipelineBehaviorConcurrentCounter>();
services.AddScoped<ConcurrentCommandCounter>();

services.AddMediatR(configuration => configuration.RegisterServicesFromAssemblies(applicationAssembly));
services.AddNonGenericValidators(applicationAssembly);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace PlatformPlatform.SharedKernel.ApplicationCore.Behaviors;

/// <summary>
/// The ConcurrentCommandCounter class is a concurrent counter used to count the number of concurrent commands that
/// are being handled. It is used by only commit changes to the database when all commands have been handled.
/// This is to ensure that all changes to all aggregates and entities are committed to the database only after all
/// command and domain events are successfully handled.
/// Additionally, this also ensures that Telemetry is only sent to Application Insights after all commands and
/// domain events are successfully handled.
/// </summary>
public sealed class ConcurrentCommandCounter
{
private int _concurrentCount;

public void Increment()
{
Interlocked.Increment(ref _concurrentCount);
}

public void Decrement()
{
Interlocked.Decrement(ref _concurrentCount);
}

public bool IsZero()
{
return _concurrentCount == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ namespace PlatformPlatform.SharedKernel.ApplicationCore.Behaviors;

public sealed class PublishTelemetryEventsPipelineBehavior<TRequest, TResponse>(
ITelemetryEventsCollector telemetryEventsCollector,
TelemetryClient telemetryClient
TelemetryClient telemetryClient,
ConcurrentCommandCounter concurrentCommandCounter
) : IPipelineBehavior<TRequest, TResponse> where TRequest : ICommand where TResponse : ResultBase
{
public async Task<TResponse> Handle(
Expand All @@ -17,10 +18,13 @@ CancellationToken cancellationToken
{
var result = await next();

while (telemetryEventsCollector.HasEvents)
if (concurrentCommandCounter.IsZero())
{
var telemetryEvent = telemetryEventsCollector.Dequeue();
telemetryClient.TrackEvent(telemetryEvent.Name, telemetryEvent.Properties);
while (telemetryEventsCollector.HasEvents)
{
var telemetryEvent = telemetryEventsCollector.Dequeue();
telemetryClient.TrackEvent(telemetryEvent.Name, telemetryEvent.Properties);
}
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace PlatformPlatform.SharedKernel.ApplicationCore.Behaviors;
/// </summary>
public sealed class UnitOfWorkPipelineBehavior<TRequest, TResponse>(
IUnitOfWork unitOfWork,
UnitOfWorkPipelineBehaviorConcurrentCounter unitOfWorkPipelineBehaviorConcurrentCounter
ConcurrentCommandCounter concurrentCommandCounter
) : IPipelineBehavior<TRequest, TResponse> where TRequest : ICommand where TResponse : ResultBase
{
public async Task<TResponse> Handle(
Expand All @@ -21,13 +21,13 @@ public async Task<TResponse> Handle(
CancellationToken cancellationToken
)
{
unitOfWorkPipelineBehaviorConcurrentCounter.Increment();
concurrentCommandCounter.Increment();
var response = await next();

if (response is ResultBase { IsSuccess: true })
{
unitOfWorkPipelineBehaviorConcurrentCounter.Decrement();
if (unitOfWorkPipelineBehaviorConcurrentCounter.IsZero())
concurrentCommandCounter.Decrement();
if (concurrentCommandCounter.IsZero())
{
await unitOfWork.CommitAsync(cancellationToken);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public UnitOfWorkPipelineBehaviorTests()
services.AddSingleton(_unitOfWork);
_behavior = new UnitOfWorkPipelineBehavior<TestCommand, Result<TestAggregate>>(
_unitOfWork,
new UnitOfWorkPipelineBehaviorConcurrentCounter()
new ConcurrentCommandCounter()
);
}

Expand Down

0 comments on commit 980c5b6

Please sign in to comment.