Skip to content

Commit

Permalink
chore(rpc): add custom binary message communication example
Browse files Browse the repository at this point in the history
  • Loading branch information
marcj committed May 10, 2024
1 parent 697e894 commit d89e3d3
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 0 deletions.
12 changes: 12 additions & 0 deletions packages/rpc/src/client/message-subject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ export class RpcMessageSubject {
return this;
}

/**
* Waits for the Ack message from the server, then closes the subject.
*/
async ackThenClose(): Promise<undefined> {
return asyncOperation<undefined>((resolve, reject) => {
this.onReply((next) => {
Expand All @@ -80,6 +83,9 @@ export class RpcMessageSubject {
});
}

/**
* Wait for next message to arrive.
*/
async waitNextMessage<T>(): Promise<RpcMessage> {
return asyncOperation<any>((resolve, reject) => {
this.onReply((next) => {
Expand All @@ -89,6 +95,9 @@ export class RpcMessageSubject {
});
}

/**
* Wait for next message with body parse.
*/
async waitNext<T>(type: number, schema?: ReceiveType<T>): Promise<T> {
return asyncOperation<any>((resolve, reject) => {
this.onReply((next) => {
Expand All @@ -107,6 +116,9 @@ export class RpcMessageSubject {
});
}

/**
* Waits for the first message of a specific type, then closes the subject.
*/
async firstThenClose<T = RpcMessage>(type: number, schema?: ReceiveType<T>): Promise<T> {
return await asyncOperation<any>((resolve, reject) => {
this.onReply((next) => {
Expand Down
79 changes: 79 additions & 0 deletions packages/rpc/tests/custom-message.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { expect, test } from '@jest/globals';
import { RpcKernel, RpcKernelConnection } from '../src/server/kernel.js';
import { createRpcMessage, RpcMessage } from '../src/protocol.js';
import { DirectClient } from '../src/client/client-direct.js';
import { sleep } from '@deepkit/core';

test('back controller', async () => {
/**
* @see RpcTypes
*/
enum MyTypes {
//the first 100 are reserved in @deepkit/rpc RpcTypes
Ack,
Error,

// Our custom types:
QueryAndAnswer = 100,
Answer = 101,

BroadcastWithAck = 102,
Broadcast = 103,
}

let broadcastCalled: any = undefined;
let broadcastWithAckCalled: any = undefined;

class MyRpcKernelConnection extends RpcKernelConnection {
async onMessage(message: RpcMessage): Promise<void> {
if (message.type === MyTypes.QueryAndAnswer) {
this.writer.write(createRpcMessage<{ v: string }>(message.id, MyTypes.Answer, { v: '42 is the answer' }));
return;
}

if (message.type === MyTypes.BroadcastWithAck) {
broadcastWithAckCalled = message.parseBody<{v: string}>()
this.writer.write(createRpcMessage(message.id, MyTypes.Ack));
return;
}

if (message.type === MyTypes.Broadcast) {
// no ack wanted
broadcastCalled = message.parseBody<{v: string}>()
return;
}

// Handle all the other messages with the default behavior
return super.onMessage(message);
}
}

class MyKernel extends RpcKernel {
RpcKernelConnection = MyRpcKernelConnection;
}

const kernel = new MyKernel();

const client = new DirectClient(kernel);

// This wait for the server's Ack
const answer = await client
.sendMessage(MyTypes.QueryAndAnswer)
.firstThenClose<{ v: string }>(MyTypes.Answer);
expect(answer.v).toBe('42 is the answer');

// This wait for the server's Ack
await client
.sendMessage<{v: string}>(MyTypes.BroadcastWithAck, {v: 'Hi1'})
.ackThenClose();
expect(broadcastWithAckCalled).toEqual({v: 'Hi1'});

// This does not wait for the server's Ack. release() call necessary to
// release the message context.
client
.sendMessage<{v: string}>(MyTypes.Broadcast, {v: 'Hi2'})
.release();

await sleep(0.1);
expect(broadcastCalled).toEqual({v: 'Hi2'});
});

0 comments on commit d89e3d3

Please sign in to comment.