-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathremote.js
409 lines (377 loc) · 15.3 KB
/
remote.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
var util = require ('util');
var fs = require ('fs');
var path = require ('path');
var EventEmitter = require ('events').EventEmitter;
var https = require ('https');
var http = require ('http');
var os = require ('os');
var bunyan = require ('bunyan');
var async = require ('async');
var filth = require ('filth');
var RemoteTransport = require ('./lib/RemoteTransport');
var Router = require ('./lib/Router');
var DEFAULT_CONFIG = require ('submergence').DEFAULT_CONFIG;
// An inline <script> element whose body is the contents of ./build/bundle.js, resolved
// relative to this file and read synchronously while the module loads (a missing bundle
// therefore throws at require time).
// NOTE(review): nothing in the visible part of this file reads `standalone` - confirm whether
// it is still needed.
var standalone = [
    '<script type="text/javascript">',
    fs.readFileSync (path.resolve (__dirname, './build/bundle.js')).toString(),
    '</script>'
].join ('');
/** @module/class substation:Remote
@super submergence
Instantiate a `Remote` to connect to a [submergence]() service layer hosted on another cluster.
If you use the normal [substation]() module as a monad and your config includes the [APIKey]
(:Configuration#APIKey) and [APIForward](:Configuration#APIForward) keys you will get a
`Remote` automatically.

The supplied `config` is merged over a clone of the default configuration. The merged result
must contain both `APIKey` and `APIHost`, otherwise an `Error` is thrown before the logger,
transport or router are created.
@argument/substation:Configuration config
@returns/substation:Remote
If the `new` keyword is not used, an instance is created and returned.
*/
function Remote (config) {
    if (!(this instanceof Remote))
        return new Remote (config);
    EventEmitter.call (this);

    // start from a private copy of the defaults so the shared DEFAULT_CONFIG is never mutated
    this.config = filth.clone (DEFAULT_CONFIG);
    filth.merge (this.config, config);

    // fail fast: reject an unusable configuration before any collaborator is constructed
    if (!this.config.APIKey || !this.config.APIHost)
        throw new Error ('APIKey and APIHost are required');

    this.logger = new bunyan ({
        name:   this.config.applicationName,
        stream: process.stdout,
        level:  this.config.loggingLevel
    });

    // NOTE(review): the transport receives the caller's raw `config` while the router receives
    // the merged `this.config` - confirm that RemoteTransport applies its own defaults.
    this.server = new RemoteTransport (config, this);
    this.router = new Router (this, this.config);
}
util.inherits (Remote, EventEmitter);
module.exports = Remote;
/** @member/Function addAction
Forwards every argument, unchanged, to the `addAction` method of this instance's Router and
returns whatever that call returns.
*/
Remote.prototype.addAction = function(){
    var router = this.router;
    return router.addAction.apply (router, arguments);
};
/** @member/Function listen
Reads the remote configuration and compares it to the local instance. If the remote
configuration does not match, the `Remote` will do one of the following:
* By default, the process will exit.
* If the [APIOverwriteActions](:Configuration#APIOverwriteActions) option is set, the local
configuration is written to the server. If this fails for some reason, the process will
exit.
* If the [APIAcceptConfig](:Configuration#APIAcceptConfig) option is set, the discrepancy is
logged but startup continues.
Once the remote configuration is resolved, [actions](substation:Action) have their [setup]
(substation:Action:Configuration#setup) Functions run. Finally, the port is opened and this
`Remote` begins accepting requests.
@argument/Number port
The port number to listen on.
@callback
The server is now accepting requests. If something prevents this, the process will exit with
status code `1`.
*/
// Event names that `listen` looks for local listeners on; each one that has a listener gets
// this node's forwarding information recorded under `events` in the local configuration.
var SERVER_EVENTS = [
    'userOnline',
    'userOffline',
    'clientOnline',
    'clientOffline',
    'peerRequest',
    'liveConnection'
];
// Reconciles the local configuration (actions plus forwarded events) with the one stored by the
// remote service, then opens `port`.
//
//  * `port`     - port number handed to the transport; also used as the default forwarding port
//  * `callback` - passed straight through to the transport's `listen`
//
// When the remote configuration differs from the local one:
//  * `APIOverwriteActions` set - the local configuration is PUT to the remote service
//  * `APIAcceptConfig` set     - the mismatch is logged as a warning and startup continues
//  * neither                   - the process exits
// Every failure on the way is logged at `fatal` level and ends the process with status 1.
Remote.prototype.listen = function (port, callback) {
    var self = this;

    // startup failures are unrecoverable: log with the given bunyan arguments, then exit
    function die () {
        self.logger.fatal.apply (self.logger, arguments);
        return process.exit (1);
    }

    // setup the API forwarding information for this node
    // unless already configured
    if (!this.config.APIForward)
        this.config.APIForward = {
            host:   os.hostname(),
            port:   port
        };

    this.router.init (function(){
        // only events that somebody is listening for locally are forwarded to this node
        var localConfig = { events:{} };
        for (var i=0,j=SERVER_EVENTS.length; i<j; i++) {
            var event = SERVER_EVENTS[i];
            if (self.listeners (event).length)
                localConfig.events[event] = filth.clone (self.config.APIForward);
        }

        // configuration is settled - start accepting requests
        function openPort () {
            self.server.listen (port, self.router, callback);
        }

        // replace the remote configuration document with the local one
        function writeConfig (configID) {
            var localConfigStr = JSON.stringify (localConfig);
            var configWriteRequest = http.request ({
                host:       self.config.APIHost,
                path:       '/config/'
                          + encodeURIComponent (configID)
                          + '?apiKey='
                          + encodeURIComponent (self.config.APIKey),
                method:     'PUT',
                headers:    {
                    Accept:             'application/json',
                    'Content-Length':   Buffer.byteLength (localConfigStr)
                }
            }, function (response) {
                if (response.statusCode === 200) {
                    // config written successfully - the body is of no interest, but it must
                    // still be drained so the underlying socket is released
                    response.on ('error', function (err) {
                        self.logger.warn (err, 'error while discarding config write response');
                    });
                    response.resume();
                    return openPort();
                }

                var chunks = [];
                response.on ('data', function (chunk) { chunks.push (chunk); });
                response.on ('error', function (err) {
                    die (err, 'failed to write config to remote service');
                });
                response.on ('end', function(){
                    var responseBody;
                    try {
                        responseBody = JSON.parse (Buffer.concat (chunks).toString());
                    } catch (err) {
                        return die ('remote service responded with invalid json');
                    }
                    if (response.statusCode === 400)
                        return die (
                            responseBody,
                            'remote service rejected configuration as invalid'
                        );
                    if (response.statusCode === 403)
                        return die (responseBody, 'remote service rejected the APIKey');
                    return die (responseBody, 'unknown remote service error');
                });
            });
            configWriteRequest.on ('error', function (err) {
                die (err, 'failed to update remote configuration');
            });
            configWriteRequest.write (localConfigStr);
            configWriteRequest.end();
        }

        // compare the remote document against the local actions and act on any difference
        function reconcile (currentConfig) {
            // the document id is addressing information, not part of the compared content
            var configID = currentConfig._id;
            delete currentConfig._id;

            self.router.getAllActions (function (err, actions) {
                if (err)
                    return die (err);

                localConfig.actions = actions.map (function (item) {
                    var actionDoc = item.export();
                    actionDoc.forward = filth.clone (self.config.APIForward);
                    return actionDoc;
                });

                if (filth.compare (localConfig, currentConfig)) {
                    self.logger.info ('remote configuration matches');
                    return openPort();
                }

                if (self.config.APIOverwriteActions)
                    return writeConfig (configID);

                if (self.config.APIAcceptConfig) {
                    self.logger.warn (
                        'remote service configuration does not match local server'
                    );
                    return openPort();
                }

                return die ('remote service configuration does not match local server');
            });
        }

        // we must check the remote configuration and optionally overwrite it
        var configRequest = http.request ({
            host:       self.config.APIHost,
            path:       '/config?apiKey='
                      + encodeURIComponent (self.config.APIKey)
                      + '&domain='
                      + encodeURIComponent (self.config.domain),
            method:     'GET',
            headers:    {
                Accept:     'application/json'
            }
        }, function (response) {
            var chunks = [];
            response.on ('data', function (chunk) { chunks.push (chunk); });
            response.on ('error', function (err) {
                die (err, 'could not pull a configuration from the remote service');
            });
            response.on ('end', function(){
                var envelope;
                try {
                    envelope = JSON.parse (Buffer.concat (chunks).toString());
                } catch (err) {
                    return die ('remote service response was invalid JSON');
                }

                // the configuration document travels in the `content` property of the reply
                var currentConfig = envelope && envelope.content;
                if (!currentConfig || typeof currentConfig != 'object')
                    return die (
                        { statusCode:response.statusCode },
                        'remote service did not return a configuration'
                    );

                reconcile (currentConfig);
            });
        });
        configRequest.on ('error', function (err) {
            die (err, 'failed to read remote configuration');
        });
        configRequest.end();
    });
};
/** @member/Function sendEvent
Send an event to a User or User/Client pair with at least one active `Socket.io` connection.
These are best-effort events - delivery will not be ensured, just attempted. Messages with
ensured delivery must be handled in the application. Guaranteed delivery like `substation` could
provide is of limited value because it only guarantees that the message arrived, not that it was
successfully acted upon.

Makes an http `POST` request against the remote service on the path `/event`, with the
JSON-serialized `info` as the request body.

The short form `(user, info, callback)` is assumed when fewer than three arguments are passed,
or when exactly three are passed and the third is a Function, `null` or `undefined`. Any other
call is read as `(user, client, info, callback)`, so `client` may be used without a callback.
The callback is invoked at most once.
@argument/String user
Send events to connections with this User ID.
@argument/String client
@optional
If present, narrows the User ID selection to this specific User/Client pair.
@argument/Array info
The argument parameters as they will appear on the client, beginning with the String name of the
event to emit.
@callback
@optional
@argument/Error|undefined err
If a technical Error prevented the attempt from proceeding, it is passed here. When the
remote service answers with a status other than `200`, its parsed JSON body is passed here
instead.
@argument/Boolean didReceive
Whether a client is expected to receive the event. This result value is produced early,
after connections have been found but before any data has been sent. It is possible, though
unlikely, that one of the selected connections will go offline in the next fistful of
milliseconds, resulting in a `true` for `didReceive` when no events were in fact delivered.
*/
Remote.prototype.sendEvent = function (/* user, client, info, callback */) {
    var user = arguments[0];
    var client, info, callback;

    // with three arguments the type of the last one tells `(user, info, callback)` apart from
    // `(user, client, info)`
    var isShortForm =
        arguments.length < 3
     || (
            arguments.length === 3
         && (arguments[2] == null || typeof arguments[2] == 'function')
        )
     ;
    if (isShortForm) {
        info = arguments[1];
        callback = arguments[2];
    } else {
        client = arguments[1];
        info = arguments[2];
        callback = arguments[3];
    }

    // the callback is optional and must never fire twice, e.g. when a response error is
    // followed by `end`, or a request error arrives after the response was handled
    var finished = false;
    function done () {
        if (finished)
            return;
        finished = true;
        if (typeof callback == 'function')
            callback.apply (undefined, arguments);
    }

    if (!info || typeof info[0] != 'string') {
        // report asynchronously so the callback is never invoked before this method returns
        process.nextTick (function(){
            done (new Error ('First event argument must be a string'));
        });
        return;
    }

    var pathstr =
        '/event?apiKey='
      + encodeURIComponent (this.config.APIKey)
      + '&domain='
      + encodeURIComponent (this.config.domain)
      + '&user='
      + encodeURIComponent (user)
      ;
    if (client)
        pathstr += '&client=' + encodeURIComponent (client);

    var infoStr = JSON.stringify (info);
    var eventRequest = http.request ({
        host:       this.config.APIHost,
        path:       pathstr,
        method:     'POST',
        headers:    {
            Accept:             'application/json',
            'Content-Length':   Buffer.byteLength (infoStr)
        }
    }, function (response) {
        var chunks = [];
        response.on ('data', function (chunk) { chunks.push (chunk); });
        response.on ('error', done);
        response.on ('end', function(){
            var body;
            try {
                body = JSON.parse (Buffer.concat (chunks).toString());
            } catch (err) {
                return done (new Error ('remote service response was invalid JSON'));
            }

            if (response.statusCode === 200)
                return done (undefined, Boolean (body));

            // the service's own error document is handed over as-is; a falsy body would look
            // like success to the caller, so it is replaced with a real Error
            done (
                body
             || new Error ('remote service responded with status ' + response.statusCode)
            );
        });
    });
    eventRequest.on ('error', done);
    eventRequest.write (infoStr);
    eventRequest.end();
};
/** @member/Function isActive
Check for an active `Socket.io` connection belonging to a User or User/Client pair. Makes an
http `GET` request against the remote service on the path `/session`.

A `200` response means active and a `204` response means not active. Any other status is
reported through the callback's first argument. When the second argument is a Function it is
taken as the callback and no client filter is applied. The callback is invoked at most once.
@argument/String user
The user to look for.
@argument/String client
@optional
Only include connections belonging to this specific client.
@callback
@argument/Error|undefined err
If a technical Error prevented the attempt from proceeding, it is passed here. When the
remote service answers with a status other than `200` or `204`, its parsed JSON body is
passed here instead.
@argument/Boolean isActive
Whether one or more connections exist for the named User or User/Client pair.
*/
Remote.prototype.isActive = function (/* user, client, callback */) {
    var user = arguments[0];
    var client, callback;
    if (typeof arguments[1] == 'function')
        callback = arguments[1];
    else {
        client = arguments[1];
        callback = arguments[2];
    }

    // guards against a missing callback and against reporting twice, e.g. when a stream error
    // arrives after the status code has already answered the question
    var finished = false;
    function done () {
        if (finished)
            return;
        finished = true;
        if (typeof callback == 'function')
            callback.apply (undefined, arguments);
    }

    var pathstr =
        '/session?apiKey='
      + encodeURIComponent (this.config.APIKey)
      + '&domain='
      + encodeURIComponent (this.config.domain)
      + '&user='
      + encodeURIComponent (user)
      ;
    if (client)
        pathstr += '&client=' + encodeURIComponent (client);

    var sessionRequest = http.request ({
        host:       this.config.APIHost,
        path:       pathstr
    }, function (response) {
        response.on ('error', done);

        if (response.statusCode === 200 || response.statusCode === 204) {
            // the status code alone carries the answer; drain the unread body so the
            // underlying socket is released
            response.resume();
            return done (undefined, response.statusCode === 200);
        }

        // anything else is a failure described by a JSON document
        var chunks = [];
        response.on ('data', function (chunk) { chunks.push (chunk); });
        response.on ('end', function(){
            var body;
            try {
                body = JSON.parse (Buffer.concat (chunks).toString());
            } catch (err) {
                return done (new Error ('remote service response was invalid JSON'));
            }
            // a falsy body would read as success, so it is replaced with a real Error
            done (
                body
             || new Error ('remote service responded with status ' + response.statusCode)
            );
        });
    });
    sessionRequest.on ('error', done);
    sessionRequest.end();
};