This repository has been archived by the owner on Feb 17, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.py
460 lines (362 loc) · 15.9 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
"""
Client implementation
"""
import asyncio
import functools
import os
import random
import signal
import logging
import time
import srp
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from lib import files
from lib import sha256
from lib.protocol import BaseScsyncProtocol, EncryptionMode, ErrorType
class FileEventHandler(FileSystemEventHandler):
"""
File Event Handler
"""
def __init__(self, loop, path, protocol):
self.loop = loop
self.path = path
self.protocol = protocol
def relative_filepath(self, file):
"""
Return a filepath relative to the file dir for the given file.
"""
return files.relative_filepath(file, self.path)
def on_created(self, event):
if event.is_directory:
return
self.loop.call_soon_threadsafe(self.protocol.create_file,
self.relative_filepath(event.src_path))
def on_deleted(self, event):
if event.is_directory:
return
if self.relative_filepath(event.src_path) in self.protocol.expected_delete_calls:
self.protocol.remove_expected_delete_calls(
self.relative_filepath(event.src_path))
return
self.loop.call_soon_threadsafe(self.protocol.delete_file,
self.relative_filepath(event.src_path))
def on_modified(self, event):
if event.is_directory:
return
self.loop.call_soon_threadsafe(self.protocol.update_file,
self.relative_filepath(event.src_path))
def on_moved(self, event):
if event.is_directory:
return
self.loop.call_soon_threadsafe(self.protocol.move_file,
self.relative_filepath(event.src_path),
self.relative_filepath(event.dest_path))
class ClientScsyncProtocol(BaseScsyncProtocol):
"""
Client implementation of the scsync protocol
"""
def __init__(self, loop, path, packets_per_second, chunk_size, test, username, password):
print("Using user:", username)
print()
super().__init__(loop, path, packets_per_second, chunk_size, test)
print("Syncing path:", self.path)
print()
# Set fetch update interval
self.fetch_intercal = 10.0 # should be ~ 30s in production
self.pending_update_callback = None
self.username = username
self.authenticator = srp.User(
username, password, hash_alg=srp.SHA256, ng_type=srp.NG_2048)
_, self.seed = self.authenticator.start_authentication()
self.request_id = random.getrandbits(32)
self.session_id = None
# start file dir observer
event_handler = FileEventHandler(self.loop, self.path, self)
self.observer = Observer()
self.observer.schedule(event_handler, self.path, recursive=False)
self.observer.start()
# Socket State
def connection_made(self, transport):
super().connection_made(transport)
# workaround to make start() execute after
# loop.run_until_complete(connect) returned
self.loop.call_later(0.001, self.start)
def connection_lost(self, exc):
print("Socket closed, stop the event loop")
self.stop()
# UNIX Signals
def signal(self, signame) -> None:
"""
UNIX signal handler.
"""
print("Got signal %s: exit" % signame)
self.stop()
# State
def start(self) -> None:
"""
Start client protocol by sending a Client_Hello.
"""
# schedule resend (canceled if Server_Hello or Challenge is received)
callback_handle = self.loop.call_later(self.resend_delay, self.start)
self.pending_update_callback = callback_handle
self.send_client_hello(self.request_id, self.username, self.seed)
def update(self) -> None:
"""
Update client by sending a Client_Update_Request.
"""
# schedule resend (canceled if Current_Server_State received)
callback_handle = self.loop.call_later(self.resend_delay, self.update)
self.pending_update_callback = callback_handle
self.send_client_update_request(self.session_id, self.epoch)
def stop(self) -> None:
"""
Stop the client.
"""
# cancel all upload tasks
for upload in self.active_uploads.values():
upload[1].cancel()
# stop the file observer
if self.observer:
self.observer.stop()
# stop the event loop
self.loop.stop()
def __cancel_upload(self, filename, filehash, cancel_metadata=True) -> None:
if cancel_metadata:
self.cancel_resend(self.pending_metadata_callbacks, filename)
fileinfo = self.fileinfo.get(filename, None)
if fileinfo is not None and filehash == fileinfo['filehash']:
active_upload = self.active_uploads.get(filename, None)
if active_upload is not None:
active_upload[1].cancel()
# Local file handling
def remove_local_file(self, filename) -> None:
"""
Remove a local file from the file system
"""
# Remove the file from the file system
if os.path.isfile(self.path + filename.decode("utf-8")):
os.remove(self.path + filename.decode("utf-8"))
else:
logging.warning("Could not remove file \"%s\"", filename)
return
# Remove the file from the internal fileinfo dict
del self.fileinfo[filename]
print("Deleted file \"%s\"" % filename)
def rename_local_file(self, old_filename, new_filename) -> None:
"""
Rename a local file
"""
# Rename the file from the file system
if os.path.isfile(self.path + old_filename.decode("utf-8")):
os.renames(self.path + old_filename.decode("utf-8"),
self.path + new_filename.decode("utf-8"))
else:
logging.warning("Could not rename \"%s\" to \"%s\"",
old_filename, new_filename)
return
# Remove the old reference from the internal fileinfo dict and add a
# new one for the new_filename
filehash = self.fileinfo[old_filename]
del self.fileinfo[old_filename]
self.fileinfo[new_filename] = filehash
print("Renamed/Moved file \"%s\" to \"%s\"" %
(old_filename, new_filename))
# Packet Handlers
def handle_error(self, session_id, data, addr) -> None:
logging.info('received Error from %s', addr)
valid, filehash, filename, error_type, error_id, description = self.unpack_error(
data)
if not valid:
self.handle_invalid_packet(data, addr)
return
logging.error('%s [%s]: %s %s', filename, sha256.hex(filehash),
error_type, description)
if error_type == ErrorType.File_Hash_Error:
print('reuploading file \"%s\"' % filename.decode('utf8'))
self.loop.call_soon(self.upload_file, filename, addr)
elif error_type in [ErrorType.Out_Of_Memory, ErrorType.Conflict, ErrorType.Upload_Failed]:
self.__cancel_upload(filename, filehash)
else:
logging.error('unknown error')
self.send_ack_error(session_id, error_id)
def handle_challenge(self, data, addr) -> None:
valid, request_id, salt, server_seed, token = self.unpack_challenge(
data)
if not valid or request_id != self.request_id:
self.handle_invalid_packet(data, addr)
return
proof = self.authenticator.process_challenge(salt, server_seed)
if proof is None:
return
# cancel resend
if not self.cancel_resend(self.pending_update_callback, None):
return
# schedule resend (canceled if Server_Hello or Challenge is received) to
# start over if the handshake fails
callback_handle = self.loop.call_later(self.resend_delay, self.start)
self.pending_update_callback = callback_handle
encryptor = self.get_encryptor(
EncryptionMode.AES_256_GCM, self.authenticator.K)
if not encryptor:
logging.debug(
"invalid encryption mode in request %u. Abort.", request_id)
return
# TODO: use server-assigned session ID instead
self.session_id = request_id
self.sessions[request_id] = {
'encryptor': encryptor,
'nonce': 0,
'user': self.username
}
self.send_challenge_response(
request_id, proof, self.seed, self.username.encode('utf-8'), token)
def handle_current_server_state(self, session_id, data, addr) -> None:
valid, epoch, remote_files = self.unpack_current_server_state(data)
if not valid:
self.handle_invalid_packet(data, addr)
return
# cancel resend
if not self.cancel_resend(self.pending_update_callback, None):
return
# nothing new
if self.epoch == epoch:
# Call update() repeatedly to get an update of the files on the server
# and react accordingly
self.loop.call_later(self.fetch_intercal, self.update)
return
self.epoch = epoch
# build file dir diff
for filename, fileinfo in self.fileinfo.items():
if filename not in remote_files:
new_file_name = None
for remote_filename, remote_filehash in remote_files.items():
if remote_filehash == fileinfo['filehash']:
new_file_name = remote_filename
if new_file_name is not None:
# If the local file hash can be found in the remote files but the name is different,
# we assume it has been renamed by an other client -->
# Rename local copy
self.rename_local_file(filename, new_file_name)
else:
# If local file is not in remote files and there is no suiting hash (possible rename)
# we assume it has been deleted by an other client -->
# Remove local copy
self.remove_local_file(filename)
for remote_filename, remote_filehash in remote_files.items():
if remote_filename not in self.fileinfo.keys() or self.fileinfo[remote_filename]['filehash'] != remote_filehash:
# If after renaming and removing files that might have changed on the server
# we check of any of the remote files still is not present on the client or is
# present but has a different hash and if so we assume that the file must be
# requested from the server using a request_file packet
self.loop.call_soon(self.request_file,
remote_filename, remote_filehash, addr)
# Call update() repeatedly to get an update of the files on the server
# and react accordingly
self.loop.call_later(self.fetch_intercal, self.update)
def handle_ack_delete(self, session_id, data, addr) -> None:
valid, filehash, filename = self.unpack_ack_delete(data)
if not valid:
self.handle_invalid_packet(data, addr)
return
fileinfo = self.fileinfo.get(filename, None)
if fileinfo is None or fileinfo['filehash'] != filehash:
self.handle_invalid_packet(data, addr)
return
# cancel resend
if not self.cancel_resend(self.pending_delete_callbacks, filename):
return
del self.fileinfo[filename]
print("Deleted file \"%s\" was acknowledged" % filename)
def handle_ack_rename(self, session_id, data, addr) -> None:
valid, filehash, old_filename, new_filename = self.unpack_ack_rename(
data)
if not valid:
self.handle_invalid_packet(data, addr)
return
fileinfo = self.fileinfo.get(old_filename, None)
if fileinfo is None or fileinfo['filehash'] != filehash:
self.handle_invalid_packet(data, addr)
return
del self.fileinfo[old_filename]
self.fileinfo[new_filename] = fileinfo
# cancel resend
if not self.cancel_resend(self.pending_rename_callbacks, old_filename):
return
print("Renamed/Moved file \"%s\" to \"%s\" was acknowledged" %
(old_filename, new_filename))
# file sync methods
def create_file(self, filename) -> None:
self.upload_file(self.session_id, filename)
def delete_file(self, filename) -> None:
"""
Delete the given file from the server.
"""
# Check if the file was not deleted by the client program itself.
if not filename in self.fileinfo.keys():
return
print("Deleted file \"%s\"" % filename)
filehash = self.fileinfo[filename]["filehash"]
# schedule resend (canceled if ack'ed)
callback_handle = self.loop.call_later(
self.resend_delay, self.delete_file, filename)
self.pending_delete_callbacks[filename] = callback_handle
self.send_file_delete(self.session_id, filehash, filename)
def update_file(self, filename) -> None:
"""
Update the given file on the server by uploading the new content.
"""
self.fileinfo[filename] = self.get_fileinfo(filename.decode('utf8'))
# schedule resend (canceled if ack'ed) TODO
'''callback_handle = self.loop.call_later(
self.resend_delay, self.update_file, filename)
self.pending_delete_callbacks[filename] = callback_handle'''
self.send_file_update_request(
self.session_id, filename, self.fileinfo[filename])
def request_file(self, filename, filehash, addr) -> None:
"""
Request a given file on the server.
"""
logging.debug("Request file \"%s\" with hash: %s", filename, filehash)
self.send_client_file_request(
self.session_id, filename, filehash, addr)
def move_file(self, old_filename, new_filename) -> None:
"""
Move a file on the server by changing its path.
"""
# Check if the file was not renamed/moved by the client program itself.
if not old_filename in self.fileinfo.keys():
return
print("Renamed/Moved file \"%s\" to \"%s\"" %
(old_filename, new_filename))
filehash = self.fileinfo[old_filename]["filehash"]
# schedule resend (canceled if ack'ed)
callback_handle = self.loop.call_later(
self.resend_delay, self.move_file, old_filename, new_filename)
self.pending_rename_callbacks[old_filename] = callback_handle
self.send_file_rename(self.session_id, filehash,
old_filename, new_filename)
def run(args):
"""
Start running as a client.
"""
loop = asyncio.get_event_loop()
# create UDP socket and start event loop listening to it
server_address = (args.host, args.port)
print('Trying to sync with {}:{}\n'.format(*server_address))
connect = loop.create_datagram_endpoint(
lambda: ClientScsyncProtocol(
loop, args.path, args.cc, args.chunk_size, args.test, args.user, args.password),
remote_addr=server_address)
transport, protocol = loop.run_until_complete(connect)
if not args.test:
for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame),
functools.partial(protocol.signal, signame))
print("Event loop running forever, press Ctrl+C to interrupt.")
print("PID %s: send SIGINT or SIGTERM to exit.\n\n" % os.getpid())
try:
loop.run_forever()
finally:
transport.close()
loop.close()