Skip to content

Commit

Permalink
Merge pull request #32 from hirethunk/handle-stateless-events
Browse files Browse the repository at this point in the history
Better handling of "stateless" events and singleton states
  • Loading branch information
inxilpro authored Dec 6, 2023
2 parents 011b881 + 20e173c commit 3d79822
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 32 deletions.
13 changes: 13 additions & 0 deletions src/Exceptions/StateIsNotSingletonException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

namespace Thunk\Verbs\Exceptions;

use RuntimeException;

class StateIsNotSingletonException extends RuntimeException
{
public function __construct(string $type)
{
parent::__construct("Expected '{$type}' to be a singleton, but found multiple states.");
}
}
1 change: 0 additions & 1 deletion src/Facades/Verbs.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
* @method static bool commit()
* @method static bool isReplaying()
* @method static void unlessReplaying(callable $callback)
* @method static bool isReplaying()
* @method static int|string|null toId($id)
* @method static Event fire(Event $event)
*/
Expand Down
3 changes: 2 additions & 1 deletion src/Lifecycle/Broker.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public function replay()

$model->event()->states()
->each(fn ($state) => $this->dispatcher->apply($model->event(), $state))
->each(fn ($state) => $this->dispatcher->replay($model->event(), $state));
->each(fn ($state) => $this->dispatcher->replay($model->event(), $state))
->whenEmpty(fn () => $this->dispatcher->replay($model->event(), null));

return $model->event();
});
Expand Down
2 changes: 1 addition & 1 deletion src/Lifecycle/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public function handle(Event $event): void
$this->getHandleHooks($event)->each(fn (Hook $hook) => $hook->handle($this->container, $event));
}

public function replay(Event $event, State $state): void
public function replay(Event $event, ?State $state): void
{
$this->getReplayHooks($event)->each(fn (Hook $hook) => $hook->replay($this->container, $event, $state));
}
Expand Down
11 changes: 9 additions & 2 deletions src/Lifecycle/EventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ class EventStore
public function read(
?State $state = null,
Bits|UuidInterface|AbstractUid|int|string|null $after_id = null,
Bits|UuidInterface|AbstractUid|int|string|null $up_to_id = null
Bits|UuidInterface|AbstractUid|int|string|null $up_to_id = null,
bool $singleton = false,
): LazyCollection {
if ($state) {
return VerbStateEvent::query()
->with('event')
->where('state_id', $state->id)
->unless($singleton, fn (Builder $query) => $query->where('state_id', $state->id))
->where('state_type', $state::class)
->when($after_id, fn (Builder $query) => $query->whereRelation('event', 'id', '>', Verbs::toId($after_id)))
->when($up_to_id, fn (Builder $query) => $query->whereRelation('event', 'id', '<=', Verbs::toId($up_to_id)))
Expand Down Expand Up @@ -87,6 +88,12 @@ protected function guardAgainstConcurrentWrites(array $events): void
}
});

// We can abort if there are no states associated with any of the
// events that we're writing (since concurrency doesn't apply in that case)
if ($max_event_ids->isEmpty()) {
return;
}

$query->each(function ($result) use ($max_event_ids) {
$state_type = data_get($result, 'state_type');
$state_id = data_get($result, 'state_id');
Expand Down
15 changes: 15 additions & 0 deletions src/Lifecycle/SnapshotStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Glhd\Bits\Bits;
use Ramsey\Uuid\UuidInterface;
use Symfony\Component\Uid\AbstractUid;
use Thunk\Verbs\Exceptions\StateIsNotSingletonException;
use Thunk\Verbs\Facades\Verbs;
use Thunk\Verbs\Models\VerbSnapshot;
use Thunk\Verbs\State;
Expand All @@ -19,6 +20,20 @@ public function load(Bits|UuidInterface|AbstractUid|int|string $id): ?State
return $snapshot?->state();
}

public function loadSingleton(string $type): ?State
{
$snapshots = VerbSnapshot::query()
->where('type', $type)
->limit(2)
->get();

if ($snapshots->count() > 1) {
throw new StateIsNotSingletonException($type);
}

return $snapshots->first()?->state();
}

public function write(array $states): bool
{
return VerbSnapshot::upsert(static::formatForWrite($states), 'id', ['data', 'last_event_id', 'updated_at']);
Expand Down
19 changes: 18 additions & 1 deletion src/Lifecycle/StateManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,26 @@ public function load(Bits|UuidInterface|AbstractUid|int|string $id, string $type
return $this->remember($state);
}

/** @param class-string<State> $type */
public function singleton(string $type): State
{
return $this->load(0, $type);
// FIXME: If the state we're loading has a last_event_id that's ahead of the registry's last_event_id, we need to re-build the state

if ($state = $this->states->get($type)) {
return $state;
}

$state = $this->snapshots->loadSingleton($type) ?? $type::make();
$state->id ??= Snowflake::make()->id();

$this->events
->read(state: $state, after_id: $state->last_event_id, up_to_id: $this->max_event_id, singleton: true)
->each(fn (Event $event) => $this->dispatcher->apply($event, $state));

// We'll store a reference to it by the type for future singleton access
$this->states->put($type, $state);

return $this->remember($state);
}

public function writeSnapshots(): bool
Expand Down
60 changes: 60 additions & 0 deletions tests/Feature/StatelessEventTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?php

use Thunk\Verbs\Event;
use Thunk\Verbs\Facades\Verbs;

it('can store and replay events that have no state', function () {

$GLOBALS['stateless_test_log'] = [];

StatelessEventOne::fire(label: 'First event');
StatelessEventTwo::fire(label: 'Second event');
StatelessEventOne::fire(label: 'Third event');
StatelessEventTwo::fire(label: 'Fourth event');

expect($GLOBALS['stateless_test_log'])->toBeEmpty();

Verbs::commit();

expect($GLOBALS['stateless_test_log'])->toBe([
'[1] First event',
'[2] Second event',
'[1] Third event',
'[2] Fourth event',
]);

$GLOBALS['stateless_test_log'] = [];

Verbs::replay();

expect($GLOBALS['stateless_test_log'])->toBe([
'[1] First event',
'[2] Second event',
'[1] Third event',
'[2] Fourth event',
]);
});

class StatelessEventOne extends Event
{
public function __construct(public string $label)
{
}

public function handle()
{
$GLOBALS['stateless_test_log'][] = "[1] {$this->label}";
}
}

class StatelessEventTwo extends Event
{
public function __construct(public string $label)
{
}

public function handle()
{
$GLOBALS['stateless_test_log'][] = "[2] {$this->label}";
}
}
26 changes: 0 additions & 26 deletions tests/Unit/StateManagerTest.php

This file was deleted.

0 comments on commit 3d79822

Please sign in to comment.