From 8c93b4f6e8058030f9565b6c93644907ade5893b Mon Sep 17 00:00:00 2001 From: marcus-sa <8391194+marcus-sa@users.noreply.github.com> Date: Thu, 29 Aug 2024 02:43:22 +0200 Subject: [PATCH] fix(common): reactive events bus message bus can't differentiate between messages without discriminators --- bun.lockb | Bin 432160 -> 432160 bytes common/src/lib/reactive-events-bus.spec.ts | 46 +++++++++++++++- common/src/lib/reactive-events-bus.ts | 59 ++++++++++++++++----- package.json | 2 +- tsconfig.base.json | 2 +- 5 files changed, 91 insertions(+), 18 deletions(-) diff --git a/bun.lockb b/bun.lockb index 2ecb365e567bd4d4293526e51a532e145d5103cd..b568c464b260319f7181d0b10393f3a61612bac8 100755 GIT binary patch delta 186 zcmV;r07d_xuo|GS8jvm^i)H!^mRLXMc{xzPN61a5eKV_EhKW&cRCY^e9GmM}u}%s{ z0Zfy@K_`+z2!n`6w}?jp9Y{brx-~pK_+egL#M+n93@XX!5|Q&YR@!#fXfnyLpnwS- zdvLa{qZW6l`d%LHy(-katWkFWnc5-?5Js$Ew3qXT<&Ob}<&Ofl<&OhSzX<`I0RR9H o004lOJWm9(0XDbrPXzJw1vV~pXL^@Egasdm1w{q71w{rMQ{FgE4gdfE delta 186 zcmZ3mL2AJUsR?=t(og3c-ef7tTyU(Z`l!@}+Jj8G$zQ$&pSD-XdNiT8W${KoE_X)n z$p; { Logger, { provide: RestateContextStorage, - useValue: () => ({ getStore: () => new RestateInMemoryContext() }), + useClass: RestateInMemoryContextStorage, }, provide(() => new BrokerBus(new BrokerMemoryAdapter())), provideReactiveEventsBus(), @@ -41,3 +45,41 @@ test('subscribe', async () => { expect(subscriber).toHaveBeenCalledTimes(1); expect(subscriber).toHaveBeenCalledWith(event); }); + +test('classes without discriminators', async () => { + class FirstEvent { + readonly id: UUID = uuid(); + } + + class SecondEvent { + readonly id: UUID = uuid(); + } + + type TestEvents = FirstEvent | SecondEvent; + + type TestEventsBus = ReactiveEventsBus; + + const injector = Injector.from([ + Logger, + { + provide: RestateContextStorage, + useClass: RestateInMemoryContextStorage, + }, + provide(() => new BrokerBus(new BrokerMemoryAdapter())), + provideReactiveEventsBus(), + ]); + + const events = injector.get(); + + const observable = await events.subscribe(); + + const subscriber = vi.fn(); + + observable.subscribe(subscriber); + + const event = new SecondEvent(); + + await events.publish(event); + + expect(subscriber).not.toHaveBeenCalled(); +}); diff --git a/common/src/lib/reactive-events-bus.ts b/common/src/lib/reactive-events-bus.ts index 54330fa..f8091c5 100644 --- a/common/src/lib/reactive-events-bus.ts +++ b/common/src/lib/reactive-events-bus.ts @@ -3,25 +3,47 @@ import { BrokerBus, BrokerBusChannel } from '@deepkit/broker'; import { provide } from '@deepkit/injector'; import { Logger, ScopedLogger } from '@deepkit/logger'; import { RestateContextStorage } from 'deepkit-restate'; +import { ClassType } from '@deepkit/core'; import { + assertType, getClassType, ReceiveType, + ReflectionKind, resolveReceiveType, TypeClass, + TypeUnion, } from '@deepkit/type'; export function provideReactiveEventsBus>( type?: ReceiveType, ) { - type = resolveReceiveType(type!) as TypeClass; - const eventsType = type.arguments![0]; + type = resolveReceiveType(type) as TypeClass; + const eventsType = type.arguments![0] as TypeUnion | TypeClass; + if ( + eventsType.kind !== ReflectionKind.class && + eventsType.kind !== ReflectionKind.union + ) { + throw new Error('Type argument must be a class or union'); + } + const eventTypes = + eventsType.kind === ReflectionKind.class ? [eventsType] : eventsType.types; + return provide( - (logger: Logger, contextStorage: RestateContextStorage, bus: BrokerBus) => - new ReactiveEventsBus( + (logger: Logger, contextStorage: RestateContextStorage, bus: BrokerBus) => { + const channels = new Map>(); + for (const eventType of eventTypes) { + assertType(eventType, ReflectionKind.class); + channels.set( + eventType.classType, + bus.channel(eventType.typeName!, eventType), + ); + } + return new ReactiveEventsBus( logger.scoped('ReactiveEventsBus'), contextStorage, - bus.channel(eventsType.typeName!, eventsType), - ), + channels, + ); + }, type, ); } @@ -30,28 +52,37 @@ export class ReactiveEventsBus { constructor( private readonly logger: ScopedLogger, private readonly contextStorage: RestateContextStorage, - private readonly channel: BrokerBusChannel, + private readonly channels: Map>, ) {} + #getChannel(classType: ClassType): BrokerBusChannel { + const channel = this.channels.get(classType); + if (!channel) { + throw new Error('Missing channel for class type: ' + classType.name); + } + return channel as BrokerBusChannel; + } + async publish(event: T): Promise { const ctx = this.contextStorage.getStore()!; await ctx.run(async () => { this.logger.log('Publishing event:', event); - await this.channel.publish(event); + const channel = this.#getChannel((event as any).constructor); + await channel.publish(event); }); } async subscribe( type?: ReceiveType, ): Promise> { - const classType = getClassType(resolveReceiveType(type!)); + const classType = getClassType(resolveReceiveType(type)); const events = new Subject(); - const unsubscribe = await this.channel.subscribe(event => { - if (event instanceof classType) { - this.logger.log('Received event:', event); - events.next(event as T); - } + const channel = this.#getChannel(classType); + + const unsubscribe = await channel.subscribe(event => { + this.logger.log('Received event:', event); + events.next(event as T); }); return new Observable(subscriber => { diff --git a/package.json b/package.json index e941171..f32b159 100644 --- a/package.json +++ b/package.json @@ -25,7 +25,7 @@ "@deepkit/sql": "^1.0.1-alpha.153", "@deepkit/type": "^1.0.1-alpha.153", "@restatedev/restate-sdk": "1.2.1", - "deepkit-restate": "0.0.75", + "deepkit-restate": "0.0.76", "kafkajs": "2.2.4", "rxjs": "^7.8.1", "stripe": "^16.8.0", diff --git a/tsconfig.base.json b/tsconfig.base.json index 1132e7a..05c7470 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -26,12 +26,12 @@ "baseUrl": ".", "types": ["node"], "paths": { - "@ftgo/payment-service-api": ["payment-service-api/src/index.ts"], "@ftgo/common": ["common/src/index.ts"], "@ftgo/customer-service-api": ["customer-service-api/src/index.ts"], "@ftgo/delivery-service-api": ["delivery-service-api/src/index.ts"], "@ftgo/kitchen-service-api": ["kitchen-service-api/src/index.ts"], "@ftgo/order-service-api": ["order-service-api/src/index.ts"], + "@ftgo/payment-service-api": ["payment-service-api/src/index.ts"], "@ftgo/restaurant-service-api": ["restaurant-service-api/src/index.ts"] } },