Skip to content

Commit

Permalink
fix(common): reactive events bus
Browse files Browse the repository at this point in the history
message bus can't differentiate between messages without discriminators
  • Loading branch information
marcus-sa committed Aug 29, 2024
1 parent de14100 commit 8c93b4f
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 18 deletions.
Binary file modified bun.lockb
Binary file not shown.
46 changes: 44 additions & 2 deletions common/src/lib/reactive-events-bus.spec.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import { test, expect, vi } from 'vitest';
import { BrokerBus, BrokerMemoryAdapter } from '@deepkit/broker';
import { Injector, provide } from '@deepkit/injector';
import { uuid, UUID } from '@deepkit/type';
import { Logger } from '@deepkit/logger';
import { RestateContextStorage, RestateInMemoryContext } from 'deepkit-restate';
import {
RestateContextStorage,
RestateInMemoryContextStorage,
} from 'deepkit-restate';

import {
provideReactiveEventsBus,
Expand All @@ -20,7 +24,7 @@ test('subscribe', async () => {
Logger,
{
provide: RestateContextStorage,
useValue: () => ({ getStore: () => new RestateInMemoryContext() }),
useClass: RestateInMemoryContextStorage,
},
provide<BrokerBus>(() => new BrokerBus(new BrokerMemoryAdapter())),
provideReactiveEventsBus<TestEventsBus>(),
Expand All @@ -41,3 +45,41 @@ test('subscribe', async () => {
expect(subscriber).toHaveBeenCalledTimes(1);
expect(subscriber).toHaveBeenCalledWith(event);
});

test('classes without discriminators', async () => {
class FirstEvent {
readonly id: UUID = uuid();
}

class SecondEvent {
readonly id: UUID = uuid();
}

type TestEvents = FirstEvent | SecondEvent;

type TestEventsBus = ReactiveEventsBus<TestEvents>;

const injector = Injector.from([
Logger,
{
provide: RestateContextStorage,
useClass: RestateInMemoryContextStorage,
},
provide<BrokerBus>(() => new BrokerBus(new BrokerMemoryAdapter())),
provideReactiveEventsBus<TestEventsBus>(),
]);

const events = injector.get<TestEventsBus>();

const observable = await events.subscribe<FirstEvent>();

const subscriber = vi.fn();

observable.subscribe(subscriber);

const event = new SecondEvent();

await events.publish(event);

expect(subscriber).not.toHaveBeenCalled();
});
59 changes: 45 additions & 14 deletions common/src/lib/reactive-events-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,47 @@ import { BrokerBus, BrokerBusChannel } from '@deepkit/broker';
import { provide } from '@deepkit/injector';
import { Logger, ScopedLogger } from '@deepkit/logger';
import { RestateContextStorage } from 'deepkit-restate';
import { ClassType } from '@deepkit/core';
import {
assertType,
getClassType,
ReceiveType,
ReflectionKind,
resolveReceiveType,
TypeClass,
TypeUnion,
} from '@deepkit/type';

export function provideReactiveEventsBus<T extends ReactiveEventsBus<unknown>>(
type?: ReceiveType<T>,
) {
type = resolveReceiveType(type!) as TypeClass;
const eventsType = type.arguments![0];
type = resolveReceiveType(type) as TypeClass;
const eventsType = type.arguments![0] as TypeUnion | TypeClass;
if (
eventsType.kind !== ReflectionKind.class &&
eventsType.kind !== ReflectionKind.union
) {
throw new Error('Type argument must be a class or union');
}
const eventTypes =
eventsType.kind === ReflectionKind.class ? [eventsType] : eventsType.types;

return provide(
(logger: Logger, contextStorage: RestateContextStorage, bus: BrokerBus) =>
new ReactiveEventsBus(
(logger: Logger, contextStorage: RestateContextStorage, bus: BrokerBus) => {
const channels = new Map<ClassType, BrokerBusChannel<unknown>>();
for (const eventType of eventTypes) {
assertType(eventType, ReflectionKind.class);
channels.set(
eventType.classType,
bus.channel(eventType.typeName!, eventType),
);
}
return new ReactiveEventsBus(
logger.scoped('ReactiveEventsBus'),
contextStorage,
bus.channel(eventsType.typeName!, eventsType),
),
channels,
);
},
type,
);
}
Expand All @@ -30,28 +52,37 @@ export class ReactiveEventsBus<Events> {
constructor(
private readonly logger: ScopedLogger,
private readonly contextStorage: RestateContextStorage,
private readonly channel: BrokerBusChannel<Events>,
private readonly channels: Map<ClassType, BrokerBusChannel<Events>>,
) {}

#getChannel<T extends Events>(classType: ClassType): BrokerBusChannel<T> {
const channel = this.channels.get(classType);
if (!channel) {
throw new Error('Missing channel for class type: ' + classType.name);
}
return channel as BrokerBusChannel<T>;
}

async publish<T extends Events>(event: T): Promise<void> {
const ctx = this.contextStorage.getStore()!;
await ctx.run(async () => {
this.logger.log('Publishing event:', event);
await this.channel.publish(event);
const channel = this.#getChannel<T>((event as any).constructor);
await channel.publish(event);
});
}

async subscribe<T extends Events>(
type?: ReceiveType<T>,
): Promise<Observable<T>> {
const classType = getClassType(resolveReceiveType(type!));
const classType = getClassType(resolveReceiveType(type));
const events = new Subject<T>();

const unsubscribe = await this.channel.subscribe(event => {
if (event instanceof classType) {
this.logger.log('Received event:', event);
events.next(event as T);
}
const channel = this.#getChannel<T>(classType);

const unsubscribe = await channel.subscribe(event => {
this.logger.log('Received event:', event);
events.next(event as T);
});

return new Observable<T>(subscriber => {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"@deepkit/sql": "^1.0.1-alpha.153",
"@deepkit/type": "^1.0.1-alpha.153",
"@restatedev/restate-sdk": "1.2.1",
"deepkit-restate": "0.0.75",
"deepkit-restate": "0.0.76",
"kafkajs": "2.2.4",
"rxjs": "^7.8.1",
"stripe": "^16.8.0",
Expand Down
2 changes: 1 addition & 1 deletion tsconfig.base.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
"baseUrl": ".",
"types": ["node"],
"paths": {
"@ftgo/payment-service-api": ["payment-service-api/src/index.ts"],
"@ftgo/common": ["common/src/index.ts"],
"@ftgo/customer-service-api": ["customer-service-api/src/index.ts"],
"@ftgo/delivery-service-api": ["delivery-service-api/src/index.ts"],
"@ftgo/kitchen-service-api": ["kitchen-service-api/src/index.ts"],
"@ftgo/order-service-api": ["order-service-api/src/index.ts"],
"@ftgo/payment-service-api": ["payment-service-api/src/index.ts"],
"@ftgo/restaurant-service-api": ["restaurant-service-api/src/index.ts"]
}
},
Expand Down

0 comments on commit 8c93b4f

Please sign in to comment.