From 2e5ee323643af731fd49d8e075f1ec2a2b02b396 Mon Sep 17 00:00:00 2001 From: Christopher Watford Date: Tue, 13 Feb 2024 16:23:29 -0500 Subject: [PATCH] Add expected version to EventStore interface #10 --- .../src/commandHandling/handleCommand.ts | 23 ++++++++++----- .../handleCommandWithDecider.ts | 28 ++++++++++++++----- packages/emmett/src/eventStore/eventStore.ts | 14 ++++++++-- 3 files changed, 48 insertions(+), 17 deletions(-) diff --git a/packages/emmett/src/commandHandling/handleCommand.ts b/packages/emmett/src/commandHandling/handleCommand.ts index 7187005a..e004f5b8 100644 --- a/packages/emmett/src/commandHandling/handleCommand.ts +++ b/packages/emmett/src/commandHandling/handleCommand.ts @@ -3,7 +3,7 @@ import type { Event } from '../typing'; // #region command-handler export const CommandHandler = - ( + ( evolve: (state: State, event: StreamEvent) => State, getInitialState: () => State, mapToStreamId: (id: string) => string, @@ -15,15 +15,24 @@ export const CommandHandler = ) => { const streamName = mapToStreamId(id); - const state = await eventStore.aggregateStream(streamName, { - evolve, - getInitialState, - }); + const { entity: state, nextExpectedVersion } = + await eventStore.aggregateStream( + streamName, + { + evolve, + getInitialState, + }, + ); const result = handle(state ?? getInitialState()); if (Array.isArray(result)) - return eventStore.appendToStream(streamName, ...result); - else return eventStore.appendToStream(streamName, result); + return eventStore.appendToStream( + streamName, + nextExpectedVersion, + ...result, + ); + else + return eventStore.appendToStream(streamName, nextExpectedVersion, result); }; // #endregion command-handler diff --git a/packages/emmett/src/commandHandling/handleCommandWithDecider.ts b/packages/emmett/src/commandHandling/handleCommandWithDecider.ts index a9cb394b..4d9ea78a 100644 --- a/packages/emmett/src/commandHandling/handleCommandWithDecider.ts +++ b/packages/emmett/src/commandHandling/handleCommandWithDecider.ts @@ -4,7 +4,12 @@ import type { Decider } from '../typing/decider'; // #region command-handler export const DeciderCommandHandler = - ( + < + State, + CommandType extends Command, + StreamEvent extends Event, + NextExpectedVersion = bigint, + >( { decide, evolve, @@ -15,15 +20,24 @@ export const DeciderCommandHandler = async (eventStore: EventStore, id: string, command: CommandType) => { const streamName = mapToStreamId(id); - const state = await eventStore.aggregateStream(streamName, { - evolve, - getInitialState, - }); + const { entity: state, nextExpectedVersion } = + await eventStore.aggregateStream( + streamName, + { + evolve, + getInitialState, + }, + ); const result = decide(command, state ?? getInitialState()); if (Array.isArray(result)) - return eventStore.appendToStream(streamName, ...result); - else return eventStore.appendToStream(streamName, result); + return eventStore.appendToStream( + streamName, + nextExpectedVersion, + ...result, + ); + else + return eventStore.appendToStream(streamName, nextExpectedVersion, result); }; // #endregion command-handler diff --git a/packages/emmett/src/eventStore/eventStore.ts b/packages/emmett/src/eventStore/eventStore.ts index 2afab070..779fca03 100644 --- a/packages/emmett/src/eventStore/eventStore.ts +++ b/packages/emmett/src/eventStore/eventStore.ts @@ -2,18 +2,26 @@ import type { Event } from '../typing'; // #region event-store export interface EventStore { - aggregateStream( + aggregateStream( streamName: string, options: { evolve: (currentState: Entity, event: E) => Entity; getInitialState: () => Entity; + startingVersion?: NextExpectedVersion | undefined; }, - ): Promise; + ): Promise<{ + entity: Entity | null; + nextExpectedVersion: NextExpectedVersion; + }>; - readStream(streamName: string): Promise; + readStream( + streamName: string, + startingVersion?: NextExpectedVersion | undefined, + ): Promise; appendToStream( streamId: string, + expectedVersion?: NextExpectedVersion | undefined, ...events: E[] ): Promise; }