-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
137 lines (119 loc) · 3.57 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
'use strict';
const ClientRegister = require('./lib/client-register');
const ClientMessager = require('./lib/client-messager');
const HTTP = require('http');
const Initializers = require('./initializers');
const Logger = require('./lib/logger');
const Teamster = require('teamster');
const WSServer = require('ws').Server;
const app = require('koa')();
app.use(function* showHealth() {
if (this.url === '/health') {
this.body = 'ok';
}
});
/**
* A collection of private functions, run automatically when the app starts, for
* creating our Koa app and HTTP and WS servers.
*
* @module Main
*/
Initializers.start()
.then(createTeamster)
.catch(err => { throw err; });
/**
* Start 1 or more web processes running our app.
*
* @private
*/
function createTeamster() {
Teamster.run(startServers, {
fork : process.env.NODE_ENV === 'production',
numWorkers: parseInt(process.env.NUM_WORKERS, 10) || 1,
});
}
/**
* Start HTTP and WS servers for our app.
*
* @private
*/
function startServers() {
const port = parseInt(process.env.PORT, 10) || 5000;
HTTP.createServer(app.callback()).listen(port, function() {
Logger.log({ event: `HTTP server listening on port ${port}` });
startWSServer(this);
});
}
/**
* Start a WS server for handling WebSocket connections.
*
* @private
* @param {HTTP.Server} httpServer The httpServer to attach the WS server to
*/
function startWSServer(httpServer) {
const wsServer = new WSServer({ server: httpServer });
wsServer.on('connection', onConnection);
}
/**
* Handle a newly connected WebSocket connection
*
* @private
* @param {WS.WebSocket} socket The new WebSocket connection
*/
function onConnection(socket) {
const requestID = socket.upgradeReq.headers['x-request-id'];
socket.pingInterval = setInterval(() => {
socket.send(JSON.stringify({ ping: true }));
}, 30000);
ClientRegister.registerClient(socket).then(client => {
Logger.clientLog(client, { event: 'New WebSocket client connected' });
client.socket.on('close', () => onClose(client));
client.socket.on('message', message => onMessage(client, message));
}).catch(err => {
ClientMessager.error({ socket: socket }, 'Error when joining new client');
Logger.log({ request_id: requestID, event: 'Could not create new client' });
Logger.error(err);
socket.close();
});
}
/**
* Handle a message sent by a client.
*
* @private
* @param {Client} client The client that sent the message
* @param {string} message The message sent by the client
*/
function onMessage(client, message) {
try {
message = JSON.parse(message);
} catch(err) {
Logger.error(err);
ClientMessager.error(client, 'Unparsable message sent');
return;
}
if (message.action === 'cursor') {
message.clientId = client.id;
const serializedCursor = Object.keys(message).map(k => `${k}=${message[k]}`).join('|');
ClientRegister.persistCursor(serializedCursor, client.spaceID);
ClientRegister.renewClient(client);
return;
}
if (message.action !== 'ping') {
const err = `Unrecognized action sent: ${message.action}`;
Logger.clientLog(client, { event: err });
ClientMessager.error(client, err);
return;
}
ClientRegister.renewClient(client);
}
/**
* Handle a client's socket closing.
*
* @private
* @param {Client} client The client whose socket has closed.
*/
function onClose(client) {
clearInterval(client.socket.pingInterval);
Logger.clientLog(client, { event: 'Client closed socket connection' });
ClientRegister.deregisterClient(client);
}