forked from sagemath/cloud
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlocal_hub.coffee
executable file
·1791 lines (1567 loc) · 73.1 KB
/
local_hub.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#################################################################
#
# local_hub -- a node.js program that runs as a regular user, and
# coordinates and maintains the connections between
# the global hubs and *all* projects running as
# this particular user.
#
# The local_hub is a bit like the "screen" program for Unix, except
# that it simultaneously manages numerous sessions, since simultaneously
# doing a lot of IO-based things is what Node.JS is good at.
#
#
# NOTE: For local debugging, run this way, since it gives better stack
# traces.CodeMirrorSession: _connect to file
#
# make_coffee && echo "require('local_hub').start_server()" | coffee
#
# (c) William Stein, 2013
#
#################################################################
async = require 'async'
fs = require 'fs'
net = require 'net'
child_process = require 'child_process'
uuid = require 'node-uuid'
message = require 'message'
misc = require 'misc'
misc_node = require 'misc_node'
winston = require 'winston'
temp = require 'temp'
diffsync = require 'diffsync'
{to_json, from_json, defaults, required} = require 'misc'
json = (out) -> misc.trunc(misc.to_json(out),512)
{ensure_containing_directory_exists, abspath} = misc_node
#####################################################################
# Generate the "secret_token" file as
# $SAGEMATHCLOUD/data/secret_token if it does not already
# exist. All connections to all local-to-the user services that
# SageMathClouds starts must be prefixed with this key.
#####################################################################
# WARNING -- the sage_server.py program can't get these definitions from
# here, since it is not written in node; if this path changes, it has
# to be change there as well (it will use the SAGEMATHCLOUD environ
# variable though).
DATA = process.env['SAGEMATHCLOUD'] + '/data'
CONFPATH = exports.CONFPATH = abspath(DATA)
secret_token_filename = exports.secret_token_filename = "#{CONFPATH}/secret_token"
secret_token = undefined
# We use an n-character cryptographic random token, where n is given
# below. If you want to change this, changing only the following line
# should be safe.
secret_token_length = 128
init_confpath = () ->
async.series([
# Read or create the file; after this step the variable secret_token
# is set and the file exists.
(cb) ->
fs.exists secret_token_filename, (exists) ->
if exists
winston.debug("read '#{secret_token_filename}'")
fs.readFile secret_token_filename, (err, buf) ->
secret_token = buf.toString()
cb()
else
winston.debug("create '#{secret_token_filename}'")
require('crypto').randomBytes secret_token_length, (ex, buf) ->
secret_token = buf.toString('base64')
fs.writeFile(secret_token_filename, secret_token, cb)
# Ensure restrictive permissions on the secret token file.
(cb) ->
fs.chmod(secret_token_filename, 0o600, cb)
])
###############################################
# Console sessions
###############################################
ports = {}
get_port = (type, cb) -> # cb(err, port number)
if ports[type]?
cb(false, ports[type])
else
fs.readFile abspath("#{DATA}/#{type}_server.port"), (err, content) ->
if err
cb(err)
else
try
ports[type] = parseInt(content)
cb(false, ports[type])
catch e
cb("console_server port file corrupted")
forget_port = (type) ->
if ports[type]?
delete ports[type]
class ConsoleSessions
constructor: () ->
@_sessions = {}
session_exists: (session_uuid) =>
return @_sessions[session_uuid]?
terminate_session: (session_uuid, cb) =>
session = @_sessions[session_uuid]
if not session?
cb()
else
winston.debug("terminate console session '#{session_uuid}'")
if session.status == 'running'
session.socket.end()
cb()
else
cb()
# Connect to (if 'running'), restart (if 'dead'), or create (if
# non-existing) the console session with mesg.session_uuid.
connect: (client_socket, mesg) =>
winston.debug("connect to console session #{mesg.session_uuid}")
session = @_sessions[mesg.session_uuid]
if session? and session.status == 'running'
winston.debug("console session exists and is running")
client_socket.write_mesg('json', {desc:session.desc, history:session.history.toString()})
plug(client_socket, session.socket)
session.clients.push(client_socket)
else
winston.debug("console session does not exist or is not running, so we make a new session")
get_port 'console', (err, port) =>
winston.debug("got console server port = #{port}")
if err
winston.debug("can't determine console server port; probably console server not running")
client_socket.write_mesg('json', message.error(id:mesg.id, error:"problem determining port of console server."))
else
@_new_session(client_socket, mesg, port, session?.history)
_new_session: (client_socket, mesg, port, history) =>
winston.debug("_new_session: defined by #{json(mesg)}")
# Connect to port CONSOLE_PORT, send mesg, then hook sockets together.
misc_node.connect_to_locked_socket
port : port
token : secret_token
cb : (err, console_socket) =>
if err
forget_port('console')
client_socket.write_mesg('json', message.error(id:mesg.id, error:"local_hub -- Problem connecting to console server."))
winston.debug("_new_session: console server denied connection")
return
# Request a Console session from console_server
misc_node.enable_mesg(console_socket)
console_socket.write_mesg('json', mesg)
# Read one JSON message back, which describes the session
console_socket.once 'mesg', (type, desc) =>
if not history?
history = new Buffer(0)
client_socket.write_mesg('json', {desc:desc, history:history.toString()}) # in future, history could be read from a file
# Disable JSON mesg protocol, since it isn't used further
misc_node.disable_mesg(console_socket)
misc_node.disable_mesg(client_socket)
session =
socket : console_socket
desc : desc,
status : 'running',
clients : [client_socket],
history : history
session_uuid : mesg.session_uuid
project_id : mesg.project_id
# Connect the sockets together.
# receive data from the user (typing at their keyboard)
client_socket.on 'data', (data) ->
activity()
console_socket.write(data)
session.amount_of_data = 0
session.last_data = misc.mswalltime()
# receive data from the pty, which we push out to the user (via global hub)
console_socket.on 'data', (data) ->
# every 2 ms we reset the burst data watcher.
tm = misc.mswalltime()
if tm - session.last_data >= 2
session.amount_of_data = 0
session.last_data = tm
if session.amount_of_data > 200000
# We just got more than 200000 characters of output in <= 2 ms, so truncate it.
# I had a control-c here, but it was EVIL (and useless), so do *not* enable this.
# console_socket.write(String.fromCharCode(3))
# client_socket.write('[...]')
data = '[...]'
session.history += data
session.amount_of_data += data.length
n = session.history.length
if n > 400000 # TODO: totally arbitrary; also have to change the same thing in hub.coffee
session.history = session.history.slice(session.history.length - 300000)
# Never push more than 20000 characters at once to client hub, since that could overwhelm...
if data.length > 20000
data = "[...]"+data.slice(data.length-20000)
client_socket.write(data)
@_sessions[mesg.session_uuid] = session
console_socket.on 'end', () =>
winston.debug("console session #{mesg.session_uuid} ended")
session = @_sessions[mesg.session_uuid]
if session?
session.status = 'done'
client_socket.end()
# Return object that describes status of all Console sessions
info: (project_id) =>
obj = {}
for id, session of @_sessions
if session.project_id == project_id
obj[id] =
desc : session.desc
status : session.status
history_length : session.history.length
return obj
console_sessions = new ConsoleSessions()
###############################################
# Direct Sage socket session -- used internally in local hub, e.g., to assist CodeMirror editors...
###############################################
get_sage_socket = (cb) -> # cb(err, socket that is ready to use)
sage_socket = undefined
port = undefined
async.series([
(cb) =>
winston.debug("get sage server port")
get_port 'sage', (err, _port) =>
if err
cb(err); return
else
port = _port
cb()
(cb) =>
winston.debug("get and unlock socket")
misc_node.connect_to_locked_socket
port : port
token : secret_token
cb : (err, _socket) =>
if err
forget_port('sage')
winston.debug("unlock socket: _new_session: sage session denied connection: #{err}")
cb("_new_session: sage session denied connection: #{err}")
return
sage_socket = _socket
winston.debug("Successfully unlocked a sage session connection.")
cb()
(cb) =>
winston.debug("request sage session from server.")
misc_node.enable_mesg(sage_socket)
sage_socket.write_mesg('json', message.start_session(type:'sage'))
winston.debug("Waiting to read one JSON message back, which will describe the session....")
# TODO: couldn't this just hang forever :-(
sage_socket.once 'mesg', (type, desc) =>
winston.debug("Got message back from Sage server: #{json(desc)}")
sage_socket.pid = desc.pid
cb()
], (err) -> cb(err, sage_socket))
###############################################
# Sage sessions
###############################################
plug = (s1, s2) ->
# Connect the sockets together.
s1.on 'data', (data) ->
activity() # record incoming activity (don't do this in other direction, since that shouldn't keep session alive)
s2.write(data)
s2.on 'data', (data) ->
s1.write(data)
## WARNING! I think this is no longer used! It was used for my first (few)
## approaches to worksheets.
class SageSessions
constructor: () ->
@_sessions = {}
session_exists: (session_uuid) =>
return @_sessions[session_uuid]?
terminate_session: (session_uuid, cb) =>
S = @_sessions[session_uuid]
if not S?
cb()
else
winston.debug("terminate sage session -- STUB!")
cb()
update_session_status: (session) =>
# Check if the process corresponding to the given session is
# *actually* running/healthy (?). Just because the socket hasn't sent
# an "end" doesn't mean anything.
try
process.kill(session.desc.pid, 0)
# process is running -- leave status as is.
catch e
# process is not running
session.status = 'done'
get_session: (uuid) =>
session = @_sessions[uuid]
if session?
@update_session_status(session)
return session
# Connect to (if 'running'), restart (if 'dead'), or create (if
# non-existing) the Sage session with mesg.session_uuid.
connect: (client_socket, mesg) =>
session = @get_session mesg.session_uuid
if session? and session.status == 'running'
winston.debug("sage sessions: connect to the running session with id #{mesg.session_uuid}")
client_socket.write_mesg('json', session.desc)
plug(client_socket, session.socket)
session.clients.push(client_socket)
else
winston.debug("make a connection to a new sage session.")
get_port 'sage', (err, port) =>
winston.debug("Got sage server port = #{port}")
if err
winston.debug("can't determine sage server port; probably sage server not running")
client_socket.write_mesg('json', message.error(id:mesg.id, error:"problem determining port of sage server."))
else
@_new_session(client_socket, mesg, port)
_new_session: (client_socket, mesg, port, retries) =>
winston.debug("_new_session: creating new sage session (retries=#{retries})")
# Connect to port, send mesg, then hook sockets together.
misc_node.connect_to_locked_socket
port : port
token : secret_token
cb : (err, sage_socket) =>
if err
winston.debug("_new_session: sage session denied connection: #{err}")
forget_port('sage')
if not retries? or retries <= 5
if not retries?
retries = 1
else
retries += 1
try_again = () =>
@_new_session(client_socket, mesg, port, retries)
setTimeout(try_again, 1000)
else
# give up.
client_socket.write_mesg('json', message.error(id:mesg.id, error:"local_hub -- Problem connecting to Sage server. -- #{err}"))
return
else
winston.debug("Successfully unlocked a sage session connection.")
winston.debug("Next, request a Sage session from sage_server.")
misc_node.enable_mesg(sage_socket)
sage_socket.write_mesg('json', message.start_session(type:'sage'))
winston.debug("Waiting to read one JSON message back, which will describe the session.")
sage_socket.once 'mesg', (type, desc) =>
winston.debug("Got message back from Sage server: #{json(desc)}")
client_socket.write_mesg('json', desc)
plug(client_socket, sage_socket)
# Finally, this socket is now connected to a sage server and ready to execute code.
@_sessions[mesg.session_uuid] =
socket : sage_socket
desc : desc
status : 'running'
clients : [client_socket]
project_id : mesg.project_id
sage_socket.on 'end', () =>
# this is *NOT* dependable, since a segfaulted process -- and sage does that -- might
# not send a FIN.
winston.debug("sage_socket: session #{mesg.session_uuid} terminated.")
session = @_sessions[mesg.session_uuid]
# TODO: should we close client_socket here?
if session?
winston.debug("sage_socket: setting status of session #{mesg.session_uuid} to terminated.")
session.status = 'done'
# Return object that describes status of all Sage sessions
info: (project_id) =>
obj = {}
for id, session of @_sessions
if session.project_id == project_id
obj[id] =
desc : session.desc
status : session.status
return obj
sage_sessions = new SageSessions()
############################################################################
#
# Differentially-Synchronized document editing sessions
#
# Here's a map YOU ARE HERE
# |
# [client]s.. ---> [hub] ---> [local hub] <--- [hub] <--- [client]s...
# |
# \|/
# [a file on disk]
#
#############################################################################
# The "live upstream content" of DiffSyncFile_client is the actual file on disk.
# # TODO: when applying diffs, we could use that the file is random access. This is not done yet!
class DiffSyncFile_server extends diffsync.DiffSync
constructor:(@cm_session, cb) ->
@path = @cm_session.path
@_backup_file = misc.meta_file(@path, 'backup')
# check for need to save a backup every this many milliseconds
@_autosave = setInterval(@write_backup, 10000)
# We prefer the backup file only if it both (1) exists, and
# (2) is *newer* than the master file. This is because some
# other editing program could have edited the master, not
# knowing about the backup, in which case it makes more sense
# to just go with the master.
no_master = undefined
stats_path = undefined
no_backup = undefined
stats_backup = undefined
file = undefined
async.series([
(cb) =>
fs.stat @path, (_no_master, _stats_path) =>
no_master = _no_master
stats_path = _stats_path
cb()
(cb) =>
fs.stat @_backup_file, (_no_backup, _stats_backup) =>
no_backup = _no_backup
stats_backup = _stats_backup
cb()
(cb) =>
if no_backup and no_master
# neither exist -- create
file = @path
misc_node.ensure_containing_directory_exists @path, (err) =>
if err
cb(err)
else
fs.open file, 'w', (err, fd) =>
if err
cb(err)
else
fs.close fd, cb
else if no_backup # no backup file -- always use master
file = @path
cb()
else if no_master # no master file but there is a backup file -- use backup
file = @_backup_file
cb()
else
# both master and backup exist
if stats_path.mtime.getTime() >= stats_backup.mtime.getTime()
# master is newer
file = @path
else
# backup is newer
file = @_backup_file
cb()
(cb) =>
fs.readFile file, (err, data) =>
if err
cb(err); return
# NOTE: we immediately delete \r's since the client editor (Codemirror) immediately deletes them
# on editor creation; if we don't delete them, all sync attempts fail and hell is unleashed.
@init(doc:data.toString().replace(/\r/g,''), id:"file_server")
# winston.debug("got new file contents = '#{@live}'")
@_start_watching_file()
cb(err)
], (err) => cb(err, @live))
kill: () =>
if @_autosave?
clearInterval(@_autosave)
# be sure to clean this up, or -- after 11 times -- it will suddenly be impossible for
# the user to open a file without restarting their project server! (NOT GOOD)
fs.unwatchFile(@path, @_watcher)
_watcher: (event) =>
winston.debug("watch: file '#{@path}' modified.")
if not @_do_watch
winston.debug("watch: skipping read because watching is off.")
return
@_stop_watching_file()
fs.readFile @path, (err, data) =>
if err
@_start_watching_file()
else
@live = data.toString().replace(/\r/g,'') # NOTE: we immediately delete \r's (see above).
@cm_session.sync_filesystem (err) =>
@_start_watching_file()
_start_watching_file: () =>
if @_do_watch?
@_do_watch = true
return
@_do_watch = true
winston.debug("watching #{@path}")
fs.watchFile(@path, @_watcher)
_stop_watching_file: () =>
@_do_watch = false
# NOTE: I tried using fs.watch as below, but *DAMN* -- even on
# Linux 12.10 -- fs.watch in Node.JS totally SUCKS. It led to
# file corruption, weird flakiness and errors, etc. fs.watchFile
# above, on the other hand, is great for my needs (which are not
# for immediate sync).
# _start_watching_file0: () =>
# winston.debug("(re)start watching...")
# if @_fs_watcher?
# @_stop_watching_file()
# try
# @_fs_watcher = fs.watch(@path, @_watcher)
# catch e
# setInterval(@_start_watching_file, 15000)
# winston.debug("WARNING: failed to start watching '#{@path}' -- will try later -- #{e}")
# _stop_watching_file0: () =>
# if @_fs_watcher?
# @_fs_watcher.close()
# delete @_fs_watcher
snapshot: (cb) => # cb(err, snapshot of live document)
cb(false, @live)
_apply_edits_to_live: (edits, cb) =>
if edits.length == 0
cb(); return
@_apply_edits edits, @live, (err, result) =>
if err
cb(err)
else
if result == @live
cb() # nothing to do
else
@live = result
@write_to_disk(cb)
write_to_disk: (cb) =>
@_stop_watching_file()
ensure_containing_directory_exists @path, (err) =>
if err
cb?(err); return
fs.writeFile @path, @live, (err) =>
@_start_watching_file()
if not err
fs.exists @_backup_file, (exists) =>
fs.unlink(@_backup_file)
cb?(err)
write_backup: (cb) =>
if @cm_session.content != @_last_backup
x = @cm_session.content
fs.writeFile @_backup_file, x, (err) =>
if not err
@_last_backup = x
cb?(err)
# The live content of DiffSyncFile_client is our in-memory buffer.
class DiffSyncFile_client extends diffsync.DiffSync
constructor:(@server) ->
super(doc:@server.live, id:"file_client")
# Connect the two together
@connect(@server)
@server.connect(@)
# The CodeMirrorDiffSyncHub class represents a global hub viewed as a
# remote client for this local hub. There may be dozens of global
# hubs connected to this single local hub, and these are the only
# clients a local hub can have. The local hub has no upstream server,
# except the on-disk file itself.
class CodeMirrorDiffSyncHub
constructor : (@socket, @session_uuid) ->
write_mesg: (event, obj) =>
if not obj?
obj = {}
obj.session_uuid = @session_uuid
@socket.write_mesg 'json', message['codemirror_' + event](obj)
recv_edits : (edit_stack, last_version_ack, cb) =>
@write_mesg 'diffsync',
id : @current_mesg_id
edit_stack : edit_stack
last_version_ack : last_version_ack
cb?()
sync_ready: () =>
@write_mesg('diffsync_ready')
class CodeMirrorSession
constructor: (mesg, cb) ->
@path = mesg.path
@session_uuid = mesg.session_uuid
@_sage_output_cb = {}
@_sage_output_to_input_id = {}
# The downstream clients of this local hub -- these are global hubs
@diffsync_clients = {}
async.series([
(cb) =>
# if File doesn't exist, try to create it.
fs.exists @path, (exists) =>
if exists
cb()
else
fs.open @path,'w', (err, fd) =>
if err
cb(err)
else
fs.close(fd, cb)
(cb) =>
misc_node.is_file_readonly
path : @path
cb : (err, readonly) =>
@readonly = readonly
cb(err)
(cb) =>
# If this is a non-readonly sagews file, create corresponding sage session.
if not @readonly and misc.filename_extension(@path) == 'sagews'
@process_new_content = @sage_update
@sage_socket(cb)
else
cb()
(cb) =>
# The *actual* file on disk. It's important to create this
# after successfully getting the sage socket, since if we fail to
# get the sage socket we end up creating too many fs.watch's on this file...
@diffsync_fileserver = new DiffSyncFile_server @, (err, content) =>
if err
cb(err); return
@content = content
@diffsync_fileclient = new DiffSyncFile_client(@diffsync_fileserver)
cb()
], (err) => cb?(err, @))
##############################
# Sage execution related code
##############################
sage_socket: (cb) => # cb(err, socket)
if @_sage_socket?
try
process.kill(@_sage_socket.pid, 0)
# process is still running fine
cb(false, @_sage_socket)
return
catch e
# sage process is dead.
@_sage_socket = undefined
@sage_update(kill:true)
winston.debug("sage_socket: Opening a Sage session.")
# Ensure that no cells appear to be running. This is important
# because the worksheet file that we just loaded could have had some
# markup that cells are running.
@sage_update(kill:true)
# Connect to the local Sage server.
get_sage_socket (err, socket) =>
if err
winston.debug("sage_socket: fail -- #{err}.")
cb(err)
else
winston.debug("sage_socket: successfully opened a Sage session for worksheet '#{@path}'")
@_sage_socket = socket
# Set path to be the same as the file.
mesg = message.execute_code
id : misc.uuid()
code : "os.chdir(salvus.data['path'])"
data : {path: misc.path_split(@path).head}
preparse : false
socket.write_mesg('json', mesg)
socket.on 'end', () =>
@_sage_socket = undefined
winston.debug("codemirror session #{@session_uuid} sage socket terminated.")
socket.on 'mesg', (type, mesg) =>
#winston.debug("sage session: received message #{type}, #{misc.to_json(mesg)}")
switch type
when 'blob'
sha1 = mesg.uuid
if @diffsync_clients.length == 0
error = 'no global hubs are connected to the local hub, so nowhere to send file'
winston.debug("codemirror session: got blob from sage session -- #{error}")
resp = message.save_blob
error : error
sha1 : sha1
socket.write_mesg('json', resp)
else
winston.debug("codemirror session: got blob from sage session -- forwarding to a random hub")
hub = misc.random_choice_from_obj(@diffsync_clients)
id = hub[0]; ds_client = hub[1]
ds_client.remote.socket.write_mesg('blob', mesg)
receive_save_blob_message
sha1 : sha1
cb : (resp) -> socket.write_mesg('json', resp)
## DEBUG -- for testing purposes -- simulate the response message
## handle_save_blob_message(message.save_blob(sha1:sha1,ttl:1000))
when 'json'
# First check for callbacks (e.g., used in interact and things where the
# browser directly asks to evaluate code in this session).
c = @_sage_output_cb[mesg.id]
if c?
c(mesg)
if mesg.done
delete @_sage_output_cb[mesg.id]
return
# Handle code execution in browser messages
if mesg.event == 'execute_javascript'
# winston.debug("got execute_javascript message from sage session #{json(mesg)}")
# Wrap and forward it on as a broadcast message.
mesg.session_uuid = @session_uuid
bcast = message.codemirror_bcast
session_uuid : @session_uuid
mesg : mesg
@client_bcast(undefined, bcast)
return
# Finally, handle output messages
m = {}
for x, y of mesg
if x != 'id' and x != 'event' # the event is always "output"
if x == 'done' # don't bother with done=false
if y
m[x] = y
else
m[x] = y
#winston.debug("sage --> local_hub: '#{json(mesg)}'")
before = @content
@sage_output_mesg(mesg.id, m)
if before != @content
@_set_content_and_sync()
# Submit all auto cells to be evaluated.
@sage_update(auto:true)
cb(false, @_sage_socket)
_set_content_and_sync: () =>
@set_content(@content)
# Suggest to all connected clients to sync.
for id, ds_client of @diffsync_clients
ds_client.remote.sync_ready()
sage_execute_cell: (id) =>
winston.debug("exec request for cell with id #{id}")
@sage_remove_cell_flag(id, diffsync.FLAGS.execute)
{code, output_id} = @sage_initialize_cell_for_execute(id)
winston.debug("exec code '#{code}'; output id='#{output_id}'")
#if diffsync.FLAGS.auto in @sage_get_cell_flagstring(id) and 'auto' not in code
#@sage_remove_cell_flag(id, diffsync.FLAGS.auto)
@set_content(@content)
if code != ""
@_sage_output_to_input_id[output_id] = id
winston.debug("start running -- #{id}")
# Change the cell to "running" mode - this doesn't generate output, so we must explicit force clients
# to sync.
@sage_set_cell_flag(id, diffsync.FLAGS.running)
@_set_content_and_sync()
@sage_socket (err, socket) =>
if err
winston.debug("Error getting sage socket: #{err}")
@sage_output_mesg(output_id, {stderr: "Error getting sage socket (unable to execute code): #{err}"})
@sage_remove_cell_flag(id, diffsync.FLAGS.running)
return
winston.debug("Sending execute message to sage socket.")
socket.write_mesg('json',
message.execute_code
id : output_id
code : code
preparse : true
)
# Execute code in the Sage session associated to this sync'd editor session
sage_execute_code: (client_socket, mesg) =>
#winston.debug("sage_execute_code '#{misc.to_json(mesg)}")
@_sage_output_cb[mesg.id] = (resp) =>
#winston.debug("sage_execute_code -- got output: #{misc.to_json(resp)}")
client_socket.write_mesg('json', resp)
@sage_socket (err, socket) =>
#winston.debug("sage_execute_code: #{misc.to_json(err)}, #{socket}")
if err
#winston.debug("Error getting sage socket: #{err}")
resp = message.output(stderr: "Error getting sage socket (unable to execute code): #{err}", done:true)
client_socket.write_mesg('json', resp)
else
#winston.debug("sage_execute_code: writing request message -- #{misc.to_json(mesg)}")
mesg.event = 'execute_code' # event that sage session understands
socket.write_mesg('json', mesg)
sage_call: (opts) =>
opts = defaults opts,
mesg : required
cb : undefined
f = (resp) =>
opts.cb?(false, resp)
delete @_sage_output_cb[opts.mesg.id] # exactly one response
@sage_socket (err, socket) =>
if err
opts.cb?("error getting sage socket -- #{err}")
else
@_sage_output_cb[opts.mesg.id] = f
socket.write_mesg('json', opts.mesg)
sage_introspect:(client_socket, mesg) =>
mesg.event = 'introspect' # event that sage session understand
@sage_call
mesg : mesg
cb : (err, resp) =>
if err
resp = message.error(error:"Error getting sage socket (unable to introspect): #{err}")
client_socket.write_mesg('json', resp)
else
client_socket.write_mesg('json', resp)
send_signal_to_sage_session: (client_socket, mesg) =>
if @_sage_socket?
process_kill(@_sage_socket.pid, mesg.signal)
if mesg.id? and client_socket?
client_socket.write_mesg('json', message.signal_sent(id:mesg.id))
sage_update: (opts={}) =>
opts = defaults opts,
kill : false # if true, just remove all running flags.
auto : false # if true, run all cells that have the auto flag set
if not @content? # document not initialized
return
# Here we:
# - scan the string @content for execution requests.
# - also, if we see a cell UUID that we've seen already, we randomly generate
# a new cell UUID; clients can annoyingly generate non-unique UUID's (e.g., via
# cut and paste) so we fix that.
winston.debug("sage_update: opts=#{misc.to_json(opts)}")
i = 0
prev_ids = {}
while true
i = @content.indexOf(diffsync.MARKERS.cell, i)
if i == -1
break
j = @content.indexOf(diffsync.MARKERS.cell, i+1)
if j == -1
break # corrupt and is the last one, so not a problem.
id = @content.slice(i+1,i+37)
if prev_ids[id]?
# oops, repeated "unique" id, so fix it.
id = uuid.v4()
@content = @content.slice(0,i+1) + id + @content.slice(i+37)
# Also, if 'r' in the flags for this cell, remove it since it
# can't possibly be already running (given the repeat).
flags = @content.slice(i+37, j)
if diffsync.FLAGS.running in flags
new_flags = ''
for t in flags
if t != diffsync.FLAGS.running
new_flags += t
@content = @content.slice(0,i+37) + new_flags + @content.slice(j)
prev_ids[id] = true
flags = @content.slice(i+37, j)
if opts.kill
new_flags = ''
for t in flags
if t != diffsync.FLAGS.running
new_flags += t
@content = @content.slice(0,i+37) + new_flags + @content.slice(j)
else
if diffsync.FLAGS.execute in flags
@sage_execute_cell(id)
else if opts.auto and diffsync.FLAGS.auto in flags
@sage_remove_cell_flag(id, diffsync.FLAGS.auto)
@sage_execute_cell(id)
i = j + 1
sage_output_mesg: (output_id, mesg) =>
cell_id = @_sage_output_to_input_id[output_id]
#winston.debug("output_id=#{output_id}; cell_id=#{cell_id}; map=#{misc.to_json(@_sage_output_to_input_id)}")
if mesg.hide?
# Hide a single component (also, do not record the message itself in the
# document, just its impact).
flag = undefined
if mesg.hide == 'input'
flag = diffsync.FLAGS.hide_input
else if mesg.hide == 'output'
flag = diffsync.FLAGS.hide_output
if flag?
@sage_set_cell_flag(cell_id, flag)
else
winston.debug("invalid hide component: '#{mesg.hide}'")
delete mesg.hide
if mesg.show?
# Show a single component of cell.
flag = undefined
if mesg.show == 'input'
flag = diffsync.FLAGS.hide_input
else if mesg.show == 'output'
flag = diffsync.FLAGS.hide_output
if flag?
@sage_remove_cell_flag(cell_id, flag)
else
winston.debug("invalid hide component: '#{mesg.hide}'")
delete mesg.show
if mesg.auto?
# set or unset whether or not cell is automatically executed on startup of worksheet
if mesg.auto
@sage_set_cell_flag(cell_id, diffsync.FLAGS.auto)
else
@sage_remove_cell_flag(cell_id, diffsync.FLAGS.auto)
if mesg.done? and mesg.done and cell_id?
@sage_remove_cell_flag(cell_id, diffsync.FLAGS.running)
delete @_sage_output_to_input_id[output_id]
delete mesg.done # not needed
if /^\s\s*/.test(mesg.stdout) # final whitespace not needed for proper display
delete mesg.stdout
if /^\s\s*/.test(mesg.stderr)
delete mesg.stderr
if misc.is_empty_object(mesg)
return
if mesg.once? and mesg.once
# only javascript is define once=True
if mesg.javascript?
msg = message.execute_javascript
session_uuid : @session_uuid
code : mesg.javascript.code
coffeescript : mesg.javascript.coffeescript
obj : mesg.obj
cell_id : cell_id
bcast = message.codemirror_bcast
session_uuid : @session_uuid
mesg : msg
@client_bcast(undefined, bcast)
return # once = do *not* want to record this message in the output stream.
i = @content.indexOf(diffsync.MARKERS.output + output_id)
if i == -1
# no such output cell anymore -- ignore (?) -- or we could make such a cell...?