Skip to content

Commit

Permalink
based onaftercommit hook for mongodb event store
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-laycalvert committed Jan 9, 2025
1 parent e8e0b3c commit ff4ebaf
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 5 deletions.
21 changes: 17 additions & 4 deletions src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import {
assertExpectedVersionMatchesCurrent,
ExpectedVersionConflictError,
assertExpectedVersionMatchesCurrent,
filterProjections,
tryPublishMessagesAfterCommit,
type AggregateStreamOptions,
type AggregateStreamResult,
type AppendToStreamOptions,
Expand All @@ -14,6 +15,7 @@ import {
type ReadEventMetadataWithoutGlobalPosition,
type ReadStreamOptions,
type ReadStreamResult,
type DefaultEventStoreOptions,
} from '@event-driven-io/emmett';
import {
MongoClient,
Expand Down Expand Up @@ -178,7 +180,8 @@ export type MongoDBEventStoreOptions = {
MongoDBProjectionInlineHandlerContext
>[];
storage?: MongoDBEventStoreStorageOptions;
} & MongoDBEventStoreConnectionOptions;
} & MongoDBEventStoreConnectionOptions &
DefaultEventStoreOptions<MongoDBEventStore>;

export type MongoDBEventStore = EventStore<MongoDBReadEventMetadata> & {
projections: ProjectionQueries<StreamType>;
Expand All @@ -189,13 +192,15 @@ export type MongoDBEventStore = EventStore<MongoDBReadEventMetadata> & {

class MongoDBEventStoreImplementation implements MongoDBEventStore, Closeable {
private readonly client: MongoClient;
private shouldManageClientLifetime: boolean;
private readonly inlineProjections: MongoDBInlineProjectionDefinition[];
private shouldManageClientLifetime: boolean;
private isClosed: boolean = false;
public projections: ProjectionQueries<StreamType>;
private storage: MongoDBEventStoreStorage;
private options: MongoDBEventStoreOptions;
public projections: ProjectionQueries<StreamType>;

constructor(options: MongoDBEventStoreOptions) {
this.options = options;
this.client =
'client' in options && options.client
? options.client
Expand Down Expand Up @@ -388,6 +393,14 @@ class MongoDBEventStoreImplementation implements MongoDBEventStore, Closeable {
);
}

await tryPublishMessagesAfterCommit<MongoDBEventStore>(
eventsToAppend,
this.options.hooks,
// {
// TODO: same context as InlineProjectionHandlerContext for mongodb?
// },
);

return {
nextExpectedStreamVersion:
currentStreamVersion + BigInt(eventsToAppend.length),
Expand Down
7 changes: 6 additions & 1 deletion src/packages/emmett/src/eventStore/eventStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {
GlobalPositionTypeOfReadEventMetadata,
ReadEvent,
ReadEventMetadata,
ReadEventMetadataWithoutGlobalPosition,
StreamPositionTypeOfReadEventMetadata,
} from '../typing';
import type { AfterEventStoreCommitHandler } from './afterCommit';
Expand Down Expand Up @@ -55,7 +56,11 @@ export type EventStoreReadEventMetadata<Store extends EventStore> =
Store extends EventStore<infer ReadEventMetadataType>
? ReadEventMetadataType extends ReadEventMetadata<infer GV, infer SV>
? ReadEventMetadata<GV, SV> & ReadEventMetadataType
: never
: ReadEventMetadataType extends ReadEventMetadataWithoutGlobalPosition<
infer SV
>
? ReadEventMetadata<undefined, SV>
: never
: never;

export type GlobalPositionTypeOfEventStore<Store extends EventStore> =
Expand Down

0 comments on commit ff4ebaf

Please sign in to comment.