Skip to content

Commit

Permalink
Fixed the incorrect concurrency checking in all IDataStore implementa…
Browse files Browse the repository at this point in the history
…tions. Closes #82.
  • Loading branch information
jezzsantos committed Feb 10, 2025
1 parent 6bb9442 commit 981d3a9
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,18 @@ public async Task<Result<Optional<CommandEntity>, Error>> RetrieveAsync(string c

return Optional<CommandEntity>.None;
}

private async Task<Result<Error>> AddExclusiveAsync(string containerName, Dictionary<string, object> wheres,
CommandEntity entity,
CancellationToken cancellationToken)
{
containerName.ThrowIfNotValuedParameter(nameof(containerName),
Resources.AnyStore_MissingContainerName);
ArgumentNullException.ThrowIfNull(entity);

return await ExecuteSqlInsertExclusiveCommandAsync(containerName, wheres, entity.ToTableEntity(),
cancellationToken);
}
}

internal static class SqlServerQueryBuilderExtensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,31 @@ public async Task<Result<string, Error>> AddEventsAsync(string entityName, strin
var latestStoredEventVersion = latest.Value.HasValue
? latest.Value.Value.Version.ToOptional()
: Optional<int>.None;
var @checked = this.VerifyConcurrencyCheck(streamName, latestStoredEventVersion, events.First().Version);
var @checked = this.VerifyContiguousCheck(streamName, latestStoredEventVersion, events.First().Version);
if (@checked.IsFailure)
{
return @checked.Error;
}

foreach (var @event in events)
{
var added = await AddAsync(DetermineEventStoreContainerName(),
var version = @event.Version;
var wheres = new Dictionary<string, object>
{
{ nameof(EventStoreEntity.Version), version },
{ nameof(EventStoreEntity.StreamName), streamName }
};
var added = await AddExclusiveAsync(DetermineEventStoreContainerName(), wheres,
CommandEntity.FromDto(@event.ToTabulated(entityName, streamName)), cancellationToken);
if (added.IsFailure)
{
if (added.Error.Is(ErrorCode.EntityExists))
{
return Error.EntityExists(
Common.Resources.EventStore_ConcurrencyVerificationFailed_StreamAlreadyUpdated.Format(
streamName, version));
}

return added.Error;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ private async Task<Result<Error>> ExecuteSqlUpdateCommandAsync(string tableName,
}
}

/// <summary>
/// Inserts the entity into the table whether it exists or not.
/// </summary>
private async Task<Result<Error>> ExecuteSqlInsertCommandAsync(string tableName,
Dictionary<string, object> parameters, CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -109,6 +112,80 @@ private async Task<Result<Error>> ExecuteSqlInsertCommandAsync(string tableName,
}
}

/// <summary>
/// Inserts the entity into the table, as long as the entity does not already exist for the specified
/// <see cref="wheresParameter" />, otherwise return <see cref="ErrorCode.EntityExists" />
/// </summary>
private async Task<Result<Error>> ExecuteSqlInsertExclusiveCommandAsync(string tableName,
Dictionary<string, object> wheresParameter,
Dictionary<string, object> parameters, CancellationToken cancellationToken)
{
const int whereParameterOffset = 500; // Arbitrary offset to avoid parameter collision
var columnNames = string.Join(',', parameters.Select(p => p.Key.ToColumnName()));
var columnIndex = 1;
var columnValuePlaceholders = string.Join(',', parameters.Select(_ => $"@{columnIndex++}"));
var existsIndex = whereParameterOffset;
var existsColumnNames = string.Join(' ', wheresParameter.Select(where =>
$"{(existsIndex++ > whereParameterOffset ? "AND " : "")}{where.Key.ToColumnName()} = @{existsIndex - 1}"));
var commandText = $"""
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE
BEGIN TRANSACTION
IF NOT EXISTS (
SELECT [Id]
FROM {tableName.ToTableName()}
WHERE {existsColumnNames}
)
BEGIN
INSERT INTO {tableName.ToTableName()} ({columnNames})
VALUES ({columnValuePlaceholders})
END
COMMIT TRANSACTION
""";

await using var connection = new SqlConnection(_connectionOptions.ConnectionString);
{
try
{
await connection.OpenAsync(cancellationToken);
int numRecords;
await using (var command = new SqlCommand(commandText, connection))
{
var whereParameterIndex = whereParameterOffset;
foreach (var whereParameter in wheresParameter)
{
command.Parameters.AddWithValue($"@{whereParameterIndex++}", whereParameter.Value);
}

var parameterIndex = 1;
foreach (var parameter in parameters)
{
command.Parameters.AddWithValue($"@{parameterIndex++}", parameter.Value);
}

numRecords = await command.ExecuteNonQueryAsync(cancellationToken);
}

await connection.CloseAsync();
if (numRecords == -1)
{
_recorder.TraceWarning(null, "SQLServer executed SQL {Command}, but found existing record",
commandText);
return Error.EntityExists();
}

_recorder.TraceInformation(null, "SQLServer executed SQL {Command}, affecting {Affecting} records",
commandText, numRecords);

return Result.Ok;
}
catch (Exception ex)
{
_recorder.TraceError(null, ex, "SQLServer failed executing SQL {Command}", commandText);
return ex.ToError(ErrorCode.Unexpected);
}
}
}

private async Task<Result<Error>> ExecuteSqlDeleteCommandAsync(string tableName,
KeyValuePair<string, object>? whereParameter, CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -350,7 +427,7 @@ public static Dictionary<string, object> ToTableEntity(this CommandEntity entity
var targetPropertyType = entity.GetPropertyType(key);
properties.Add(key, ToTableEntityProperty(value, targetPropertyType));
}

properties[nameof(CommandEntity.LastPersistedAtUtc)] = DateTime.UtcNow;

return properties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class EventStoreExtensionsSpec
[Fact]
public void WhenVerifyConcurrencyCheckAndNothingStoredAndFirstVersionIsNotFirst_TheReturnsError()
{
var result = _eventStore.Object.VerifyConcurrencyCheck("astreamname", Optional<int>.None, 10);
var result = _eventStore.Object.VerifyContiguousCheck("astreamname", Optional<int>.None, 10);

result.Should().BeError(ErrorCode.EntityExists,
Resources.EventStore_ConcurrencyVerificationFailed_StreamReset.Format("astreamname"));
Expand All @@ -27,15 +27,15 @@ public void WhenVerifyConcurrencyCheckAndNothingStoredAndFirstVersionIsNotFirst_
public void WhenVerifyConcurrencyCheckAndNothingStoredAndFirstVersionIsFirst_ThenPasses()
{
var result =
_eventStore.Object.VerifyConcurrencyCheck("astreamname", Optional<int>.None, EventStream.FirstVersion);
_eventStore.Object.VerifyContiguousCheck("astreamname", Optional<int>.None, EventStream.FirstVersion);

result.Should().BeSuccess();
}

[Fact]
public void WhenVerifyConcurrencyCheckAndFirstVersionIsSameAsStored_TheReturnsError()
{
var result = _eventStore.Object.VerifyConcurrencyCheck("astreamname", 2, 2);
var result = _eventStore.Object.VerifyContiguousCheck("astreamname", 2, 2);

result.Should().BeError(ErrorCode.EntityExists,
Resources.EventStore_ConcurrencyVerificationFailed_StreamAlreadyUpdated.Format("astreamname", 2));
Expand All @@ -44,7 +44,7 @@ public void WhenVerifyConcurrencyCheckAndFirstVersionIsSameAsStored_TheReturnsEr
[Fact]
public void WhenVerifyConcurrencyCheckAndFirstVersionIsBeforeStored_TheReturnsError()
{
var result = _eventStore.Object.VerifyConcurrencyCheck("astreamname", 2, 1);
var result = _eventStore.Object.VerifyContiguousCheck("astreamname", 2, 1);

result.Should().BeError(ErrorCode.EntityExists,
Resources.EventStore_ConcurrencyVerificationFailed_StreamAlreadyUpdated.Format("astreamname", 1));
Expand All @@ -53,7 +53,7 @@ public void WhenVerifyConcurrencyCheckAndFirstVersionIsBeforeStored_TheReturnsEr
[Fact]
public void WhenVerifyConcurrencyCheckAndFirstVersionIsAfterStoredButNotContiguous_TheReturnsError()
{
var result = _eventStore.Object.VerifyConcurrencyCheck("astreamname", 1, 3);
var result = _eventStore.Object.VerifyContiguousCheck("astreamname", 1, 3);

result.Should().BeError(ErrorCode.EntityExists,
Resources.EventStore_ConcurrencyVerificationFailed_MissingUpdates.Format("astreamname", 2, 3));
Expand All @@ -62,7 +62,7 @@ public void WhenVerifyConcurrencyCheckAndFirstVersionIsAfterStoredButNotContiguo
[Fact]
public void WhenVerifyConcurrencyCheckAndFirstVersionIsNextAfterStored_ThenPasses()
{
var result = _eventStore.Object.VerifyConcurrencyCheck("astreamname", 1, 2);
var result = _eventStore.Object.VerifyContiguousCheck("astreamname", 1, 2);

result.Should().BeSuccess();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ public static class EventStoreExtensions
{
/// <summary>
/// Verifies that the version of the latest event produced by the aggregate is the next event in the stream of events
/// from the store.
/// In other words, that the event stream of the aggregate in the store has not been updated while the
/// aggregate has been changed in memory.
/// from the store, with no version gaps between them. IN other words, they are contiguous
/// </summary>
public static Result<Error> VerifyConcurrencyCheck(this IEventStore eventStore, string streamName,
public static Result<Error> VerifyContiguousCheck(this IEventStore eventStore, string streamName,
Optional<int> latestStoredEventVersion, int nextEventVersion)
{
if (!latestStoredEventVersion.HasValue)
Expand All @@ -27,13 +25,6 @@ public static Result<Error> VerifyConcurrencyCheck(this IEventStore eventStore,
return Result.Ok;
}

if (nextEventVersion <= latestStoredEventVersion)
{
return Error.EntityExists(
Resources.EventStore_ConcurrencyVerificationFailed_StreamAlreadyUpdated.Format(streamName,
nextEventVersion));
}

var expectedNextVersion = latestStoredEventVersion + 1;
if (nextEventVersion > expectedNextVersion)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>Infrastructure.Persistence.Shared.IntegrationTests</_Parameter1>
</AssemblyAttribute>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>Infrastructure.Persistence.Shared</_Parameter1>
</AssemblyAttribute>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>Infrastructure.Persistence.Kurrent</_Parameter1>
</AssemblyAttribute>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>Infrastructure.Persistence.Azure</_Parameter1>
</AssemblyAttribute>
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
using Domain.Common.ValueObjects;
using Domain.Interfaces.Entities;
using FluentAssertions;
using Infrastructure.Persistence.Common;
using Infrastructure.Persistence.Interfaces;
using QueryAny;
using UnitTesting.Common;
using UnitTesting.Common.Validation;
using Xunit;
using Resources = Infrastructure.Persistence.Common.Resources;
using Task = System.Threading.Tasks.Task;

namespace Infrastructure.Persistence.Shared.IntegrationTests;
Expand Down Expand Up @@ -154,7 +154,7 @@ await _setup.Store
}

[Fact]
public async Task WhenAddEvents_ThenEventsAdded()
public async Task WhenAddEventsWithFirstEvent_ThenEventsAdded()
{
var entityId = GetNextEntityId();
await _setup.Store.AddEventsAsync(_setup.ContainerName, entityId,
Expand All @@ -168,6 +168,27 @@ await _setup.Store.GetEventStreamAsync(_setup.ContainerName, entityId,
result.Value.Last().Id.Should().Be("anid_v1");
}

[Fact]
public async Task WhenAddEventsAtSameTime_ThenReturnsConcurrencyError()
{
var entityId = GetNextEntityId();
var sameEvent = CreateEvent(1);
var add1 = _setup.Store.AddEventsAsync(_setup.ContainerName, entityId,
[sameEvent], CancellationToken.None);
var add2 = _setup.Store.AddEventsAsync(_setup.ContainerName, entityId,
[sameEvent], CancellationToken.None);
var add3 = _setup.Store.AddEventsAsync(_setup.ContainerName, entityId,
[sameEvent], CancellationToken.None);

var result = await Task.WhenAll(add1, add2, add3);

result.Length.Should().Be(3);
result.Count(x => x.IsSuccessful).Should().Be(1);
result.Count(x => x is { IsFailure: true, Error.Code: ErrorCode.EntityExists } && x.Error.Message ==
Resources.EventStore_ConcurrencyVerificationFailed_StreamAlreadyUpdated
.Format($"testentities_{entityId}", 1)).Should().Be(2);
}

[Fact]
public async Task WhenAddEventsAndStreamCleared_ThenReturnsError()
{
Expand Down Expand Up @@ -213,7 +234,7 @@ await _setup.Store.AddEventsAsync(_setup.ContainerName, entityId,

var result = await _setup.Store.AddEventsAsync(_setup.ContainerName, entityId,
[CreateEvent(10)], CancellationToken.None);

result.Should().BeError(ErrorCode.EntityExists,
Resources.EventStore_ConcurrencyVerificationFailed_MissingUpdates.Format(
$"testentities_{entityId}", 4, 10));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,35 @@ public async Task<Result<string, Error>> AddEventsAsync(string entityName, strin
var latestStoredEventVersion = latestStoredEvent.HasValue
? latestStoredEvent.Value.Version.ToOptional()
: Optional<int>.None;
var concurrencyCheck =
this.VerifyConcurrencyCheck(streamName, latestStoredEventVersion, Enumerable.First(events).Version);
if (concurrencyCheck.IsFailure)
var @checked =
this.VerifyContiguousCheck(streamName, latestStoredEventVersion, Enumerable.First(events).Version);
if (@checked.IsFailure)
{
return concurrencyCheck.Error;
return @checked.Error;
}

events.ForEach(@event =>
foreach (var @event in events)
{
var entity = CommandEntity.FromDto(@event.ToTabulated(entityName, streamName));
var version = @event.Version;

if (!_events.ContainsKey(entityName))
{
_events.Add(entityName, new Dictionary<string, HydrationProperties>());
}

_events[entityName].Add(entity.Id, entity.ToHydrationProperties());
});
try
{
var stream = _events[entityName];
stream.Add(entity.Id, entity.ToHydrationProperties());
}
catch (ArgumentException)
{
return Error.EntityExists(
Common.Resources.EventStore_ConcurrencyVerificationFailed_StreamAlreadyUpdated.Format(
streamName, version));
}
}

return streamName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,32 @@ public async Task<Result<string, Error>> AddEventsAsync(string entityName, strin
var latestStoredEventVersion = latestStoredEvent.HasValue
? latestStoredEvent.Value.Version.ToOptional()
: Optional<int>.None;
var concurrencyCheck =
this.VerifyConcurrencyCheck(streamName, latestStoredEventVersion, Enumerable.First(events).Version);
if (concurrencyCheck.IsFailure)
var @checked =
this.VerifyContiguousCheck(streamName, latestStoredEventVersion, Enumerable.First(events).Version);
if (@checked.IsFailure)
{
return concurrencyCheck.Error;
return @checked.Error;
}

foreach (var @event in events)
{
var entity = CommandEntity.FromDto(@event.ToTabulated(entityName, streamName));

var container = EnsureContainer(GetEventStoreContainerPath(entityName, entityId));
await container.WriteAsync(entity.Id, entity.ToFileProperties(), cancellationToken);
var version = @event.Version;
var filename = $"version_{version:D3}";
var added = await container.WriteExclusiveAsync(filename, entity.ToFileProperties(), cancellationToken);
if (added.IsFailure)
{
if (added.Error.Is(ErrorCode.EntityExists))
{
return Error.EntityExists(
Common.Resources.EventStore_ConcurrencyVerificationFailed_StreamAlreadyUpdated.Format(
streamName, version));
}

return added.Error;
}
}

return streamName;
Expand Down
Loading

0 comments on commit 981d3a9

Please sign in to comment.