Skip to content

Commit

Permalink
feat(rpc): client.transporter.errored subject
Browse files Browse the repository at this point in the history
With this change it is possible to subscribe to onError events from the transporter and get the actual error object.
  • Loading branch information
marcj committed May 16, 2024
1 parent c6ca4ec commit 0fc2bd4
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 25 deletions.
1 change: 1 addition & 0 deletions packages/rpc-tcp/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export class RpcTcpClientAdapter implements ClientTransportAdapter {
});

socket.on('error', (error: any) => {
error = error instanceof Error ? error : new Error(String(error));
connection.onError(error);
});

Expand Down
79 changes: 56 additions & 23 deletions packages/rpc/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export interface TransportConnectionHooks {

onData(buffer: Uint8Array, bytes?: number): void;

onError(error: any): void;
onError(error: Error): void;
}

export interface ClientTransportAdapter {
Expand All @@ -88,7 +88,7 @@ export interface WritableClient {
connectionId?: number,
peerId?: string,
timeout?: number
}
},
): RpcMessageSubject;
}

Expand Down Expand Up @@ -122,19 +122,34 @@ export class RpcClientTransporter {
public id?: Uint8Array;

/**
* true when the connection fully established (after authentication)
* When the connection is established (including handshake and authentication).
*/
public readonly connection = new BehaviorSubject<boolean>(false);

/**
* When the connection was reconnected. This is not called for the very first connection.
*/
public readonly reconnected = new Subject<number>();

/**
* When the connection was disconnected (due to error or close).
* This increases the connectionId by one.
*/
public readonly disconnected = new Subject<number>();

/**
* Triggered for any onError call from the transporter.
* Right after this event, onDisconnect is called (and thus connection.next(false) and disconnected.next()).
*/
public readonly errored = new Subject<{ connectionId: number, error: Error }>();

public reader = new RpcMessageReader(
(v) => this.onMessage(v),
(id) => {
if (this.writer) {
this.writer.write(createRpcMessage(id, RpcTypes.ChunkAck));
}
}
},
);

public constructor(
Expand All @@ -159,7 +174,14 @@ export class RpcClientTransporter {
return this.connected;
}

protected onError() {
protected onError(error: Error) {
if (this.connected) {
// We do not want to call errored if we are not yet connected,
// since errors thrown while in connection process are forwarded
// to the connection promise (and thus are thrown in connect()
// or in any rpc action).
this.errored.next({ connectionId: this.connectionId, error });
}
this.onDisconnect();
}

Expand Down Expand Up @@ -256,8 +278,8 @@ export class RpcClientTransporter {
resolve(undefined);
},

onError: (error: Event) => {
this.onError();
onError: (error: Error) => {
this.onError(error);
reject(new OfflineError(`Could not connect: ${formatError(error)}`));
},

Expand Down Expand Up @@ -315,7 +337,10 @@ export class RpcClientPeer {

}

public controller<T>(nameOrDefinition: string | ControllerDefinition<T>, options: { timeout?: number, dontWaitForConnection?: true } = {}): RemoteController<T> {
public controller<T>(nameOrDefinition: string | ControllerDefinition<T>, options: {
timeout?: number,
dontWaitForConnection?: true
} = {}): RemoteController<T> {
const controller = new RpcControllerState('string' === typeof nameOrDefinition ? nameOrDefinition : nameOrDefinition.path);
controller.peerId = this.peerId;

Expand All @@ -324,7 +349,7 @@ export class RpcClientPeer {
return (...args: any[]) => {
return this.actionClient.action(controller, propertyName as string, args, options);
};
}
},
}) as any as RemoteController<T>;
}

Expand All @@ -335,8 +360,12 @@ export class RpcClientPeer {


export type RpcEventMessage = { id: number, date: Date, type: number, body: any };
export type RpcClientEventIncomingMessage = { event: 'incoming', composite: boolean, messages: RpcEventMessage[] } & RpcEventMessage;
export type RpcClientEventOutgoingMessage = { event: 'outgoing', composite: boolean, messages: RpcEventMessage[] } & RpcEventMessage;
export type RpcClientEventIncomingMessage =
{ event: 'incoming', composite: boolean, messages: RpcEventMessage[] }
& RpcEventMessage;
export type RpcClientEventOutgoingMessage =
{ event: 'outgoing', composite: boolean, messages: RpcEventMessage[] }
& RpcEventMessage;

export type RpcClientEvent = RpcClientEventIncomingMessage | RpcClientEventOutgoingMessage;

Expand All @@ -355,7 +384,7 @@ export class RpcBaseClient implements WritableClient {
public events = new Subject<RpcClientEvent>();

constructor(
protected transport: ClientTransportAdapter
protected transport: ClientTransportAdapter,
) {
this.transporter = new RpcClientTransporter(this.transport);
this.transporter.onMessage = this.onMessage.bind(this);
Expand Down Expand Up @@ -443,7 +472,7 @@ export class RpcBaseClient implements WritableClient {
connectionId?: number,
peerId?: string,
timeout?: number
} = {}
} = {},
): RpcMessageSubject {
const resolvedSchema = schema ? resolveReceiveType(schema) : undefined;
if (body && !schema) throw new Error('Body given, but not type');
Expand All @@ -452,15 +481,15 @@ export class RpcBaseClient implements WritableClient {
const dontWaitForConnection = !!options.dontWaitForConnection;
// const timeout = options && options.timeout ? options.timeout : 0;

const continuation = <T>(type: number, body?: T, schema?: ReceiveType<T>,) => {
const continuation = <T>(type: number, body?: T, schema?: ReceiveType<T>) => {
if (connectionId === this.transporter.connectionId) {
//send a message with the same id. Don't use sendMessage() again as this would lead to a memory leak
// and a new id generated. We want to use the same id.
if (this.events.observers.length) {
this.events.next({
event: 'outgoing',
date: new Date,
id, type, body, messages: [], composite: false
id, type, body, messages: [], composite: false,
});
}
const message = createRpcMessage(id, type, body, undefined, schema);
Expand Down Expand Up @@ -488,7 +517,7 @@ export class RpcBaseClient implements WritableClient {
this.events.next({
event: 'outgoing',
date: new Date,
id, type, body, messages: [], composite: false
id, type, body, messages: [], composite: false,
});
}

Expand All @@ -505,14 +534,14 @@ export class RpcBaseClient implements WritableClient {
this.events.next({
event: 'outgoing',
date: new Date,
id, type, body, messages: [], composite: false
id, type, body, messages: [], composite: false,
});
}
this.transporter.send(message, progress?.upload);
},
(e) => {
subject.next(new ErroredRpcMessage(id, e));
}
},
);
}

Expand Down Expand Up @@ -586,7 +615,7 @@ export class RpcClient extends RpcBaseClient {
write: (answer: Uint8Array) => {
//should we modify the package?
this.transporter.send(answer);
}
},
};

//todo: set up timeout for idle detection. Make the timeout configurable
Expand Down Expand Up @@ -614,7 +643,7 @@ export class RpcClient extends RpcBaseClient {
},
bufferedAmount: () => {
return this.transporter.bufferedAmount();
}
},
});
// Important to disable since transporter.send chunks already,
// otherwise data is chunked twice and protocol breaks.
Expand Down Expand Up @@ -667,7 +696,7 @@ export class RpcClient extends RpcBaseClient {
deregister: async () => {
await this.sendMessage<rpcPeerDeregister>(RpcTypes.PeerDeregister, { id }).firstThenClose(RpcTypes.Ack);
this.registeredAsPeer = undefined;
}
},
};
}

