Skip to content

Commit

Permalink
Refactored the event store definition to include the optimistic concu…
Browse files Browse the repository at this point in the history
…rrency checks

Adjusted typing to keep StreamVersion part of the Event Store definition, defaulting to bigint.

Added typings for input options and result to include cases where stream may not exists and user provides expected version (or not).

Made aggregateStream and readStream method take expectedStreamVersion to fail fast.
  • Loading branch information
oskardudycz committed Feb 14, 2024
1 parent 2e5ee32 commit 9273e9d
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 71 deletions.
4 changes: 2 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

"files.exclude": {
"node_modules/": true,
"**/node_modules/": true
//"dist/": true
"**/node_modules/": true,
"**/dist/": true
},
"files.eol": "\n",

Expand Down
6 changes: 3 additions & 3 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
"tasks": [
{
"type": "npm",
"script": "build:ts:watch --ws",
"script": "build:ts:watch",
"group": "build",
"problemMatcher": [],
"label": "npm: build:ts:watch -ws",
"detail": "tsc --watch -ws"
"label": "npm: build:ts:watch",
"detail": "tsc --watch"
}
]
}
61 changes: 41 additions & 20 deletions packages/emmett/src/commandHandling/handleCommand.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,59 @@
import type { EventStore } from '../eventStore';
import {
NO_CHECK,
STREAM_DOES_NOT_EXISTS,
type DefaultStreamVersionType,
type EventStore,
type ExpectedStreamVersion,
} from '../eventStore';
import type { Event } from '../typing';

