Skip to content

Commit 53015d3

Browse files
committed
initial websocket
1 parent 6ede387 commit 53015d3

File tree

7 files changed

+117
-29
lines changed

7 files changed

+117
-29
lines changed

Diff for: package.json

+4-2
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,13 @@
3333
"@types/aws-lambda": "^8.10.138",
3434
"@types/node": "18",
3535
"@types/which": "^3.0.3",
36+
"@types/ws": "^8.5.10",
3637
"aws-lambda": "^1.0.7",
3738
"axios": "^1.7.2",
3839
"execa": "^9.1.0",
3940
"typescript": "5",
40-
"which": "^4.0.0"
41+
"which": "^4.0.0",
42+
"ws": "^8.17.0"
4143
},
4244
"devDependencies": {
4345
"source-map": "^0.7.4",
@@ -48,4 +50,4 @@
4850
"webpack": "^5.91.0",
4951
"webpack-cli": "^5.1.4"
5052
}
51-
}
53+
}

Diff for: src/index.ts

+6-3
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ import {
88
EndpointProxyRequest,
99
EndpointResponse,
1010
} from "./internal/types";
11+
import { WebsocketProxy } from "./internal/websocket";
1112

12-
const { _HANDLER, AWS_LAMBDA_RUNTIME_API } = process.env;
13+
const { _HANDLER, AWS_LAMBDA_RUNTIME_API, _WEBSOCKET_ROUTE } = process.env;
1314