Expand All @@ -689,7 +718,11 @@ export class RpcClient extends RpcBaseClient {
return peer;
}

public controller<T>(nameOrDefinition: string | ControllerDefinition<T>, options: { timeout?: number, dontWaitForConnection?: true, typeReuseDisabled?: boolean } = {}): RemoteController<T> {
public controller<T>(nameOrDefinition: string | ControllerDefinition<T>, options: {
timeout?: number,
dontWaitForConnection?: true,
typeReuseDisabled?: boolean
} = {}): RemoteController<T> {
const controller = new RpcControllerState('string' === typeof nameOrDefinition ? nameOrDefinition : nameOrDefinition.path);

options = options || {};
Expand All @@ -702,7 +735,7 @@ export class RpcClient extends RpcBaseClient {
return (...args: any[]) => {
return this.actionClient.action(controller, propertyName as string, args, options);
};
}
},
}) as any as RemoteController<T>;
}

Expand Down
50 changes: 50 additions & 0 deletions packages/rpc/tests/connection.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { expect, test } from '@jest/globals';
import { RpcKernel } from '../src/server/kernel.js';
import { RpcClient, TransportConnectionHooks } from '../src/client/client.js';

test('connect', async () => {
const kernel = new RpcKernel();

const connections: TransportConnectionHooks[] = [];

const client = new RpcClient({
connect(connection: TransportConnectionHooks) {
const kernelConnection = kernel.createConnection({
write: (buffer) => connection.onData(buffer),
close: () => {
connection.onClose();
},
});

connections.push(connection);

connection.onConnected({
clientAddress: () => {
return 'direct';
},
bufferedAmount(): number {
return 0;
},
close() {
kernelConnection.close();
},
send(buffer) {
kernelConnection.feed(buffer);
},
});
},
});

const errors: Error[] = [];
client.transporter.errored.subscribe((error) => {
errors.push(error.error);
});

await client.connect();
expect(client.transporter.isConnected()).toBe(true);
expect(connections).toHaveLength(1);

connections[0].onError(new Error('test'));
expect(errors[0].message).toEqual('test');
expect(client.transporter.isConnected()).toBe(false);
});
4 changes: 2 additions & 2 deletions packages/rpc/tests/custom-message.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ test('back controller', async () => {

const client = new DirectClient(kernel);

// This wait for the server's Ack
// This waits for the server's Ack
const answer = await client
.sendMessage(MyTypes.QueryAndAnswer)
.firstThenClose<{ v: string }>(MyTypes.Answer);
expect(answer.v).toBe('42 is the answer');

// This wait for the server's Ack
// This waits for the server's Ack
await client
.sendMessage<{v: string}>(MyTypes.BroadcastWithAck, {v: 'Hi1'})
.ackThenClose();
Expand Down

0 comments on commit 0fc2bd4

Please sign in to comment.