From 7784e9905f4576b0eb57ad1d56acb11c8c349a89 Mon Sep 17 00:00:00 2001 From: Chris Morrell Date: Wed, 15 Nov 2023 16:26:31 -0500 Subject: [PATCH 1/3] Basic concurrency guards --- src/Exceptions/ConcurrencyException.php | 9 +++++ src/Lifecycle/EventStore.php | 42 +++++++++++++++++++ src/Support/StateCollection.php | 6 +++ tests/Unit/ConcurrencyTest.php | 54 +++++++++++++++++++++++++ 4 files changed, 111 insertions(+) create mode 100644 src/Exceptions/ConcurrencyException.php create mode 100644 tests/Unit/ConcurrencyTest.php diff --git a/src/Exceptions/ConcurrencyException.php b/src/Exceptions/ConcurrencyException.php new file mode 100644 index 00000000..9784c429 --- /dev/null +++ b/src/Exceptions/ConcurrencyException.php @@ -0,0 +1,9 @@ +lazyById(); } + /** @param Event[] $events */ public function write(array $events): bool { + $this->guardAgainstConcurrentWrites($events); + return VerbEvent::insert(static::formatForWrite($events)) && VerbStateEvent::insert(static::formatRelationshipsForWrite($events)); } + /** @param Event[] $events */ + protected function guardAgainstConcurrentWrites(array $events): void + { + $max_event_ids = new Collection(); + + $query = VerbStateEvent::query() + ->toBase() + ->select(['state_type', 'state_id', DB::raw('MAX(event_id) as max_event_id')]) + ->orderBy('state_id'); + + $query->where(function (BaseBuilder $query) use ($events, $max_event_ids) { + foreach ($events as $event) { + foreach ($event->states() as $state) { + if (! $max_event_ids->has($key = $state::class.$state->id)) { + $query->orWhere(function (BaseBuilder $query) use ($state) { + $query->where('state_type', $state::class); + $query->where('state_id', $state->id); + }); + $max_event_ids->put($key, $state->last_event_id); + } + } + } + }); + + $query->each(function ($result) use ($max_event_ids) { + $key = data_get($result, 'state_type').data_get($result, 'state_id'); + $max_written_id = (int) data_get($result, 'max_event_id'); + $max_expected_id = $max_event_ids->get($key, 0); + + if ($max_written_id > $max_expected_id) { + throw new ConcurrencyException("An event with ID {$max_written_id} has been written to the database, which is higher than {$max_expected_id}, which is in memory."); + } + }); + } + /** @param Event[] $event_objects */ protected static function formatForWrite(array $event_objects): array { diff --git a/src/Support/StateCollection.php b/src/Support/StateCollection.php index 7adb9d58..2bedd1b6 100644 --- a/src/Support/StateCollection.php +++ b/src/Support/StateCollection.php @@ -5,6 +5,12 @@ use Illuminate\Support\Collection; use Thunk\Verbs\State; +/** + * @template TKey of array-key + * + * @implements \ArrayAccess + * @implements \Illuminate\Support\Enumerable + */ class StateCollection extends Collection { protected array $aliases = []; diff --git a/tests/Unit/ConcurrencyTest.php b/tests/Unit/ConcurrencyTest.php new file mode 100644 index 00000000..9018eac7 --- /dev/null +++ b/tests/Unit/ConcurrencyTest.php @@ -0,0 +1,54 @@ +id = 1; + ConcurrencyTestState::singleton()->last_event_id = 1; + + $store->write([$event]); + + $event2 = new ConcurrencyTestEvent(); + $event2->id = 2; + ConcurrencyTestState::singleton()->last_event_id = 2; + + $store->write([$event2]); + + expect(VerbEvent::count())->toBe(2); +}); + +it('throws on non-sequential events', function () { + $store = app(EventStore::class); + + $event = new ConcurrencyTestEvent(); + $event->id = 2; + ConcurrencyTestState::singleton()->last_event_id = 2; + + $store->write([$event]); + + $event2 = new ConcurrencyTestEvent(); + $event2->id = 1; + ConcurrencyTestState::singleton()->last_event_id = 1; + + $store->write([$event2]); +})->throws(ConcurrencyException::class); + +class ConcurrencyTestEvent extends Event +{ + public function states(): StateCollection + { + return StateCollection::make([ConcurrencyTestState::singleton()]); + } +} + +class ConcurrencyTestState extends State +{ +} From 5c9677794b4dc57caf92391d589773a6542651cd Mon Sep 17 00:00:00 2001 From: Chris Morrell Date: Wed, 15 Nov 2023 16:57:42 -0500 Subject: [PATCH 2/3] More generic aggregate format --- src/Lifecycle/EventStore.php | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/Lifecycle/EventStore.php b/src/Lifecycle/EventStore.php index f63b47a6..4de46d87 100644 --- a/src/Lifecycle/EventStore.php +++ b/src/Lifecycle/EventStore.php @@ -54,10 +54,19 @@ protected function guardAgainstConcurrentWrites(array $events): void { $max_event_ids = new Collection(); - $query = VerbStateEvent::query() - ->toBase() - ->select(['state_type', 'state_id', DB::raw('MAX(event_id) as max_event_id')]) - ->orderBy('state_id'); + $query = VerbStateEvent::query()->toBase(); + + $query->select([ + 'state_type', + 'state_id', + DB::raw(sprintf( + 'max(%s) as %s', + $query->getGrammar()->wrap('event_id'), + $query->getGrammar()->wrapTable('max_event_id') + )), + ]); + + $query->orderBy('id'); $query->where(function (BaseBuilder $query) use ($events, $max_event_ids) { foreach ($events as $event) { From 089a40509f1e6617f2ce92b81f09933cbc548499 Mon Sep 17 00:00:00 2001 From: Chris Morrell Date: Wed, 15 Nov 2023 17:09:43 -0500 Subject: [PATCH 3/3] Only write if we have events --- src/Lifecycle/EventStore.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Lifecycle/EventStore.php b/src/Lifecycle/EventStore.php index 4de46d87..447d78c1 100644 --- a/src/Lifecycle/EventStore.php +++ b/src/Lifecycle/EventStore.php @@ -43,6 +43,10 @@ public function read( /** @param Event[] $events */ public function write(array $events): bool { + if (empty($events)) { + return true; + } + $this->guardAgainstConcurrentWrites($events); return VerbEvent::insert(static::formatForWrite($events))