1415
export const run = async (): Promise<void> => {
1516
if (!AWS_LAMBDA_RUNTIME_API) {
@@ -20,16 +21,18 @@ export const run = async (): Promise<void> => {
2021
throw new Error("No handler specified");
2122
}
2223

23-
log("Bootstraping", { _HANDLER, AWS_LAMBDA_RUNTIME_API });
24+
log("Bootstraping", { _HANDLER, AWS_LAMBDA_RUNTIME_API, _WEBSOCKET_ROUTE });
2425

2526
const { childProcess, bin, endpoint } = await endpointSpawn(
2627
_HANDLER,
2728
process.env
2829
);
2930

31+
const websocketProxy = new WebsocketProxy(endpoint, _WEBSOCKET_ROUTE);
32+
3033
try {
3134
log("Polling for events", { bin, endpoint });
32-
await pollForEvents(AWS_LAMBDA_RUNTIME_API, bin, endpoint);
35+
await pollForEvents(AWS_LAMBDA_RUNTIME_API, bin, endpoint, websocketProxy);
3336
} catch (e) {
3437
if (childProcess) {
3538
log("Killing child process", { pid: childProcess.pid });

Diff for: src/internal/endpoints.ts

+9-17
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,6 @@ import { info, log } from "./log";
1414
import { ChildProcess, spawn } from "child_process";
1515
import { APIGatewayProxyEventV2 } from "aws-lambda";
1616

17-
function wsify(url?: URL): URL | undefined {
18-
if (!url) return undefined;
19-
20-
const wsUrl = new URL(url.toString());
21-
wsUrl.protocol = url.protocol.replace("http", "ws");
22-
23-
if (!wsUrl.protocol.startsWith("ws")) {
24-
return undefined;
25-
}
26-
27-
return wsUrl;
28-
}
29-
3017
function convertHeaders(
3118
headers: RawAxiosResponseHeaders | AxiosResponseHeaders
3219
): { [header: string]: boolean | number | string } | undefined {
@@ -92,8 +79,7 @@ const waitForEndpoint = async (
9279

9380
export const endpointSpawn = async (
9481
handler: string,
95-
env?: NodeJS.ProcessEnv,
96-
detached: boolean = true
82+
env?: NodeJS.ProcessEnv
9783
): Promise<SpawnResult> => {
9884
// handler is in the format of
9985
// - `{some-bin}@http://localhost:{the-bins-port} (will start some-bin, and forward requests to the http server)
@@ -120,7 +106,7 @@ export const endpointSpawn = async (
120106

121107
const cmds = bin.split(" ");
122108
childProcess = spawn(cmds[0], cmds.slice(1), {
123-
detached,
109+
detached: true,
124110
stdio: "inherit",
125111
env: env,
126112
});
@@ -134,7 +120,6 @@ export const endpointSpawn = async (
134120
childProcess,
135121
bin,
136122
endpoint,
137-
wsEndpoint: wsify(endpoint),
138123
};
139124
};
140125

@@ -175,9 +160,12 @@ export const endpointProxy = async ({
175160
endpoint,
176161
event,
177162
deadline,
163+
wsProxy,
178164
}: EndpointProxyRequest): Promise<EndpointResponse> => {
179165
const rawEvent = JSON.parse(event) as Partial<APIGatewayProxyEventV2>;
180166

167+
log("!!! Received event", { requestId, rawEvent });
168+
181169
const {
182170
requestContext,
183171
rawPath,
@@ -202,6 +190,10 @@ export const endpointProxy = async ({
202190
);
203191
}
204192

193+
if (wsProxy) {
194+
wsProxy.init();
195+
}
196+
205197
if (!rawPath) {
206198
throw new Error("No path found in event");
207199
}

Diff for: src/internal/events.ts

+9-6
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ import { log } from "./log";
22
import { EndpointProxyRequest, EndpointExecRequest } from "./types";
33
import { endpointExec, endpointProxy } from "./endpoints";
44
import { getRuntimeEvent, postRuntimeEventResponse } from "./runtime";
5+
import { WebsocketProxy } from "./websocket";
56

67
export const pollForEvents = async (
78
runtimeApi: string,
89
bin?: string,
9-
endpoint?: URL
10+
endpoint?: URL,
11+
wsProxy?: WebsocketProxy
1012
): Promise<void> => {
1113
log("Waiting for next event from Lambda Runtime API", { runtimeApi });
1214

@@ -15,7 +17,7 @@ export const pollForEvents = async (
1517
let payload: any | undefined = undefined;
1618

1719
if (bin && !endpoint) {
18-
log("No endpoint specified, executing bin", { bin });
20+
log("Executing bin", { bin });
1921

2022
const request: EndpointExecRequest = {
2123
requestId,
@@ -26,20 +28,21 @@ export const pollForEvents = async (
2628

2729
payload = (await endpointExec(request)).payload;
2830

29-
log("Bin execution complete", { bin });
31+
log("Bin execution complete");
3032
} else if (endpoint) {
31-
log("Endpoint specified, proxying request", { endpoint });
33+
log("Proxying request", { endpoint });
3234

3335
const request: EndpointProxyRequest = {
3436
requestId,
3537
endpoint,
3638
event,
3739
deadline,
40+
wsProxy,
3841
};
3942

4043
payload = (await endpointProxy(request)).payload;
4144

42-
log("Proxy request complete", { endpoint });
45+
log("Proxy request complete");
4346
} else {
4447
throw new Error(
4548
`
@@ -57,5 +60,5 @@ Expected format: {bin}@{endpoint} or {bin} or {endpoint}:
5760

5861
log("Response sent to Lambda Runtime API", { runtimeApi, requestId });
5962

60-
return pollForEvents(runtimeApi, bin, endpoint);
63+
return pollForEvents(runtimeApi, bin, endpoint, wsProxy);
6164
};

Diff for: src/internal/types.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import { APIGatewayProxyResult } from "aws-lambda";
22
import { ChildProcess } from "child_process";
3+
import { WebsocketProxy } from "./websocket";
34

45
export type SpawnResult = {
56
childProcess?: ChildProcess;
67
bin?: string;
78
endpoint?: URL;
8-
wsEndpoint?: URL;
99
};
1010

1111
export type RuntimeEvent = {
@@ -26,6 +26,7 @@ export type EndpointProxyRequest = {
2626
endpoint: URL;
2727
event: string;
2828
deadline: number;
29+
wsProxy?: WebsocketProxy;
2930
};
3031

3132
export type EndpointResponse = {

Diff for: src/internal/websocket.ts

+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import WebSocket from "ws";
2+
import { log } from "./log";
3+
4+
function wsify(url?: string | URL): URL | undefined {
5+
if (!url) return undefined;
6+
if (typeof url === "string") url = new URL(url);
7+
8+
const wsUrl = new URL(url.toString());
9+
wsUrl.protocol = url.protocol.replace("http", "ws");
10+
11+
if (!wsUrl.protocol.startsWith("ws")) {
12+
return undefined;
13+
}
14+
15+
return wsUrl;
16+
}
17+
18+
export class WebsocketProxy {
19+
private ws?: WebSocket;
20+
public readonly url?: URL;
21+
constructor(url?: URL, private route?: string) {
22+
this.url = wsify(url);
23+
}
24+
25+
init() {
26+
const { url, route, ws } = this;
27+
28+
if (!url || !route || ws) {
29+
log("Skipping Websocket Initialization", {
30+
url,
31+
route,
32+
initialized: !!ws,
33+
});
34+
return;
35+
}
36+
37+
this.ws = new WebSocket(url);
38+
this.ws.on("open", this.handleOpen.bind(this));
39+
this.ws.on("message", this.handleMessage.bind(this));
40+
this.ws.on("error", this.handleError.bind(this));
41+
this.ws.on("ping", this.handlePing.bind(this));
42+
this.ws.on("pong", this.handlePong.bind(this));
43+
}
44+
45+
send(data: any) {
46+
log("Sending message", { data });
47+
if (!this.ws) {
48+
throw new Error("WebSocket is not initialized");
49+
}
50+
this.ws.send(data);
51+
}
52+
53+
handleOpen() {
54+
log("Connected to WebSocket", {
55+
url: this.url,
56+
route: this.route,
57+
});
58+
}
59+
60+
handlePing(data: Buffer) {
61+
log("Received Ping", { data });
62+
}
63+
64+
handlePong(data: Buffer) {
65+
log("Received Pong", { data });
66+
}
67+
68+
handleMessage(data: any) {
69+
log("Received Message", { data });
70+
}
71+
72+
handleError(error: Error) {
73+
log("WebSocket error", { error });
74+
}
75+
}

Diff for: yarn.lock

+12
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,13 @@
142142
resolved "https://registry.yarnpkg.com/@types/which/-/which-3.0.3.tgz#41142ed5a4743128f1bc0b69c46890f0453ddb89"
143143
integrity sha512-2C1+XoY0huExTbs8MQv1DuS5FS86+SEjdM9F/+GS61gg5Hqbtj8ZiDSx8MfWcyei907fIPbfPGCOrNUTnVHY1g==
144144

145+
"@types/ws@^8.5.10":
146+
version "8.5.10"
147+
resolved "https://registry.yarnpkg.com/@types/ws/-/ws-8.5.10.tgz#4acfb517970853fa6574a3a6886791d04a396787"
148+
integrity sha512-vmQSUcfalpIq0R9q7uTo2lXs6eGIpt9wtnLdMv9LVpIjCA/+ufZRozlVoVelIYixx1ugCBKDhn89vnsEGOCx9A==
149+
dependencies:
150+
"@types/node" "*"
151+
145152
"@webassemblyjs/[email protected]", "@webassemblyjs/ast@^1.12.1":
146153
version "1.12.1"
147154
resolved "https://registry.yarnpkg.com/@webassemblyjs/ast/-/ast-1.12.1.tgz#bb16a0e8b1914f979f45864c23819cc3e3f0d4bb"
@@ -1472,6 +1479,11 @@ wildcard@^2.0.0:
14721479
resolved "https://registry.yarnpkg.com/wildcard/-/wildcard-2.0.1.tgz#5ab10d02487198954836b6349f74fff961e10f67"
14731480
integrity sha512-CC1bOL87PIWSBhDcTrdeLo6eGT7mCFtrg0uIJtqJUFyK+eJnzl8A1niH56uu7KMa5XFrtiV+AQuHO3n7DsHnLQ==
14741481

1482+
ws@^8.17.0:
1483+
version "8.17.0"
1484+
resolved "https://registry.yarnpkg.com/ws/-/ws-8.17.0.tgz#d145d18eca2ed25aaf791a183903f7be5e295fea"
1485+
integrity sha512-uJq6108EgZMAl20KagGkzCKfMEjxmKvZHG7Tlq0Z6nOky7YF7aq4mOx6xK8TJ/i1LeK4Qus7INktacctDgY8Ow==
1486+
14751487
14761488
version "0.6.2"
14771489
resolved "https://registry.yarnpkg.com/xml2js/-/xml2js-0.6.2.tgz#dd0b630083aa09c161e25a4d0901e2b2a929b499"

0 commit comments

Comments
 (0)