From 9273e9d108c29fc34ad21dc30ed1e8ff40e2e7d8 Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Wed, 14 Feb 2024 11:21:22 +0100 Subject: [PATCH] Refactored the event store definition to include the optimistic concurrency checks Adjusted typing to keep StreamVersion part of the Event Store definition, defaulting to bigint. Added typings for input options and result to include cases where stream may not exists and user provides expected version (or not). Made aggregateStream and readStream method take expectedStreamVersion to fail fast. --- .vscode/settings.json | 4 +- .vscode/tasks.json | 6 +- .../src/commandHandling/handleCommand.ts | 61 ++++++++---- .../handleCommandWithDecider.ts | 47 ++++----- packages/emmett/src/eventStore/eventStore.ts | 99 +++++++++++++++---- 5 files changed, 146 insertions(+), 71 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 9e23f29a..dd873e77 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -13,8 +13,8 @@ "files.exclude": { "node_modules/": true, - "**/node_modules/": true - //"dist/": true + "**/node_modules/": true, + "**/dist/": true }, "files.eol": "\n", diff --git a/.vscode/tasks.json b/.vscode/tasks.json index e137beb3..bbfd1b28 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -3,11 +3,11 @@ "tasks": [ { "type": "npm", - "script": "build:ts:watch --ws", + "script": "build:ts:watch", "group": "build", "problemMatcher": [], - "label": "npm: build:ts:watch -ws", - "detail": "tsc --watch -ws" + "label": "npm: build:ts:watch", + "detail": "tsc --watch" } ] } diff --git a/packages/emmett/src/commandHandling/handleCommand.ts b/packages/emmett/src/commandHandling/handleCommand.ts index e004f5b8..ea0b96a9 100644 --- a/packages/emmett/src/commandHandling/handleCommand.ts +++ b/packages/emmett/src/commandHandling/handleCommand.ts @@ -1,38 +1,59 @@ -import type { EventStore } from '../eventStore'; +import { + NO_CHECK, + STREAM_DOES_NOT_EXISTS, + type DefaultStreamVersionType, + type EventStore, + type ExpectedStreamVersion, +} from '../eventStore'; import type { Event } from '../typing'; // #region command-handler export const CommandHandler = - ( + ( evolve: (state: State, event: StreamEvent) => State, getInitialState: () => State, - mapToStreamId: (id: string) => string, + mapToStreamId: (id: string) => string = (id) => id, ) => async ( - eventStore: EventStore, + eventStore: EventStore, id: string, handle: (state: State) => StreamEvent | StreamEvent[], + options?: { + expectedStreamVersion?: ExpectedStreamVersion; + }, ) => { const streamName = mapToStreamId(id); - const { entity: state, nextExpectedVersion } = - await eventStore.aggregateStream( - streamName, - { - evolve, - getInitialState, - }, - ); + const { state, currentStreamVersion } = await eventStore.aggregateStream< + State, + StreamEvent + >(streamName, { + evolve, + getInitialState, + read: { + // expected stream version is passed to fail fast + // if stream is in the wrong state + expectedStreamVersion: options?.expectedStreamVersion ?? NO_CHECK, + }, + }); const result = handle(state ?? getInitialState()); - if (Array.isArray(result)) - return eventStore.appendToStream( - streamName, - nextExpectedVersion, - ...result, - ); - else - return eventStore.appendToStream(streamName, nextExpectedVersion, result); + // Either use: + // - provided expected stream version, + // - current stream version got from stream aggregation, + // - or expect stream not to exists otherwise. + const expectedStreamVersion: ExpectedStreamVersion = + options?.expectedStreamVersion ?? + currentStreamVersion ?? + STREAM_DOES_NOT_EXISTS; + + return eventStore.appendToStream( + streamName, + Array.isArray(result) ? result : [result], + { + expectedStreamVersion, + }, + ); }; // #endregion command-handler diff --git a/packages/emmett/src/commandHandling/handleCommandWithDecider.ts b/packages/emmett/src/commandHandling/handleCommandWithDecider.ts index 4d9ea78a..e0589e49 100644 --- a/packages/emmett/src/commandHandling/handleCommandWithDecider.ts +++ b/packages/emmett/src/commandHandling/handleCommandWithDecider.ts @@ -1,6 +1,11 @@ -import type { EventStore } from '../eventStore'; +import type { + DefaultStreamVersionType, + EventStore, + ExpectedStreamVersion, +} from '../eventStore'; import type { Command, Event } from '../typing'; import type { Decider } from '../typing/decider'; +import { CommandHandler } from './handleCommand'; // #region command-handler export const DeciderCommandHandler = @@ -8,36 +13,26 @@ export const DeciderCommandHandler = State, CommandType extends Command, StreamEvent extends Event, - NextExpectedVersion = bigint, + StreamVersion = DefaultStreamVersionType, >( { decide, evolve, getInitialState, }: Decider, - mapToStreamId: (id: string) => string, + mapToStreamId: (id: string) => string = (id) => id, ) => - async (eventStore: EventStore, id: string, command: CommandType) => { - const streamName = mapToStreamId(id); - - const { entity: state, nextExpectedVersion } = - await eventStore.aggregateStream( - streamName, - { - evolve, - getInitialState, - }, - ); - - const result = decide(command, state ?? getInitialState()); - - if (Array.isArray(result)) - return eventStore.appendToStream( - streamName, - nextExpectedVersion, - ...result, - ); - else - return eventStore.appendToStream(streamName, nextExpectedVersion, result); - }; + async ( + eventStore: EventStore, + id: string, + command: CommandType, + options?: { + expectedStreamVersion?: ExpectedStreamVersion; + }, + ) => + CommandHandler( + evolve, + getInitialState, + mapToStreamId, + )(eventStore, id, (state) => decide(command, state), options); // #endregion command-handler diff --git a/packages/emmett/src/eventStore/eventStore.ts b/packages/emmett/src/eventStore/eventStore.ts index 779fca03..c8d2528d 100644 --- a/packages/emmett/src/eventStore/eventStore.ts +++ b/packages/emmett/src/eventStore/eventStore.ts @@ -1,28 +1,87 @@ -import type { Event } from '../typing'; +import type { Event, Flavour } from '../typing'; // #region event-store -export interface EventStore { - aggregateStream( +export interface EventStore { + aggregateStream( streamName: string, - options: { - evolve: (currentState: Entity, event: E) => Entity; - getInitialState: () => Entity; - startingVersion?: NextExpectedVersion | undefined; - }, - ): Promise<{ - entity: Entity | null; - nextExpectedVersion: NextExpectedVersion; - }>; - - readStream( + options: AggregateStreamOptions, + ): Promise>; + + readStream( streamName: string, - startingVersion?: NextExpectedVersion | undefined, - ): Promise; + options?: ReadStreamOptions, + ): Promise>; - appendToStream( + appendToStream( streamId: string, - expectedVersion?: NextExpectedVersion | undefined, - ...events: E[] - ): Promise; + events: EventType[], + options?: AppendToStreamOptions, + ): Promise>; } + +export type DefaultStreamVersionType = bigint; // #endregion event-store + +//////////////////////////////////////////////////////////////////// +/// ReadStream types +//////////////////////////////////////////////////////////////////// + +export type ReadStreamOptions = { + from?: StreamVersion; + to?: StreamVersion; + expectedStreamVersion?: ExpectedStreamVersion; +}; + +export type ReadStreamResult = { + currentStreamVersion: StreamVersion; + events: E[]; +} | null; + +//////////////////////////////////////////////////////////////////// +/// AggregateStream types +//////////////////////////////////////////////////////////////////// + +export type AggregateStreamOptions< + State, + E extends Event, + StreamVersion = bigint, +> = { + evolve: (currentState: State, event: E) => State; + getInitialState: () => State; + read?: ReadStreamOptions; +}; + +export type AggregateStreamResult = { + currentStreamVersion: StreamVersion | null; + state: State | null; +}; + +//////////////////////////////////////////////////////////////////// +/// AppendToStream types +//////////////////////////////////////////////////////////////////// + +export type AppendToStreamOptions = { + expectedStreamVersion?: ExpectedStreamVersion; +}; + +export type AppendToStreamResult = { + nextExpectedStreamVersion: StreamVersion; +} | null; + +export type ExpectedStreamVersion = + | ExpectedStreamVersionWithValue + | ExpectedStreamVersionGeneral; + +export type ExpectedStreamVersionWithValue< + VersionType = DefaultStreamVersionType, +> = Flavour; + +export type ExpectedStreamVersionGeneral = Flavour< + 'STREAM_EXISTS' | 'STREAM_DOES_NOT_EXISTS' | 'NO_CHECK', + 'StreamVersion' +>; + +export const STREAM_EXISTS = 'STREAM_EXISTS' as ExpectedStreamVersionGeneral; +export const STREAM_DOES_NOT_EXISTS = + 'STREAM_DOES_NOT_EXISTS' as ExpectedStreamVersionGeneral; +export const NO_CHECK = 'NO_CHECK' as ExpectedStreamVersionGeneral;