// #region command-handler
export const CommandHandler =
<State, StreamEvent extends Event, NextExpectedVersion = bigint>(
<State, StreamEvent extends Event, StreamVersion = DefaultStreamVersionType>(
evolve: (state: State, event: StreamEvent) => State,
getInitialState: () => State,
mapToStreamId: (id: string) => string,
mapToStreamId: (id: string) => string = (id) => id,
) =>
async (
eventStore: EventStore,
eventStore: EventStore<StreamVersion>,
id: string,
handle: (state: State) => StreamEvent | StreamEvent[],
options?: {
expectedStreamVersion?: ExpectedStreamVersion<StreamVersion>;
},
) => {
const streamName = mapToStreamId(id);

const { entity: state, nextExpectedVersion } =
await eventStore.aggregateStream<State, StreamEvent, NextExpectedVersion>(
streamName,
{
evolve,
getInitialState,
},
);
const { state, currentStreamVersion } = await eventStore.aggregateStream<
State,
StreamEvent
>(streamName, {
evolve,
getInitialState,
read: {
// expected stream version is passed to fail fast
// if stream is in the wrong state
expectedStreamVersion: options?.expectedStreamVersion ?? NO_CHECK,
},
});

const result = handle(state ?? getInitialState());

if (Array.isArray(result))
return eventStore.appendToStream(
streamName,
nextExpectedVersion,
...result,
);
else
return eventStore.appendToStream(streamName, nextExpectedVersion, result);
// Either use:
// - provided expected stream version,
// - current stream version got from stream aggregation,
// - or expect stream not to exists otherwise.
const expectedStreamVersion: ExpectedStreamVersion<StreamVersion> =
options?.expectedStreamVersion ??
currentStreamVersion ??
STREAM_DOES_NOT_EXISTS;

return eventStore.appendToStream(
streamName,
Array.isArray(result) ? result : [result],
{
expectedStreamVersion,
},
);
};
// #endregion command-handler
47 changes: 21 additions & 26 deletions packages/emmett/src/commandHandling/handleCommandWithDecider.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,38 @@
import type { EventStore } from '../eventStore';
import type {
DefaultStreamVersionType,
EventStore,
ExpectedStreamVersion,
} from '../eventStore';
import type { Command, Event } from '../typing';
import type { Decider } from '../typing/decider';
import { CommandHandler } from './handleCommand';

// #region command-handler
export const DeciderCommandHandler =
<
State,
CommandType extends Command,
StreamEvent extends Event,
NextExpectedVersion = bigint,
StreamVersion = DefaultStreamVersionType,
>(
{
decide,
evolve,
getInitialState,
}: Decider<State, CommandType, StreamEvent>,
mapToStreamId: (id: string) => string,
mapToStreamId: (id: string) => string = (id) => id,
) =>
async (eventStore: EventStore, id: string, command: CommandType) => {
const streamName = mapToStreamId(id);

const { entity: state, nextExpectedVersion } =
await eventStore.aggregateStream<State, StreamEvent, NextExpectedVersion>(
streamName,
{
evolve,
getInitialState,
},
);

const result = decide(command, state ?? getInitialState());

if (Array.isArray(result))
return eventStore.appendToStream(
streamName,
nextExpectedVersion,
...result,
);
else
return eventStore.appendToStream(streamName, nextExpectedVersion, result);
};
async (
eventStore: EventStore<StreamVersion>,
id: string,
command: CommandType,
options?: {
expectedStreamVersion?: ExpectedStreamVersion<StreamVersion>;
},
) =>
CommandHandler<State, StreamEvent, StreamVersion>(
evolve,
getInitialState,
mapToStreamId,
)(eventStore, id, (state) => decide(command, state), options);
// #endregion command-handler
99 changes: 79 additions & 20 deletions packages/emmett/src/eventStore/eventStore.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,87 @@
import type { Event } from '../typing';
import type { Event, Flavour } from '../typing';

// #region event-store
export interface EventStore {
aggregateStream<Entity, E extends Event, NextExpectedVersion = bigint>(
export interface EventStore<StreamVersion = DefaultStreamVersionType> {
aggregateStream<State, EventType extends Event>(
streamName: string,
options: {
evolve: (currentState: Entity, event: E) => Entity;
getInitialState: () => Entity;
startingVersion?: NextExpectedVersion | undefined;
},
): Promise<{
entity: Entity | null;
nextExpectedVersion: NextExpectedVersion;
}>;

readStream<E extends Event, NextExpectedVersion = bigint>(
options: AggregateStreamOptions<State, EventType, StreamVersion>,
): Promise<AggregateStreamResult<State, StreamVersion>>;

readStream<EventType extends Event>(
streamName: string,
startingVersion?: NextExpectedVersion | undefined,
): Promise<E[]>;
options?: ReadStreamOptions<StreamVersion>,
): Promise<ReadStreamResult<EventType, StreamVersion>>;

appendToStream<E extends Event, NextExpectedVersion = bigint>(
appendToStream<EventType extends Event>(
streamId: string,
expectedVersion?: NextExpectedVersion | undefined,
...events: E[]
): Promise<NextExpectedVersion>;
events: EventType[],
options?: AppendToStreamOptions<StreamVersion>,
): Promise<AppendToStreamResult<StreamVersion>>;
}

export type DefaultStreamVersionType = bigint;
// #endregion event-store

////////////////////////////////////////////////////////////////////
/// ReadStream types
////////////////////////////////////////////////////////////////////

export type ReadStreamOptions<StreamVersion = bigint> = {
from?: StreamVersion;
to?: StreamVersion;
expectedStreamVersion?: ExpectedStreamVersion<StreamVersion>;
};

export type ReadStreamResult<E extends Event, StreamVersion = bigint> = {
currentStreamVersion: StreamVersion;
events: E[];
} | null;

////////////////////////////////////////////////////////////////////
/// AggregateStream types
////////////////////////////////////////////////////////////////////

export type AggregateStreamOptions<
State,
E extends Event,
StreamVersion = bigint,
> = {
evolve: (currentState: State, event: E) => State;
getInitialState: () => State;
read?: ReadStreamOptions<StreamVersion>;
};

export type AggregateStreamResult<State, StreamVersion = bigint> = {
currentStreamVersion: StreamVersion | null;
state: State | null;
};

////////////////////////////////////////////////////////////////////
/// AppendToStream types
////////////////////////////////////////////////////////////////////

export type AppendToStreamOptions<StreamVersion = bigint> = {
expectedStreamVersion?: ExpectedStreamVersion<StreamVersion>;
};

export type AppendToStreamResult<StreamVersion = bigint> = {
nextExpectedStreamVersion: StreamVersion;
} | null;

export type ExpectedStreamVersion<VersionType = DefaultStreamVersionType> =
| ExpectedStreamVersionWithValue<VersionType>
| ExpectedStreamVersionGeneral;

export type ExpectedStreamVersionWithValue<
VersionType = DefaultStreamVersionType,
> = Flavour<VersionType, 'StreamVersion'>;

export type ExpectedStreamVersionGeneral = Flavour<
'STREAM_EXISTS' | 'STREAM_DOES_NOT_EXISTS' | 'NO_CHECK',
'StreamVersion'
>;

export const STREAM_EXISTS = 'STREAM_EXISTS' as ExpectedStreamVersionGeneral;
export const STREAM_DOES_NOT_EXISTS =
'STREAM_DOES_NOT_EXISTS' as ExpectedStreamVersionGeneral;
export const NO_CHECK = 'NO_CHECK' as ExpectedStreamVersionGeneral;

0 comments on commit 9273e9d

Please sign in to comment.