Skip to content

Commit

Permalink
Add expected version to EventStore interface #10
Browse files Browse the repository at this point in the history
  • Loading branch information
watfordsuzy authored and oskardudycz committed Feb 14, 2024
1 parent 9650af6 commit 2e5ee32
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 17 deletions.
23 changes: 16 additions & 7 deletions packages/emmett/src/commandHandling/handleCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { Event } from '../typing';

// #region command-handler
export const CommandHandler =
<State, StreamEvent extends Event>(
<State, StreamEvent extends Event, NextExpectedVersion = bigint>(
evolve: (state: State, event: StreamEvent) => State,
getInitialState: () => State,
mapToStreamId: (id: string) => string,
Expand All @@ -15,15 +15,24 @@ export const CommandHandler =
) => {
const streamName = mapToStreamId(id);

const state = await eventStore.aggregateStream(streamName, {
evolve,
getInitialState,
});
const { entity: state, nextExpectedVersion } =
await eventStore.aggregateStream<State, StreamEvent, NextExpectedVersion>(
streamName,
{
evolve,
getInitialState,
},
);

const result = handle(state ?? getInitialState());

if (Array.isArray(result))
return eventStore.appendToStream(streamName, ...result);
else return eventStore.appendToStream(streamName, result);
return eventStore.appendToStream(
streamName,
nextExpectedVersion,
...result,
);
else
return eventStore.appendToStream(streamName, nextExpectedVersion, result);
};
// #endregion command-handler
28 changes: 21 additions & 7 deletions packages/emmett/src/commandHandling/handleCommandWithDecider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ import type { Decider } from '../typing/decider';

// #region command-handler
export const DeciderCommandHandler =
<State, CommandType extends Command, StreamEvent extends Event>(
<
State,
CommandType extends Command,
StreamEvent extends Event,
NextExpectedVersion = bigint,
>(
{
decide,
evolve,
Expand All @@ -15,15 +20,24 @@ export const DeciderCommandHandler =
async (eventStore: EventStore, id: string, command: CommandType) => {
const streamName = mapToStreamId(id);

const state = await eventStore.aggregateStream(streamName, {
evolve,
getInitialState,
});
const { entity: state, nextExpectedVersion } =
await eventStore.aggregateStream<State, StreamEvent, NextExpectedVersion>(
streamName,
{
evolve,
getInitialState,
},
);

const result = decide(command, state ?? getInitialState());

if (Array.isArray(result))
return eventStore.appendToStream(streamName, ...result);
else return eventStore.appendToStream(streamName, result);
return eventStore.appendToStream(
streamName,
nextExpectedVersion,
...result,
);
else
return eventStore.appendToStream(streamName, nextExpectedVersion, result);
};
// #endregion command-handler
14 changes: 11 additions & 3 deletions packages/emmett/src/eventStore/eventStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,26 @@ import type { Event } from '../typing';

// #region event-store
export interface EventStore {
aggregateStream<Entity, E extends Event>(
aggregateStream<Entity, E extends Event, NextExpectedVersion = bigint>(
streamName: string,
options: {
evolve: (currentState: Entity, event: E) => Entity;
getInitialState: () => Entity;
startingVersion?: NextExpectedVersion | undefined;
},
): Promise<Entity | null>;
): Promise<{
entity: Entity | null;
nextExpectedVersion: NextExpectedVersion;
}>;

readStream<E extends Event>(streamName: string): Promise<E[]>;
readStream<E extends Event, NextExpectedVersion = bigint>(
streamName: string,
startingVersion?: NextExpectedVersion | undefined,
): Promise<E[]>;

appendToStream<E extends Event, NextExpectedVersion = bigint>(
streamId: string,
expectedVersion?: NextExpectedVersion | undefined,
...events: E[]
): Promise<NextExpectedVersion>;
}
Expand Down

0 comments on commit 2e5ee32

Please sign in to comment.