Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC updates #1303

Merged
merged 10 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/silent-garlics-add.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"livekit-client": patch
---

RPC updates
24 changes: 12 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,37 +316,37 @@ The participant who implements the method and will receive its calls must first

```typescript
room.localParticipant?.registerRpcMethod(
// method name - can be any string that makes sense for your application
// method name - can be any string that makes sense for your application
'greet',

// method handler - will be called when the method is invoked by a RemoteParticipant
async (requestId: string, callerIdentity: string, payload: string, responseTimeoutMs: number) => {
console.log(`Received greeting from ${callerIdentity}: ${payload}`);
return `Hello, ${callerIdentity}!`;
async (data: RpcInvocationData) => {
console.log(`Received greeting from ${data.callerIdentity}: ${data.payload}`);
return `Hello, ${data.callerIdentity}!`;
}
);
```

In addition to the payload, your handler will also receive `responseTimeoutMs`, which informs you the maximum time available to return a response. If you are unable to respond in time, the call will result in an error on the caller's side.
In addition to the payload, your handler will also receive `responseTimeout`, which informs you the maximum time available to return a response. If you are unable to respond in time, the call will result in an error on the caller's side.

#### Performing an RPC request

The caller may then initiate an RPC call like so:

```typescript
try {
const response = await room.localParticipant!.performRpc(
'recipient-identity',
'greet',
'Hello from RPC!'
);
const response = await room.localParticipant!.performRpc({
destinationIdentity: 'recipient-identity',
method: 'greet',
payload: 'Hello from RPC!',
});
console.log('RPC response:', response);
} catch (error) {
console.error('RPC call failed:', error);
}
```

You may find it useful to adjust the `responseTimeoutMs` parameter, which indicates the amount of time you will wait for a response. We recommend keeping this value as low as possible while still satisfying the constraints of your application.
You may find it useful to adjust the `responseTimeout` parameter, which indicates the amount of time you will wait for a response. We recommend keeping this value as low as possible while still satisfying the constraints of your application.

#### Errors

Expand Down
87 changes: 41 additions & 46 deletions examples/rpc/rpc-demo.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { Room, type RoomConnectOptions, RoomEvent, RpcError } from '../../src/index';
import {
Room,
type RoomConnectOptions,
RoomEvent,
RpcError,
RpcInvocationData,
} from '../../src/index';

let startTime: number;

