Skip to content

Commit

Permalink
reverted typing, added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-laycalvert committed Jan 9, 2025
1 parent ff4ebaf commit 5d89d83
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ class MongoDBEventStoreImplementation implements MongoDBEventStore, Closeable {
}

await tryPublishMessagesAfterCommit<MongoDBEventStore>(
// @ts-expect-error Issues with `globalPosition` not being present causing the type for metadata to expect `never`
eventsToAppend,
this.options.hooks,
// {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import { after, before, describe, it } from 'node:test';
import { v7 as uuid } from 'uuid';
import { type Event, assertEqual } from '@event-driven-io/emmett';
import {
getMongoDBEventStore,
type MongoDBReadEvent,
} from './mongoDBEventStore';
import {
MongoDBContainer,
StartedMongoDBContainer,
} from '@testcontainers/mongodb';
import { MongoClient } from 'mongodb';

type TestEvent = Event<'test', { counter: number }, { some: boolean }>;

void describe('MongoDBEventStore onAfterCommit', () => {
let mongodb: StartedMongoDBContainer;
let client: MongoClient;

before(async () => {
mongodb = await new MongoDBContainer().start();
client = new MongoClient(mongodb.getConnectionString(), {
directConnection: true,
});

await client.connect();
});

after(async () => {
try {
await client.close();
await mongodb.stop();
} catch (error) {
console.log(error);
}
});

void it('calls onAfterCommit hook after events append', async () => {
// Given
const appendedEvents: MongoDBReadEvent[] = [];
const eventStore = getMongoDBEventStore({
client,
hooks: {
onAfterCommit: (events) => {
appendedEvents.push(...events);
},
},
});
const streamName = `test:${uuid()}`;
let counter = 0;
const events: TestEvent[] = [
{
type: 'test',
data: { counter: ++counter },
metadata: { some: true },
},
{
type: 'test',
data: { counter: ++counter },
metadata: { some: false },
},
];

// When
await eventStore.appendToStream(streamName, events);

// Then
assertEqual(2, appendedEvents.length);
});

void it('calls onAfterCommit hook exactly once for each events append', async () => {
// Given
const appendedEvents: MongoDBReadEvent[] = [];
const eventStore = getMongoDBEventStore({
client,
hooks: {
onAfterCommit: (events) => {
appendedEvents.push(...events);
},
},
});
const streamName = `test:${uuid()}`;
let counter = 0;
const events: TestEvent[] = [
{
type: 'test',
data: { counter: ++counter },
metadata: { some: true },
},
{
type: 'test',
data: { counter: ++counter },
metadata: { some: false },
},
];
const nextEvents: TestEvent[] = [
{
type: 'test',
data: { counter: ++counter },
metadata: { some: true },
},
{
type: 'test',
data: { counter: ++counter },
metadata: { some: false },
},
];

// When
await eventStore.appendToStream(streamName, events);
await eventStore.appendToStream(streamName, nextEvents);

// Then
assertEqual(4, appendedEvents.length);
});

void it('silently fails when onAfterCommit hook failed but still keeps events', async () => {
// Given
const appendedEvents: MongoDBReadEvent[] = [];
const eventStore = getMongoDBEventStore({
client,
hooks: {
onAfterCommit: (events) => {
appendedEvents.push(...events);
throw new Error('onAfterCommit failed!');
},
},
});

const streamName = `test:${uuid()}`;
let counter = 0;
const events: TestEvent[] = [
{
type: 'test',
data: { counter: ++counter },
metadata: { some: true },
},
{
type: 'test',
data: { counter: ++counter },
metadata: { some: false },
},
];

// When
await eventStore.appendToStream(streamName, events);

// Then
assertEqual(2, appendedEvents.length);
const { events: eventsInStore } = await eventStore.readStream(streamName);
assertEqual(2, eventsInStore.length);
});
});
7 changes: 1 addition & 6 deletions src/packages/emmett/src/eventStore/eventStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import type {
GlobalPositionTypeOfReadEventMetadata,
ReadEvent,
ReadEventMetadata,
ReadEventMetadataWithoutGlobalPosition,
StreamPositionTypeOfReadEventMetadata,
} from '../typing';
import type { AfterEventStoreCommitHandler } from './afterCommit';
Expand Down Expand Up @@ -56,11 +55,7 @@ export type EventStoreReadEventMetadata<Store extends EventStore> =
Store extends EventStore<infer ReadEventMetadataType>
? ReadEventMetadataType extends ReadEventMetadata<infer GV, infer SV>
? ReadEventMetadata<GV, SV> & ReadEventMetadataType
: ReadEventMetadataType extends ReadEventMetadataWithoutGlobalPosition<
infer SV
>
? ReadEventMetadata<undefined, SV>
: never
: never
: never;

export type GlobalPositionTypeOfEventStore<Store extends EventStore> =
Expand Down

0 comments on commit 5d89d83

Please sign in to comment.