Skip to content

Commit

Permalink
feat(rpc): add http transport
Browse files Browse the repository at this point in the history
This change refactors the transport layer of RPC in a way that allows to serialize RpcMessageDefinition in a custom way.
This is used then in a new `RpcHttpClientAdapter` adapter. This is a first experimental version of it.

This contains breaking changes in the way RPC adapters are written. RpcMessageWriter does not exist anymore and became an implementation detail. A new TransportClientConnection and TransportConnection abstraction has been added that is used in both: the client and server.
  • Loading branch information
marcj committed May 27, 2024
1 parent 4dcc38c commit 3b2c6cc
Show file tree
Hide file tree
Showing 31 changed files with 1,092 additions and 547 deletions.
10 changes: 6 additions & 4 deletions packages/broker/src/adapters/deepkit-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
BrokerQueueResponseHandleMessage,
BrokerQueueSubscribe,
BrokerQueueUnsubscribe,
brokerResponseGet,
brokerResponseGetCache,
brokerResponseGetCacheMeta,
brokerResponseIncrement,
Expand Down Expand Up @@ -227,11 +228,12 @@ export class BrokerDeepkitAdapter implements BrokerAdapter {
}

async get(key: string, type: Type): Promise<any> {
const first: RpcMessage = await this.pool.getConnection('key/' + key)
.sendMessage<brokerGet>(BrokerType.Get, { n: key }).firstThenClose(BrokerType.ResponseGet);
if (first.buffer && first.buffer.byteLength > first.bodyOffset) {
const first = await this.pool.getConnection('key/' + key)
.sendMessage<brokerGet>(BrokerType.Get, { n: key })
.firstThenClose<brokerResponseGet>(BrokerType.ResponseGet);
if (first.v) {
const serializer = getSerializer(type);
return serializer.decode(first.buffer, first.bodyOffset);
return serializer.decode(first.v, 0);
}
}

Expand Down
19 changes: 10 additions & 9 deletions packages/broker/src/kernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
import { arrayRemoveItem, ProcessLock, ProcessLocker } from '@deepkit/core';
import {
createRpcMessage,
RpcConnectionWriter,
RpcKernel,
RpcKernelBaseConnection,
RpcKernelConnections,
RpcMessage,
RpcMessageBuilder,
RpcMessageRouteType,
TransportConnection,
} from '@deepkit/rpc';
import { Logger } from '@deepkit/logger';
import {
Expand All @@ -36,6 +36,7 @@ import {
BrokerQueuePublish,
BrokerQueueResponseHandleMessage,
BrokerQueueSubscribe,
brokerResponseGet,
brokerResponseGetCache,
brokerResponseGetCacheMeta,
brokerResponseIncrement,
Expand Down Expand Up @@ -67,11 +68,11 @@ export class BrokerConnection extends RpcKernelBaseConnection {

constructor(
logger: Logger,
transportWriter: RpcConnectionWriter,
transportConnection: TransportConnection,
protected connections: RpcKernelConnections,
protected state: BrokerState,
) {
super(logger, transportWriter, connections);
super(logger, transportConnection, connections);
}

public close(): void {
Expand Down Expand Up @@ -261,7 +262,7 @@ export class BrokerConnection extends RpcKernelBaseConnection {
);

for (const connection of this.state.invalidationCacheMessageConnections) {
connection.writer.write(message);
connection.write(message);
}
}
response.ack();
Expand All @@ -288,7 +289,7 @@ export class BrokerConnection extends RpcKernelBaseConnection {
case BrokerType.Get: {
const body = message.parseBody<brokerGet>();
const v = this.state.getKey(body.n);
response.replyBinary(BrokerType.ResponseGet, v);
response.reply<brokerResponseGet>(BrokerType.ResponseGet, { v });
break;
}
case BrokerType.EnableInvalidationCacheMessages: {
Expand Down Expand Up @@ -430,7 +431,7 @@ export class BrokerState {
);

for (const connection of subscriptions) {
connection.writer.write(message);
connection.write(message);
}
}

Expand Down Expand Up @@ -491,7 +492,7 @@ export class BrokerState {
m.tries++;
m.state = QueueMessageState.inFlight;
m.lastError = undefined;
consumer.con.writer.write(createRpcMessage<BrokerQueueResponseHandleMessage>(
consumer.con.write(createRpcMessage<BrokerQueueResponseHandleMessage>(
0, BrokerType.QueueResponseHandleMessage,
{ c: body.c, v: body.v, id: m.id }, RpcMessageRouteType.server,
));
Expand Down Expand Up @@ -568,7 +569,7 @@ export class BrokerState {
export class BrokerKernel extends RpcKernel {
protected state: BrokerState = new BrokerState;

createConnection(writer: RpcConnectionWriter): BrokerConnection {
return new BrokerConnection(this.logger, writer, this.connections, this.state);
createConnection(transport: TransportConnection): BrokerConnection {
return new BrokerConnection(this.logger, transport, this.connections, this.state);
}
}
4 changes: 4 additions & 0 deletions packages/broker/src/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ export interface brokerInvalidateCacheMessage {
ttl: number;
}

export interface brokerResponseGet {
v?: Uint8Array,
}

export interface brokerResponseGetCache {
v?: Uint8Array,
ttl?: number,
Expand Down
1 change: 1 addition & 0 deletions packages/example-app/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const app = new App({
publicDir: 'public',
httpLog: true,
migrateOnStartup: true,
httpRpcBasePath: 'rpc/v1'
}),
]
});
Expand Down
10 changes: 10 additions & 0 deletions packages/example-app/src/controller/rpc.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@ import { Observable, Subject } from 'rxjs';
@rpc.controller('test-rpc')
export class RpcController {

@rpc.action()
hello(): string {
return 'World';
}

@rpc.action()
hi(name: string): string {
return `Hi ${name}`;
}

@rpc.action()
timesSubject(): Subject<Date> {
const subject = new Subject<Date>();
Expand Down
6 changes: 3 additions & 3 deletions packages/framework/src/application-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -347,15 +347,15 @@ export class ApplicationServer {
return new RpcClient({
connect(connection) {
const kernelConnection = createRpcConnection(context, rpcKernel, {
write: (buffer) => connection.onData(buffer),
close: () => connection.onClose(),
writeBinary: (buffer) => connection.readBinary(buffer),
close: () => connection.onClose('closed'),
});

connection.onConnected({
close() {
kernelConnection.close();
},
send(message) {
writeBinary(message: Uint8Array) {
queueMicrotask(() => {
kernelConnection.feed(message);
});
Expand Down
6 changes: 6 additions & 0 deletions packages/framework/src/module.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ export class FrameworkConfig {

debug: boolean = false;

/**
* @description If set, allows to call RPC methods via HTTP. The value is the base URL for the RPC calls.
* Use e.g. `/rpc/v1`
*/
httpRpcBasePath: string = '';

debugUrl: string = '_debug';

/**
Expand Down
51 changes: 46 additions & 5 deletions packages/framework/src/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@ import { DebugDIController } from './cli/debug-di.js';
import { ServerStartController } from './cli/server-start.js';
import { DebugController } from './debug/debug.controller.js';
import { registerDebugHttpController } from './debug/http-debug.controller.js';
import { http, HttpLogger, HttpModule, HttpRequest, serveStaticListener } from '@deepkit/http';
import {
http,
HttpLogger,
HttpModule,
HttpRegExp,
HttpRequest,
HttpResponse,
serveStaticListener,
} from '@deepkit/http';
import { InjectorContext, ProviderWithScope, Token } from '@deepkit/injector';
import { BrokerConfig, FrameworkConfig } from './module.config.js';
import { Logger } from '@deepkit/logger';
Expand All @@ -37,7 +45,6 @@ import {
import { FileStopwatchStore } from './debug/stopwatch/store.js';
import { DebugProfileFramesCommand } from './cli/debug-debug-frames.js';
import {
ConnectionWriter,
rpcClass,
RpcKernel,
RpcKernelBaseConnection,
Expand Down Expand Up @@ -113,7 +120,6 @@ export class FrameworkModule extends createModule({
{ provide: SessionState, scope: 'rpc', useValue: undefined },
{ provide: RpcKernelBaseConnection, scope: 'rpc', useValue: undefined },
{ provide: RpcKernelConnection, scope: 'rpc', useValue: undefined },
{ provide: ConnectionWriter, scope: 'rpc', useValue: undefined },
],
workflows: [
// rpcWorkflow,
Expand Down Expand Up @@ -152,7 +158,6 @@ export class FrameworkModule extends createModule({
SessionState,
RpcKernelConnection,
RpcKernelBaseConnection,
ConnectionWriter,

BrokerDeepkitAdapter,
BrokerCache,
Expand Down Expand Up @@ -223,10 +228,46 @@ export class FrameworkModule extends createModule({
// this.setupProvider(LiveDatabase).enableChangeFeed(DebugRequest);
}

if (this.config.httpRpcBasePath) {
const rpcBaseUrl = this.config.httpRpcBasePath;
@http.controller(rpcBaseUrl)
class HttpRpcController {
constructor(protected rpcKernel: RpcKernel) {
}

@http.ANY(':controller/:method')
async handle(
controller: HttpRegExp<string, '.*'>,
method: string,
request: HttpRequest,
response: HttpResponse,
) {
const connection = this.rpcKernel.createConnection({
write: (data) => {

},
bufferedAmount() {
return 0;
},
close() {

},
clientAddress() {
return request.socket.remoteAddress || '';
},
});
request.body = await request.readBody();
await connection.onRequest(rpcBaseUrl, request, response);
return response;
}
}
this.addController(HttpRpcController);
}

const disconnect = async (event: unknown, broker: DebugBrokerBus, store: StopwatchStore) => {
await store.close();
await broker.adapter.disconnect();
}
};
this.addListener(onAppShutdown.listen(disconnect));
// Registering at onServerShutdown also so that ApplicationServer.close disconnects all connections.
this.addListener(onServerShutdown.listen(disconnect));
Expand Down
6 changes: 3 additions & 3 deletions packages/framework/src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
import { InjectorContext } from '@deepkit/injector';
import {
rpcActionType,
RpcConnectionWriter,
RpcControllerAccess,
RpcKernel,
RpcKernelBaseConnection,
RpcKernelConnection,
RpcMessage,
RpcMessageBuilder,
RpcServerAction,
TransportConnection,
} from '@deepkit/rpc';
import { FrameCategory, Stopwatch } from '@deepkit/stopwatch';
import { ClassType } from '@deepkit/core';
Expand Down Expand Up @@ -89,8 +89,8 @@ export class RpcKernelWithStopwatch extends RpcKernel {

stopwatch?: Stopwatch;

createConnection(writer: RpcConnectionWriter, injector?: InjectorContext): RpcKernelBaseConnection {
const connection = super.createConnection(writer, injector);
createConnection(transport: TransportConnection, injector?: InjectorContext): RpcKernelBaseConnection {
const connection = super.createConnection(transport, injector);
if (this.stopwatch && connection instanceof RpcKernelConnectionWithStopwatch) {
connection.setStopwatch(this.stopwatch);
}
Expand Down
22 changes: 10 additions & 12 deletions packages/framework/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@
*/

import {
ConnectionWriter,
RpcConnectionWriter,
RpcKernel,
RpcKernelBaseConnection,
RpcKernelConnection,
SessionState,
TransportConnection,
} from '@deepkit/rpc';
import http, { Server } from 'http';
import https from 'https';
Expand Down Expand Up @@ -105,7 +104,7 @@ export interface RpcServerListener {
}

export interface RpcServerCreateConnection {
(writer: RpcConnectionWriter, request?: HttpRequest): RpcKernelBaseConnection;
(transport: TransportConnection, request?: HttpRequest): RpcKernelBaseConnection;
}

export interface RpcServerOptions {
Expand Down Expand Up @@ -177,13 +176,12 @@ export class WebMemoryWorkerFactory extends WebWorkerFactory {
}
}

export function createRpcConnection(rootScopedContext: InjectorContext, rpcKernel: RpcKernel, writer: RpcConnectionWriter, request?: HttpRequest) {
export function createRpcConnection(rootScopedContext: InjectorContext, rpcKernel: RpcKernel, transport: TransportConnection, request?: HttpRequest) {
const injector = rootScopedContext.createChildScope('rpc');
injector.set(HttpRequest, request);
injector.set(RpcInjectorContext, injector);
injector.set(ConnectionWriter, writer);

const connection = rpcKernel.createConnection(writer, injector);
const connection = rpcKernel.createConnection(transport, injector);
injector.set(SessionState, connection.sessionState);
injector.set(RpcKernelConnection, connection);
injector.set(RpcKernelBaseConnection, connection);
Expand Down Expand Up @@ -298,21 +296,21 @@ export class WebWorker {

private startRpc() {
if (this.server) {
this.rpcListener = this.rpcServer.start({ server: this.server }, (writer: RpcConnectionWriter, request?: HttpRequest) => {
this.rpcListener = this.rpcServer.start({ server: this.server }, (transport, request?: HttpRequest) => {
if (this.shuttingDown) {
writer.close();
transport.close();
throw new Error('Server is shutting down');
}
return createRpcConnection(this.injectorContext, this.rpcKernel, writer, request);
return createRpcConnection(this.injectorContext, this.rpcKernel, transport, request);
});
}
if (this.servers) {
this.rpcListener = this.rpcServer.start({ server: this.servers }, (writer: RpcConnectionWriter, request?: HttpRequest) => {
this.rpcListener = this.rpcServer.start({ server: this.servers }, (transport, request?: HttpRequest) => {
if (this.shuttingDown) {
writer.close();
transport.close();
throw new Error('Server is shutting down');
}
return createRpcConnection(this.injectorContext, this.rpcKernel, writer, request);
return createRpcConnection(this.injectorContext, this.rpcKernel, transport, request);
});
}
}
Expand Down
10 changes: 5 additions & 5 deletions packages/rpc-tcp/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { parseHost } from '@deepkit/core';
import { ClientTransportAdapter, TransportConnectionHooks } from '@deepkit/rpc';
import { ClientTransportAdapter, TransportClientConnection } from '@deepkit/rpc';
import { connect } from 'net';

/*
Expand All @@ -14,19 +14,19 @@ export class RpcTcpClientAdapter implements ClientTransportAdapter {
this.host = parseHost(host);
}

public async connect(connection: TransportConnectionHooks) {
public async connect(connection: TransportClientConnection) {
const port = this.host.port || 8811;
const socket = this.host.isUnixSocket ? connect({ path: this.host.unixSocket }) : connect({
port: port,
host: this.host.host
});

socket.on('data', (data: Uint8Array) => {
connection.onData(data);
connection.readBinary(data);
});

socket.on('close', () => {
connection.onClose();
connection.onClose('socket closed');
});

socket.on('error', (error: any) => {
Expand All @@ -46,7 +46,7 @@ export class RpcTcpClientAdapter implements ClientTransportAdapter {
close() {
socket.end();
},
send(message) {
writeBinary(message) {
socket.write(message);
}
});
Expand Down
Loading

0 comments on commit 3b2c6cc

Please sign in to comment.