Expand Down Expand Up @@ -72,30 +78,21 @@ const registerReceiverMethods = async (greetersRoom: Room, mathGeniusRoom: Room)
await greetersRoom.localParticipant?.registerRpcMethod(
'arrival',
// eslint-disable-next-line @typescript-eslint/no-unused-vars
async (
requestId: string,
callerIdentity: string,
payload: string,
responseTimeoutMs: number,
) => {
console.log(`[Greeter] Oh ${callerIdentity} arrived and said "${payload}"`);
async (data: RpcInvocationData) => {
console.log(`[Greeter] Oh ${data.callerIdentity} arrived and said "${data.payload}"`);
await new Promise((resolve) => setTimeout(resolve, 2000));
return 'Welcome and have a wonderful day!';
},
);

await mathGeniusRoom.localParticipant?.registerRpcMethod(
'square-root',
async (
requestId: string,
callerIdentity: string,
payload: string,
responseTimeoutMs: number,
) => {
const jsonData = JSON.parse(payload);
async (data: RpcInvocationData) => {
const jsonData = JSON.parse(data.payload);
const number = jsonData.number;

console.log(
`[Math Genius] I guess ${callerIdentity} wants the square root of ${number}. I've only got ${responseTimeoutMs / 1000} seconds to respond but I think I can pull it off.`,
`[Math Genius] I guess ${data.callerIdentity} wants the square root of ${number}. I've only got ${data.responseTimeout / 1000} seconds to respond but I think I can pull it off.`,
);

console.log(`[Math Genius] *doing math*…`);
Expand All @@ -109,18 +106,12 @@ const registerReceiverMethods = async (greetersRoom: Room, mathGeniusRoom: Room)

await mathGeniusRoom.localParticipant?.registerRpcMethod(
'divide',
// eslint-disable-next-line @typescript-eslint/no-unused-vars
async (
requestId: string,
callerIdentity: string,
payload: string,
responseTimeoutMs: number,
) => {
const jsonData = JSON.parse(payload);
async (data: RpcInvocationData) => {
const jsonData = JSON.parse(data.payload);
const { numerator, denominator } = jsonData;

console.log(
`[Math Genius] ${callerIdentity} wants to divide ${numerator} by ${denominator}. Let me think...`,
`[Math Genius] ${data.callerIdentity} wants to divide ${numerator} by ${denominator}. Let me think...`,
);

await new Promise((resolve) => setTimeout(resolve, 2000));
Expand All @@ -139,7 +130,11 @@ const registerReceiverMethods = async (greetersRoom: Room, mathGeniusRoom: Room)
const performGreeting = async (room: Room): Promise<void> => {
console.log("[Caller] Letting the greeter know that I've arrived");
try {
const response = await room.localParticipant!.performRpc('greeter', 'arrival', 'Hello');
const response = await room.localParticipant!.performRpc({
destinationIdentity: 'greeter',
method: 'arrival',
payload: 'Hello',
});
console.log(`[Caller] That's nice, the greeter said: "${response}"`);
} catch (error) {
console.error('[Caller] RPC call failed:', error);
Expand All @@ -150,11 +145,11 @@ const performGreeting = async (room: Room): Promise<void> => {
const performDisconnection = async (room: Room): Promise<void> => {
console.log('[Caller] Checking back in on the greeter...');
try {
const response = await room.localParticipant!.performRpc(
'greeter',
'arrival',
'You still there?',
);
const response = await room.localParticipant!.performRpc({
destinationIdentity: 'greeter',
method: 'arrival',
payload: 'You still there?',
});
console.log(`[Caller] That's nice, the greeter said: "${response}"`);
} catch (error) {
if (error instanceof RpcError && error.code === RpcError.ErrorCode.RECIPIENT_DISCONNECTED) {
Expand All @@ -169,11 +164,11 @@ const performDisconnection = async (room: Room): Promise<void> => {
const performSquareRoot = async (room: Room): Promise<void> => {
console.log("[Caller] What's the square root of 16?");
try {
const response = await room.localParticipant!.performRpc(
'math-genius',
'square-root',
JSON.stringify({ number: 16 }),
);
const response = await room.localParticipant!.performRpc({
destinationIdentity: 'math-genius',
method: 'square-root',
payload: JSON.stringify({ number: 16 }),
});
const parsedResponse = JSON.parse(response);
console.log(`[Caller] Nice, the answer was ${parsedResponse.result}`);
} catch (error) {
Expand All @@ -185,11 +180,11 @@ const performSquareRoot = async (room: Room): Promise<void> => {
const performQuantumHypergeometricSeries = async (room: Room): Promise<void> => {
console.log("[Caller] What's the quantum hypergeometric series of 42?");
try {
const response = await room.localParticipant!.performRpc(
'math-genius',
'quantum-hypergeometric-series',
JSON.stringify({ number: 42 }),
);
const response = await room.localParticipant!.performRpc({
destinationIdentity: 'math-genius',
method: 'quantum-hypergeometric-series',
payload: JSON.stringify({ number: 42 }),
});
const parsedResponse = JSON.parse(response);
console.log(`[Caller] genius says ${parsedResponse.result}!`);
} catch (error) {
Expand All @@ -208,11 +203,11 @@ const performQuantumHypergeometricSeries = async (room: Room): Promise<void> =>
const performDivision = async (room: Room): Promise<void> => {
console.log("[Caller] Let's try dividing 10 by 0");
try {
const response = await room.localParticipant!.performRpc(
'math-genius',
'divide',
JSON.stringify({ numerator: 10, denominator: 0 }),
);
const response = await room.localParticipant!.performRpc({
destinationIdentity: 'math-genius',
method: 'divide',
payload: JSON.stringify({ numerator: 10, denominator: 0 }),
});
const parsedResponse = JSON.parse(response);
console.log(`[Caller] The result is ${parsedResponse.result}`);
} catch (error) {
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import {
} from './room/utils';
import { getBrowser } from './utils/browserParser';

export { RpcError } from './room/rpc';
export { RpcError, type RpcInvocationData, type PerformRpcParams } from './room/rpc';

export * from './connectionHelper/ConnectionCheck';
export * from './connectionHelper/checks/Checker';
Expand Down
73 changes: 40 additions & 33 deletions src/room/participant/LocalParticipant.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,15 @@ describe('LocalParticipant', () => {
methodName,
'test payload',
5000,
1,
);

expect(handler).toHaveBeenCalledWith(
'test-request-id',
mockCaller.identity,
'test payload',
5000,
);
expect(handler).toHaveBeenCalledWith({
requestId: 'test-request-id',
callerIdentity: mockCaller.identity,
payload: 'test payload',
responseTimeout: 5000,
});

// Check if sendDataPacket was called twice (once for ACK and once for response)
expect(mockSendDataPacket).toHaveBeenCalledTimes(2);
Expand Down Expand Up @@ -100,14 +101,15 @@ describe('LocalParticipant', () => {
methodName,
'test payload',
5000,
1,
);

expect(handler).toHaveBeenCalledWith(
'test-error-request-id',
mockCaller.identity,
'test payload',
5000,
);
expect(handler).toHaveBeenCalledWith({
requestId: 'test-error-request-id',
callerIdentity: mockCaller.identity,
payload: 'test payload',
responseTimeout: 5000,
});

// Check if sendDataPacket was called twice (once for ACK and once for error response)
expect(mockSendDataPacket).toHaveBeenCalledTimes(2);
Expand Down Expand Up @@ -141,14 +143,15 @@ describe('LocalParticipant', () => {
methodName,
'test payload',
5000,
1,
);

expect(handler).toHaveBeenCalledWith(
'test-rpc-error-request-id',
mockCaller.identity,
'test payload',
5000,
);
expect(handler).toHaveBeenCalledWith({
requestId: 'test-rpc-error-request-id',
callerIdentity: mockCaller.identity,
payload: 'test payload',
responseTimeout: 5000,
});

// Check if sendDataPacket was called twice (once for ACK and once for error response)
expect(mockSendDataPacket).toHaveBeenCalledTimes(2);
Expand Down Expand Up @@ -212,11 +215,11 @@ describe('LocalParticipant', () => {
}, 10);
});

const result = await localParticipant.performRpc(
mockRemoteParticipant.identity,
const result = await localParticipant.performRpc({
destinationIdentity: mockRemoteParticipant.identity,
method,
payload,
);
});

expect(mockSendDataPacket).toHaveBeenCalledTimes(1);
expect(result).toBe(responsePayload);
Expand All @@ -226,18 +229,18 @@ describe('LocalParticipant', () => {
const method = 'timeoutMethod';
const payload = 'timeoutPayload';

const timeoutMs = 50;
const timeout = 50;

const resultPromise = localParticipant.performRpc(
mockRemoteParticipant.identity,
const resultPromise = localParticipant.performRpc({
destinationIdentity: mockRemoteParticipant.identity,
method,
payload,
timeoutMs,
);
responseTimeout: timeout,
});

mockSendDataPacket.mockImplementationOnce(() => {
return new Promise((resolve) => {
setTimeout(resolve, timeoutMs + 10);
setTimeout(resolve, timeout + 10);
});
});

Expand All @@ -246,8 +249,8 @@ describe('LocalParticipant', () => {
await expect(resultPromise).rejects.toThrow('Response timeout');

const elapsedTime = Date.now() - startTime;
expect(elapsedTime).toBeGreaterThanOrEqual(timeoutMs);
expect(elapsedTime).toBeLessThan(timeoutMs + 50); // Allow some margin for test execution
expect(elapsedTime).toBeGreaterThanOrEqual(timeout);
expect(elapsedTime).toBeLessThan(timeout + 50); // Allow some margin for test execution

expect(mockSendDataPacket).toHaveBeenCalledTimes(1);
});
Expand All @@ -271,7 +274,11 @@ describe('LocalParticipant', () => {
});

await expect(
localParticipant.performRpc(mockRemoteParticipant.identity, method, payload),
localParticipant.performRpc({
destinationIdentity: mockRemoteParticipant.identity,
method,
payload,
}),
).rejects.toThrow(errorMessage);
});

Expand All @@ -281,11 +288,11 @@ describe('LocalParticipant', () => {

mockSendDataPacket.mockImplementationOnce(() => Promise.resolve());

const resultPromise = localParticipant.performRpc(
mockRemoteParticipant.identity,
const resultPromise = localParticipant.performRpc({
destinationIdentity: mockRemoteParticipant.identity,
method,
payload,
);
});

// Simulate a small delay before disconnection
await new Promise((resolve) => setTimeout(resolve, 200));
Expand Down
Loading
Loading