Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle websockets asynchronously & Fix up typing on HTTP #1008

Merged
merged 12 commits into from
Dec 30, 2024
6 changes: 3 additions & 3 deletions project/src/servers/HttpServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ContextVariableType } from "@spt/context/ContextVariableType";
import { HttpServerHelper } from "@spt/helpers/HttpServerHelper";
import { ConfigTypes } from "@spt/models/enums/ConfigTypes";
import { IHttpConfig } from "@spt/models/spt/config/IHttpConfig";
import { ILogger } from "@spt/models/spt/utils/ILogger";
import type { ILogger } from "@spt/models/spt/utils/ILogger";
import { ConfigServer } from "@spt/servers/ConfigServer";
import { WebSocketServer } from "@spt/servers/WebSocketServer";
import { IHttpListener } from "@spt/servers/http/IHttpListener";
Expand All @@ -14,7 +14,7 @@ import { inject, injectAll, injectable } from "tsyringe";
@injectable()
export class HttpServer {
protected httpConfig: IHttpConfig;
protected started: boolean;
protected started = false;

constructor(
@inject("PrimaryLogger") protected logger: ILogger,
Expand Down Expand Up @@ -102,7 +102,7 @@ export class HttpServer {
* @param remoteAddress Address to check
* @returns True if its local
*/
protected isLocalRequest(remoteAddress: string): boolean {
protected isLocalRequest(remoteAddress: string | undefined): boolean | undefined {
if (!remoteAddress) {
return undefined;
}
Expand Down
28 changes: 16 additions & 12 deletions project/src/servers/WebSocketServer.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import http, { IncomingMessage } from "node:http";
import { HttpServerHelper } from "@spt/helpers/HttpServerHelper";
import { ILogger } from "@spt/models/spt/utils/ILogger";
import type { ILogger } from "@spt/models/spt/utils/ILogger";
import { IWebSocketConnectionHandler } from "@spt/servers/ws/IWebSocketConnectionHandler";
import { LocalisationService } from "@spt/services/LocalisationService";
import { JsonUtil } from "@spt/utils/JsonUtil";
import { RandomUtil } from "@spt/utils/RandomUtil";
import { inject, injectAll, injectable } from "tsyringe";
import { Server, WebSocket } from "ws";
import { WebSocketServer as Server } from "ws";
import { SPTWebSocket } from "./ws/SPTWebsocket";

@injectable()
export class WebSocketServer {
protected webSocketServer: Server;
protected webSocketServer: Server | undefined;

constructor(
@inject("PrimaryLogger") protected logger: ILogger,
Expand All @@ -21,12 +22,12 @@ export class WebSocketServer {
@injectAll("WebSocketConnectionHandler") protected webSocketConnectionHandlers: IWebSocketConnectionHandler[],
) {}

public getWebSocketServer(): Server {
public getWebSocketServer(): Server | undefined {
return this.webSocketServer;
}

public setupWebSocket(httpServer: http.Server): void {
this.webSocketServer = new Server({ server: httpServer });
this.webSocketServer = new Server({ server: httpServer, WebSocket: SPTWebSocket });

this.webSocketServer.addListener("listening", () => {
this.logger.success(
Expand All @@ -37,7 +38,9 @@ export class WebSocketServer {
);
});

this.webSocketServer.addListener("connection", this.wsOnConnection.bind(this));
this.webSocketServer.addListener("connection", async (ws: SPTWebSocket, msg) => {
await this.wsOnConnection(ws, msg);
});
}

protected getRandomisedMessage(): string {
Expand All @@ -50,18 +53,19 @@ export class WebSocketServer {
: this.localisationService.getText("server_start_success");
}

protected wsOnConnection(ws: WebSocket, req: IncomingMessage): void {
protected async wsOnConnection(ws: SPTWebSocket, req: IncomingMessage): Promise<void> {
const socketHandlers = this.webSocketConnectionHandlers.filter((wsh) => req.url.includes(wsh.getHookUrl()));
if ((socketHandlers?.length ?? 0) === 0) {
const message = `Socket connection received for url ${req.url}, but there is not websocket handler configured for it`;
this.logger.warning(message);
ws.send(this.jsonUtil.serialize({ error: message }));
ws.close();
await ws.sendAsync(this.jsonUtil.serialize({ error: message }));
await ws.closeAsync();
return;
}
socketHandlers.forEach((wsh) => {
wsh.onConnection(ws, req);

for (const wsh of socketHandlers) {
await wsh.onConnection(ws, req);
this.logger.info(`WebSocketHandler "${wsh.getSocketId()}" connected`);
});
}
}
}
6 changes: 3 additions & 3 deletions project/src/servers/http/SptHttpListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ export class SptHttpListener implements IHttpListener {
sessionID: string,
req: IncomingMessage,
resp: ServerResponse,
body: Buffer,
body: Buffer | undefined,
output: string,
): Promise<void> {
const bodyInfo = this.getBodyInfo(body);
Expand Down Expand Up @@ -138,7 +138,7 @@ export class SptHttpListener implements IHttpListener {
}
}

public async getResponse(sessionID: string, req: IncomingMessage, body: Buffer): Promise<string> {
public async getResponse(sessionID: string, req: IncomingMessage, body: Buffer | undefined): Promise<string> {
const info = this.getBodyInfo(body, req.url);
if (globalThis.G_LOG_REQUESTS) {
// Parse quest info into object
Expand All @@ -158,7 +158,7 @@ export class SptHttpListener implements IHttpListener {
return output;
}

protected getBodyInfo(body: Buffer, requestUrl = undefined): any {
protected getBodyInfo(body: Buffer | undefined, requestUrl = undefined): any {
const text = body ? body.toString() : "{}";
const info = text ? this.jsonUtil.deserialize<any>(text, requestUrl) : {};
return info;
Expand Down
4 changes: 2 additions & 2 deletions project/src/servers/ws/IWebSocketConnectionHandler.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { IncomingMessage } from "node:http";
import { WebSocket } from "ws";
import { SPTWebSocket } from "./SPTWebsocket";

export interface IWebSocketConnectionHandler {
getSocketId(): string;
getHookUrl(): string;
onConnection(ws: WebSocket, req: IncomingMessage): void;
onConnection(ws: SPTWebSocket, req: IncomingMessage): Promise<void>;
}
24 changes: 24 additions & 0 deletions project/src/servers/ws/SPTWebsocket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import WebSocket from "ws";

export class SPTWebSocket extends WebSocket {
// biome-ignore lint/suspicious/noExplicitAny: Any is required here, I dont see any other way considering it will complain if we use BufferLike
public sendAsync(data: any): Promise<void> {
return new Promise((resolve, reject) => {
this.send(data, (error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
});
}

public closeAsync(): Promise<void> {
return new Promise((resolve, reject) => {
this.on('close', () => resolve());
this.on('error', (err) => reject(err));
this.close();
});
}
}
29 changes: 17 additions & 12 deletions project/src/servers/ws/SptWebSocketConnectionHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ import { IWsNotificationEvent } from "@spt/models/eft/ws/IWsNotificationEvent";
import { ConfigTypes } from "@spt/models/enums/ConfigTypes";
import { NotificationEventType } from "@spt/models/enums/NotificationEventType";
import { IHttpConfig } from "@spt/models/spt/config/IHttpConfig";
import { ILogger } from "@spt/models/spt/utils/ILogger";
import type { ILogger } from "@spt/models/spt/utils/ILogger";
import { ConfigServer } from "@spt/servers/ConfigServer";
import { IWebSocketConnectionHandler } from "@spt/servers/ws/IWebSocketConnectionHandler";
import { ISptWebSocketMessageHandler } from "@spt/servers/ws/message/ISptWebSocketMessageHandler";
import { LocalisationService } from "@spt/services/LocalisationService";
import { JsonUtil } from "@spt/utils/JsonUtil";
import { inject, injectAll, injectable } from "tsyringe";
import { WebSocket } from "ws";
import { SPTWebSocket } from "./SPTWebsocket";

@injectable()
export class SptWebSocketConnectionHandler implements IWebSocketConnectionHandler {
protected httpConfig: IHttpConfig;
protected webSockets: Map<string, WebSocket> = new Map<string, WebSocket>();
protected webSockets: Map<string, SPTWebSocket> = new Map<string, SPTWebSocket>();
protected defaultNotification: IWsNotificationEvent = { type: NotificationEventType.PING, eventId: "ping" };

protected websocketPingHandler: NodeJS.Timeout | undefined;
Expand All @@ -39,7 +40,7 @@ export class SptWebSocketConnectionHandler implements IWebSocketConnectionHandle
return "/notifierServer/getwebsocket/";
}

public onConnection(ws: WebSocket, req: IncomingMessage): void {
public async onConnection(ws: SPTWebSocket, req: IncomingMessage): Promise<void> {
// Strip request and break it into sections
const splitUrl = req.url.substring(0, req.url.indexOf("?")).split("/");
const sessionID = splitUrl.pop();
Expand All @@ -54,18 +55,20 @@ export class SptWebSocketConnectionHandler implements IWebSocketConnectionHandle
if (this.websocketPingHandler) {
clearInterval(this.websocketPingHandler);
}

ws.on("message", async (msg) => {
for (const wsmh of this.sptWebSocketMessageHandlers) {
await wsmh.onSptMessage(sessionID, this.webSockets.get(sessionID), msg);
}

ws.on("message", (msg) =>
this.sptWebSocketMessageHandlers.forEach((wsmh) =>
wsmh.onSptMessage(sessionID, this.webSockets.get(sessionID), msg),
),
);
this.logger.info(`WebSocketHandler "${wsmh.getSocketId()}" connected`);
});

this.websocketPingHandler = setInterval(() => {
this.websocketPingHandler = setInterval(async () => {
this.logger.debug(this.localisationService.getText("websocket-pinging_player", sessionID));

if (ws.readyState === WebSocket.OPEN) {
ws.send(this.jsonUtil.serialize(this.defaultNotification));
await ws.sendAsync(this.jsonUtil.serialize(this.defaultNotification));
} else {
this.logger.debug(this.localisationService.getText("websocket-socket_lost_deleting_handle"));
clearInterval(this.websocketPingHandler);
Expand All @@ -74,10 +77,12 @@ export class SptWebSocketConnectionHandler implements IWebSocketConnectionHandle
}, this.httpConfig.webSocketPingDelayMs);
}

public sendMessage(sessionID: string, output: IWsNotificationEvent): void {
public async sendMessageAsync(sessionID: string, output: IWsNotificationEvent): Promise<void> {
try {
if (this.isConnectionWebSocket(sessionID)) {
this.webSockets.get(sessionID).send(this.jsonUtil.serialize(output));
const ws = this.webSockets.get(sessionID);

await ws.sendAsync(this.jsonUtil.serialize(output));
this.logger.debug(this.localisationService.getText("websocket-message_sent"));
} else {
this.logger.debug(this.localisationService.getText("websocket-not_ready_message_not_sent", sessionID));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { ILogger } from "@spt/models/spt/utils/ILogger";
import type { ILogger } from "@spt/models/spt/utils/ILogger";
import { ISptWebSocketMessageHandler } from "@spt/servers/ws/message/ISptWebSocketMessageHandler";
import { inject, injectable } from "tsyringe";
import { RawData, WebSocket } from "ws";
import { RawData } from "ws";
import { SPTWebSocket } from "../SPTWebsocket";

@injectable()
export class DefaultSptWebSocketMessageHandler implements ISptWebSocketMessageHandler {
constructor(@inject("PrimaryLogger") protected logger: ILogger) {}

public onSptMessage(sessionId: string, client: WebSocket, message: RawData): void {
public async onSptMessage(sessionId: string, client: SPTWebSocket, message: RawData): Promise<void> {
this.logger.debug(`[${sessionId}] SPT message received: ${message}`);
}
}
5 changes: 3 additions & 2 deletions project/src/servers/ws/message/ISptWebSocketMessageHandler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { RawData, WebSocket } from "ws";
import { RawData } from "ws";
import { SPTWebSocket } from "../SPTWebsocket";

export interface ISptWebSocketMessageHandler {
onSptMessage(sessionID: string, client: WebSocket, message: RawData): void;
onSptMessage(sessionID: string, client: SPTWebSocket, message: RawData): Promise<void>;
}
Loading