import contextlib
import pytest
import os, shutil, tempfile
from os.path import join, basename
from functools import partial
from io import BytesIO, StringIO
from threadbare import execute, state, common, operations
from threadbare.state import settings
from threadbare.operations import (
remote,
remote_file_exists,
remote_sudo,
local,
download,
upload,
single_command,
lcd,
rcd,
hide,
)
import logging
logging.basicConfig()
LOG = logging.getLogger(__name__)
HOST = "127.0.0.1"
PORT = os.environ.get("THREADBARE_TEST_PORT")
USER = os.environ.get("THREADBARE_TEST_USER")
KEY = os.environ.get("THREADBARE_TEST_PUBKEY", None)
TRANSFER_PROTOCOL = os.environ.get("THREADBARE_TEST_TRANSFER_PROTOCOL", "scp")
_help_text = """the environment variables below must be defined before executing this script:
THREADBARE_TEST_PORT=
THREADBARE_TEST_USER=
THREADBARE_TEST_PUBKEY=
THREADBARE_TEST_PROTOCOL=
THREADBARE_TEST_TRANSFER_PROTOCOL=
THREADBARE_TEST_PORT must be an integer.
THREADBARE_TEST_TRANSFER_PROTOCOL must be either 'scp', 'sftp' or 'rsync'
It's assumed the dummy sshd server is running and that the host is `localhost`.
"""
assert (HOST and PORT and USER) and common.isint(PORT), _help_text
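# For reference, a working configuration might look like the following shell
# exports (illustrative values only -- the port, user and key must match the
# dummy sshd server configured in `./tests-remote/sshd-server.sh`):
#
#   export THREADBARE_TEST_PORT=2222
#   export THREADBARE_TEST_USER="$USER"
#   export THREADBARE_TEST_PUBKEY=/path/to/key
#   export THREADBARE_TEST_TRANSFER_PROTOCOL=scp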
# prefixed with an underscore so pytest doesn't pick it up
_test_settings = partial(
settings,
user=USER,
port=int(PORT),
host_string=HOST,
key_filename=KEY,
transfer_protocol=TRANSFER_PROTOCOL,
)
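# `_test_settings` layers the connection details on top of any per-test
# settings, e.g. (as used throughout the remote tests below):
#
#   with _test_settings(quiet=True):
#       remote("echo hi")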
def _env_fixture(prefix):
"""creates a temporary directory with three files in it, 'small-file', 'medium-file' and 'large-file'.
the paths to the directory and files are yielded as a map and then all is removed afterwards.
"""
@contextlib.contextmanager
def wrapper():
tempdir = tempfile.mkdtemp(prefix="threadbare-" + prefix)
# create some empty files of specific sizes
path_map = {
"small-file": join(tempdir, "small-file.temp"),
"medium-file": join(tempdir, "medium-file.temp"),
# "large-file": join(tempdir, "large-file.temp") # unused
}
file_size_map = {
"small-file": "1KiB",
"medium-file": "1MiB",
"large-file": "25MiB",
}
for path_name, path in path_map.items():
file_size = file_size_map[path_name]
local("fallocate -l %s %s" % (file_size, path))
try:
yield {"temp-dir": tempdir, "temp-files": path_map}
finally:
# permissions on temp dir may have changed. make sure we can still remove it.
local('chown %s:%s -R "%s"' % (USER, USER, tempdir), use_sudo=True)
shutil.rmtree(tempdir)
return wrapper
def _empty_env_fixture(prefix):
"creates a temporary directory with no files in it, yields the directory name and cleans itself up afterwards."
@contextlib.contextmanager
def wrapper():
tempdir = tempfile.mkdtemp(prefix="threadbare-" + prefix)
try:
yield {"temp-dir": tempdir}
finally:
# permissions on temp dir may have changed. make sure we can still remove it.
local('chown %s:%s -R "%s"' % (USER, USER, tempdir), use_sudo=True)
shutil.rmtree(tempdir)
return wrapper
# pytest fixtures turned out to be too brittle and too magical. I'd rather use nested context managers.
# remote_env = pytest.fixture(_env("remote"))
# local_env = pytest.fixture(_env("local"))
# empty_local_env = pytest.fixture(_empty_env("local"))
# empty_remote_env = pytest.fixture(_empty_env("remote"))
# remote and local are the same but let's pretend they're not.
empty_local_fixture = _empty_env_fixture("local")
empty_remote_fixture = _empty_env_fixture("remote")
local_fixture = _env_fixture("local")
remote_fixture = _env_fixture("remote")
# local tests
# see `tests/test_state.py` and `tests/test_operations.py` for more examples
def test_nest_some_settings():
"demonstrates how settings accumulate"
with settings(foo="bar"):
with settings(bar="baz"):
with settings(baz="bup"):
LOG.debug(
"after three nestings I have the cumulate state: %s" % state.ENV
)
assert state.ENV == {"foo": "bar", "bar": "baz", "baz": "bup"}
def test_run_a_local_command():
"run a simple local command"
result = local("echo hello, world!")
assert result["succeeded"]
def test_run_a_local_command_with_separate_streams():
"run a simple local command but capture the output"
result = local("echo hello, world!", capture=True)
assert result["succeeded"]
def test_run_a_local_command_in_a_different_dir():
"switch to a different local directory to run a command"
with lcd("/tmp"):
result = local("pwd", capture=True)
assert result["succeeded"]
assert result["stdout"] == ["/tmp"]
def test_run_a_local_command_but_hide_output():
"presumably for side effects"
with hide():
result = local("cat /etc/passwd", capture=False)
# (nothing should be emitted)
assert result["succeeded"]
assert result["stdout"] == []
assert result["stderr"] == []
def test_run_many_local_commands_serially():
"run a list of commands serially. `serial` exists to complement `parallel`"
command_list = [
"echo all",
"echo these commands",
"echo are executed",
"echo in serially",
]
def myfn():
return local(state.ENV["cmd"], capture=True)
results = execute.execute(myfn, param_key="cmd", param_values=command_list)
assert len(results) == len(command_list)
assert results[-2]["stdout"] == ["are executed"]
def test_run_many_local_commands_in_parallel():
"run a set of commands in parallel using Python's multiprocessing"
command_list = [
"echo all",
"echo these commands",
"echo are executed",
"echo in parallel",
]
@execute.parallel
def myfn():
return local(state.ENV["cmd"], capture=True)
results = execute.execute(myfn, param_key="cmd", param_values=command_list)
assert len(results) == len(command_list)
assert results[-2]["stdout"] == ["are executed"]
# remote tests
# these assume the dummy sshd server (configured in `./tests-remote/sshd-server.sh`) is being
# run and that both local and remote are the same machine.
def test_run_a_remote_command():
"run a simple `remote` command"
with _test_settings(quiet=True):
result = remote(r'echo -e "\e[31mRed Text!!\e[0m"')
assert result["succeeded"]
def test_run_a_remote_command_but_hide_output():
"run a simple `remote` command but don't print anything"
with _test_settings():
with hide():
result = remote("echo hi!")
# (nothing should have been emitted)
assert result["succeeded"]
assert result["stdout"] == ["hi!"]
def test_run_a_remote_command_as_root():
"run a simple `remote` command as the root user"
with _test_settings():
result = remote_sudo("cd /root && echo tapdance in $(pwd)")
assert result["succeeded"]
def test_run_a_remote_command_in_a_different_dir():
"run a simple `remote` command in a different remote directory"
with remote_fixture() as remote_env:
with _test_settings():
remote_dir = remote_env["temp-dir"]
with rcd(remote_dir):
result = remote("pwd")
assert result["succeeded"]
assert [remote_dir] == result["stdout"]
def test_run_a_remote_command_with_separate_streams():
"run a simple `remote` command and capture stdout and stderr separately"
with _test_settings():
result = remote(
'echo "printed to standard out"; >&2 echo "printed to standard error"',
combine_stderr=False,
)
assert result["succeeded"]
assert ["printed to standard out"] == result["stdout"]
assert ["printed to standard error"] == result["stderr"]
def test_run_a_remote_command_with_shell_interpolation():
"run a simple `remote` command including shell variables"
with _test_settings(quiet=True):
result = remote('foo=baz; echo "bar? $foo!"')
assert result["succeeded"]
assert ["bar? baz!"] == result["stdout"]
result2 = remote('foo=baz; echo "bar? $foo!"', use_shell=False)
assert result2["succeeded"]
assert ["bar? baz!"] == result["stdout"]
def test_run_a_remote_command_non_zero_return_code():
"""`remote` commands, like `local` commands, will raise a RuntimeError if the command they execute fails.
the results of the command are still available via the `result` attribute on the exception object
"""
with _test_settings():
with pytest.raises(RuntimeError) as err:
remote("exit 123")
exc = err.value
assert exc.result["return_code"] == 123
assert exc.result["failed"]
assert not exc.result["succeeded"]
def test_run_a_remote_command_non_zero_custom_exit():
"""`remote` commands, like `local` commands, may raise a custom exception if the command they execute fails.
the results of the command are still available via the `result` attribute on the exception object
"""
with _test_settings():
with pytest.raises(ValueError) as err:
remote("exit 123", abort_exception=ValueError)
exc = err.value
assert exc.result["return_code"] == 123
assert exc.result["failed"]
assert not exc.result["succeeded"]
def test_run_a_remote_command_non_zero_return_code_swallow_error():
"`remote` commands, like `local` commands, can return the results of failed executions when `warn_only` is `True`"
with _test_settings(warn_only=True):
result = remote("exit 123")
assert result["return_code"] == 123
assert result["failed"]
assert not result["succeeded"]
def test_run_many_remote_commands():
"running many `remote` commands re-uses the established ssh session"
command_list = [
"echo all",
"echo these commands",
"echo share the same",
"echo ssh session",
]
with _test_settings():
for command in command_list:
result = remote(command)
assert result["succeeded"]
def test_run_many_remote_commands_singly():
"multiple commands can be concatenated into a single command"
command_list = [
"echo all",
"echo these commands",
"echo are executed",
"echo together",
]
with _test_settings():
result = remote(single_command(command_list))
assert result["succeeded"]
def test_run_many_remote_commands_serially():
"""run a list of `remote` commands serially. The `execute` module is aimed at
running commands in parallel.
Serial execution exists only as a sensible default and offers nothing extra."""
command_list = [
"echo all",
"echo these commands",
"echo are executed",
"echo serially and remotely",
]
def myfn():
return remote(state.ENV["cmd"], capture=True)
with _test_settings():
results = execute.execute(myfn, param_key="cmd", param_values=command_list)
assert len(results) == len(command_list)
assert results[-2]["stdout"] == ["are executed"]
def test_run_many_remote_commands_in_parallel():
"""run a list of `remote` commands in parallel.
`remote` commands run in parallel do not share an ssh connection.
the order of results is guaranteed, but not the order in which output is emitted.
"""
command_list = [
"echo all",
"echo these commands",
"echo are executed",
"echo remotely and in parallel",
]
@execute.parallel
def myfn():
return remote(state.ENV["cmd"], capture=True)
with _test_settings(quiet=True):
results = execute.execute(myfn, param_key="cmd", param_values=command_list)
assert len(results) == len(command_list)
assert results[-2]["stdout"] == ["are executed"]
def test_remote_exceptions_in_parallel__raise_errors():
"""Remote commands that raise exceptions while executing in parallel are re-raised when encountered in the results."""
def workerfn():
with state.settings():
return remote("exit 1")
workerfn = execute.parallel(workerfn, pool_size=1)
with _test_settings():
expected = RuntimeError(
"remote() encountered an error (return code 1) while executing '/bin/bash -l -c \"exit 1\"'"
)
with pytest.raises(RuntimeError) as e:
execute.execute(workerfn)
assert str(expected) == str(e.value)
def test_remote_exceptions_in_parallel__swallow_errors():
"""Remote commands that raise exceptions while executing in parallel return the exception object when `raise_unhandled_errors` is `False`."""
def workerfn():
with state.settings():
return remote("exit 1")
workerfn = execute.parallel(workerfn, pool_size=1)
with _test_settings():
expected = RuntimeError(
"remote() encountered an error (return code 1) while executing '/bin/bash -l -c \"exit 1\"'"
)
result_list = execute.execute(workerfn, raise_unhandled_errors=False)
result = result_list[0]
assert str(expected) == str(result)
def test_check_remote_files():
"check that remote files can be found (or not)"
with remote_fixture() as remote_env:
with _test_settings():
file_that_exists = remote_env["temp-files"]["small-file"]
file_that_does_not_exist = join(remote_env["temp-dir"], "doesnot.exist")
assert remote_file_exists(file_that_exists)
assert not remote_file_exists(file_that_does_not_exist)
def _test_upload_and_download_a_file(transfer_protocol):
"""write a local file, upload it to the remote server, modify it remotely, download it, modify it locally,
assert its contents are as expected"""
with empty_local_fixture() as local_env:
with empty_remote_fixture() as remote_env:
with _test_settings(transfer_protocol=transfer_protocol):
LOG.debug("modifying local file ...")
local_file_name = join(local_env["temp-dir"], "foo")
local('printf "foo" > %s' % local_file_name)
LOG.debug("uploading file ...")
remote_file_name = join(remote_env["temp-dir"], "foobar")
upload(local_file_name, remote_file_name)
# verify contents
assert remote_file_exists(remote_file_name)
assert remote("cat %s" % remote_file_name)["stdout"] == ["foo"]
LOG.debug("modifying remote file ...")
remote('printf "bar" >> %s' % remote_file_name)
# verify contents
assert remote("cat %s" % remote_file_name)["stdout"] == ["foobar"]
LOG.debug("downloading file ...")
new_local_file_name = join(local_env["temp-dir"], "foobarbaz")
download(remote_file_name, new_local_file_name)
# verify contents
with open(new_local_file_name, "r") as fh:
assert fh.read() == "foobar"
LOG.debug("modifying local file (again) ...")
local('printf "baz" >> %s' % new_local_file_name)
LOG.debug("testing local file ...")
with open(new_local_file_name, "r") as fh:
data = fh.read()
assert "foobarbaz" == data
def test_upload_and_download_a_file_coverage_bump():
"""tests uploading and downloading a file using all three transfer protocols.
this is covered more thoroughly in the `./project-tests.sh` script and is
just to bump test coverage."""
for transfer_protocol in ["scp", "sftp", "rsync"]:
_test_upload_and_download_a_file(transfer_protocol)
def test_upload_a_directory(): # you can't
"attempting to upload a directory raises an exception"
with empty_local_fixture() as local_env:
with empty_remote_fixture() as remote_env:
with _test_settings():
with pytest.raises(ValueError):
upload(local_env["temp-dir"], remote_env["temp-dir"])
def test_upload_to_extant_remote_file():
"the default policy is to overwrite files that exist."
with empty_local_fixture():
with remote_fixture() as remote_env:
with _test_settings():
payload = b"foo"
remote_file = remote_env["temp-files"]["small-file"]
# just to illustrate an overwrite *is* happening
assert remote_file_exists(remote_file)
upload(BytesIO(payload), remote_file)
result = remote('cat "%s"' % (remote_file,))
assert [payload.decode("utf-8")] == result["stdout"]
def test_upload_to_extant_remote_file_no_overwrite():
"the default policy of overwriting files can be disabled when `override` is set to `False`."
with empty_local_fixture():
with remote_fixture() as remote_env:
with _test_settings():
payload = b"foo"
remote_file = remote_env["temp-files"]["small-file"]
assert remote_file_exists(remote_file)
with pytest.raises(operations.NetworkError) as exc_info:
upload(BytesIO(payload), remote_file, overwrite=False)
expected_msg = (
"Remote file exists and 'overwrite' is set to 'False'. Refusing to write: %s"
% (remote_file,)
)
assert expected_msg == str(exc_info.value)
def test_upload_to_non_existant_remote_dir():
"intermediate non-existant directories in a remote path will be created."
with local_fixture() as local_env:
with empty_remote_fixture() as remote_env:
with _test_settings():
non_existant_dir = "does/not/exist"
local_file = local_env["temp-files"]["small-file"]
remote_file_name = os.path.basename(local_file)
expected_remote_file = join(
remote_env["temp-dir"], non_existant_dir, remote_file_name
)
upload(local_file, expected_remote_file)
assert remote_file_exists(expected_remote_file)
def test_download_to_extant_local_file():
"the default policy is to overwrite files that exist."
with local_fixture() as local_env:
with empty_remote_fixture() as remote_env:
with _test_settings():
local_file = local_env["temp-files"]["small-file"]
assert os.path.exists(local_file)
payload = "foo"
remote_file = join(remote_env["temp-dir"], "foo.file")
remote('printf %s > "%s"' % (payload, remote_file))
assert remote_file_exists(remote_file)
download(remote_file, local_file)
with open(local_file, "r") as fh:
result = fh.read()
assert payload == result
def test_download_to_extant_local_file_no_overwrite():
"the default policy of overwriting files can be disabled when `override` is set to `False`."
with local_fixture() as local_env:
with remote_fixture() as remote_env:
with _test_settings():
local_file = local_env["temp-files"]["small-file"]
remote_file = remote_env["temp-files"]["medium-file"]
with pytest.raises(operations.NetworkError) as exc_info:
download(remote_file, local_file, overwrite=False)
expected_msg = (
"Local file exists and 'overwrite' is set to 'False'. Refusing to write: %s"
% (local_file,)
)
assert expected_msg == str(exc_info.value)
def test_download_a_directory(): # you can't
"attempting to download a directory raises an exception."
# it's possible, as both parallel-ssh and paramiko use SFTP, but it's unsupported in threadbare.
with empty_local_fixture() as local_env:
with empty_remote_fixture() as remote_env:
with _test_settings():
# it becomes ambiguous if remote path is a file or a directory
remote_dir = remote_env["temp-dir"]
assert not remote_dir.endswith("/")
with pytest.raises(ValueError):
download(remote_dir, local_env["temp-dir"])
def test_download_an_obvious_directory(): # you can't
"""attempting to download an obvious directory (trailing slash /) raises an exception.
It's possible, as both parallel-ssh and paramiko use SFTP, but it is not supported."""
with empty_local_fixture() as local_env:
with empty_remote_fixture() as remote_env:
with _test_settings():
# ensure we're dealing with an obvious directory
remote_dir = "%s/" % remote_env["temp-dir"].rstrip("/")
with pytest.raises(ValueError):
download(remote_dir, local_env["temp-dir"])
def test_download_a_file_to_a_directory():
"a file can be downloaded to a directory and the name of the remote file will be used as the destination file"
with empty_local_fixture() as local_env:
with remote_fixture() as remote_env:
with _test_settings():
local_dir = local_env["temp-dir"]
remote_file = remote_env["temp-files"]["small-file"]
expected_local_file = join(local_dir, basename(remote_file))
new_local_file = download(remote_file, local_dir)
assert os.path.exists(expected_local_file)
assert expected_local_file == new_local_file
def test_download_a_file_to_a_relative_directory():
"relative destinations are expanded to full paths before downloading"
with remote_fixture() as remote_env:
with empty_local_fixture() as local_env:
with _test_settings():
with lcd(local_env["temp-dir"]):
remote_file = remote_env["temp-files"]["small-file"]
expected_local_file = join(
local_env["temp-dir"], basename(remote_file)
)
relative_dir = "."
new_local_file = download(remote_file, relative_dir)
assert expected_local_file == new_local_file
assert os.path.exists(expected_local_file)
def test_download_a_file_to_a_non_existant_dir():
"""downloading a file to directory that does not exist will see that directory structure created.
note: scp and sftp as used by parallel-ssh appear to do this out of the box but rsync will not.
this ensures behaviour is consistent across all transfer-protocols."""
with remote_fixture() as remote_env:
with empty_local_fixture() as local_env:
with _test_settings():
with lcd(local_env["temp-dir"]):
remote_file = remote_env["temp-files"]["small-file"]
non_existant_dir = "does/not/exist"
expected_local_file = join(
local_env["temp-dir"],
non_existant_dir,
os.path.basename(remote_file),
)
new_local_file = download(remote_file, expected_local_file)
assert expected_local_file == new_local_file
assert os.path.exists(expected_local_file)
def test_download_file_owned_by_root():
"a file owned by root can be downloaded by the regular user if 'use_sudo' is True"
with empty_local_fixture() as local_env:
with remote_fixture() as remote_env:
with _test_settings():
# create a root-only file on remote machine
remote_file_name = remote_env["temp-files"]["small-file"]
file_contents = "root users only!\n"
remote_sudo('printf "%s" > "%s"' % (file_contents, remote_file_name))
remote_sudo('chmod 600 "%s"' % remote_file_name)
remote_sudo('chown root:root "%s"' % remote_file_name)
local_file_name = join(
local_env["temp-dir"], basename(remote_file_name)
)
# ensure remote root-only file cannot be downloaded by regular user.
# in this case we own the directory but the file is owned by root.
with pytest.raises(operations.NetworkError):
download(remote_file_name, local_file_name)
# download remote root-only file as regular user
download(remote_file_name, local_file_name, use_sudo=True)
assert os.path.exists(local_file_name)
with open(local_file_name, "r") as fh:
assert file_contents == fh.read()
def test_upload_file_to_root_dir():
"uploads a file as a regular user to a root-owned directory with `use_sudo`"
with local_fixture() as local_env:
with empty_remote_fixture() as remote_env:
with _test_settings():
remote_sudo('chown root:root -R "%s"' % remote_env["temp-dir"])
local_file_name = local_env["temp-files"]["small-file"]
remote_file_name = join(
remote_env["temp-dir"], basename(local_file_name)
)
# upload file to root-owned directory
with pytest.raises(operations.NetworkError):
upload(local_file_name, remote_file_name)
upload(local_file_name, remote_file_name, use_sudo=True)
assert remote_file_exists(remote_file_name, use_sudo=True)
def test_upload_and_download_a_file_using_byte_buffers():
"""contents of a BytesIO buffer can be uploaded to a remote file,
and the contents of a remote file can be downloaded to a BytesIO buffer"""
with empty_remote_fixture() as remote_env:
with _test_settings(quiet=True):
payload = b"foo-bar-baz"
uploadable_unicode_buffer = BytesIO(payload)
remote_file_name = join(remote_env["temp-dir"], "bytes-test")
upload(uploadable_unicode_buffer, remote_file_name)
assert remote_file_exists(remote_file_name)
result = remote('cat "%s"' % remote_file_name)
assert result["succeeded"]
assert result["stdout"] == [payload.decode()]
download_unicode_buffer = BytesIO()
download(remote_file_name, download_unicode_buffer)
assert download_unicode_buffer.getvalue() == payload
def test_upload_a_file_using_string_buffers():
"""contents of a StringIO buffer can be uploaded to a remote file,
and the contents of a remote file can be downloaded to a StringIO buffer."""
with empty_remote_fixture() as remote_env:
with _test_settings(quiet=True):
payload = "foo-bar-baz"
uploadable_string_buffer = StringIO(payload)
remote_file_name = join(remote_env["temp-dir"], "string-buffer-test")
upload(uploadable_string_buffer, remote_file_name)
assert remote_file_exists(remote_file_name)
result = remote('cat "%s"' % remote_file_name)
assert result["succeeded"]
assert result["stdout"] == [payload]
download_string_buffer = StringIO()
download(remote_file_name, download_string_buffer)
assert download_string_buffer.getvalue() == payload
def test_check_many_remote_files():
"checks multiple remote files for existence in parallel"
@execute.parallel
def workerfn():
with state.settings() as env:
return remote_file_exists(env["remote_file"], use_sudo=True)
with remote_fixture() as remote_env:
remote_file_list = [
remote_env["temp-files"]["small-file"], # True, exists
join(remote_env["temp-dir"], "doesnot.exist"), # False, doesn't exist
]
expected = [True, False]
with _test_settings():
result = execute.execute(
workerfn, param_key="remote_file", param_values=remote_file_list
)
assert expected == result
def test_line_formatting():
# todo: not a great test. how do I capture and test the formatted line while preserving the original output?
num_workers = 2
@execute.parallel
def workerfn():
iterations = 2
cmd = 'for run in {1..%s}; do echo "I am %s, iteration $run"; done' % (
iterations,
state.ENV["worker_num"],
)
return remote(cmd)
expected = [
{
"command": '/bin/bash -l -c "for run in {1..2}; do echo \\"I am 1, iteration \\$run\\"; done"',
"failed": False,
"return_code": 0,
"stderr": [],
"stdout": [
"I am 1, iteration 1",
"I am 1, iteration 2",
],
"succeeded": True,
},
{
"command": '/bin/bash -l -c "for run in {1..2}; do echo \\"I am 2, iteration \\$run\\"; done"',
"failed": False,
"return_code": 0,
"stderr": [],
"stdout": ["I am 2, iteration 1", "I am 2, iteration 2"],
"succeeded": True,
},
]
with _test_settings(line_template="[{host}] {pipe}: {line}\n"):
results = execute.execute(
workerfn,
param_key="worker_num",
param_values=list(range(1, num_workers + 1)),
)
assert expected == results
# see `threadbare/__init__.py` for gevent monkey patching that allows
# gevent threads (pssh), python futures (boto) and python multiprocessing (threadbare/fabric)
# to work harmoniously
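# a minimal sketch of that arrangement (the actual call and its arguments live
# in `threadbare/__init__.py` and may differ):
#
#   from gevent import monkey
#   monkey.patch_all()
#
# gevent recommends patching as early as possible, before any sockets, threads
# or subprocesses are created, which is presumably why it is done at import time.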
def test_mix_match_ssh_clients1():
"remote commands run serially, then in parallel, then serially don't interfere with each other"
# main process
test_run_a_remote_command() # works
# child processes
test_check_many_remote_files() # works with monkey_patch
# main process again
test_run_a_remote_command() # works
def test_mix_match_ssh_clients2():
"remote command run after parallel remote commands don't interfere with each other"
# child processes
test_check_many_remote_files() # works
# main process again
test_run_a_remote_command() # works
def test_mix_match_ssh_clients3():
"remote commands run in parallel, then serially, then in parallel again don't interfere with each other"
# child processes
test_check_many_remote_files() # works
# main process
test_run_a_remote_command() # works
# child processes
test_check_many_remote_files() # works with monkey_patch
def test_mix_match_ssh_clients4():
"remote commands run in parallel after each other don't interface with each other"
test_check_many_remote_files() # works
test_check_many_remote_files() # works
def test_run_script():
"a simple shell script can be uploaded and executed and the results accessible"
with empty_local_fixture() as local_env:
with empty_remote_fixture() as remote_env:
with _test_settings():
local_script = join(local_env["temp-dir"], "script.sh")
with open(local_script, "w") as fh:
fh.write(
r"""#!/bin/bash
echo "hello, world"
"""
)
remote_script = join(remote_env["temp-dir"], "script.sh")
upload(local_script, remote_script)
remote("chmod +x %s" % remote_script)
with rcd(os.path.dirname(remote_script)):
result = remote("./script.sh")
assert ["hello, world"] == result["stdout"]
def test_run_script_parallel():
"""a simple bash script can be uploaded and executed in parallel across multiple hosts,
with each of the hosts' results accessible"""
with empty_local_fixture() as local_env:
with empty_remote_fixture() as remote_env:
with _test_settings():
local_script = join(local_env["temp-dir"], "script.sh")
with open(local_script, "w") as fh:
fh.write(
r"""#!/bin/bash
echo "hello, world"
"""
)
remote_script = join(remote_env["temp-dir"], "script.sh")
@execute.parallel
def workerfn():
upload(local_script, remote_script)
remote("chmod +x %s" % remote_script)
with rcd(os.path.dirname(remote_script)):
return remote("./script.sh")
results = execute.execute_with_hosts(workerfn, hosts=["127.0.0.1"])
assert ["hello, world"] == results[HOST]["stdout"]
def test_wrapped_exceptions_during_transfer():
"""SCP and SFTP exceptions during transfer are caught and wrapped as an `operations.WrappedNetworkException`.
rsync has no equivalent so a regular `operations.NetworkException` is thrown.
this test suite (example.py) is run using all supported protocols, scp, sftp and rsync.
in two cases we will have a wrapped exception and in one we will have a regular exception.
"""
with local_fixture() as local_env:
with empty_remote_fixture() as remote_env:
with _test_settings():
remote_sudo('chown root:root -R "%s"' % remote_env["temp-dir"])
local_file_name = local_env["temp-files"]["small-file"]
remote_file_name = join(
remote_env["temp-dir"], basename(local_file_name)
)
# upload file to root-owned directory
with pytest.raises(operations.NetworkError) as exc:
upload(local_file_name, remote_file_name)
if TRANSFER_PROTOCOL in ["scp", "sftp"]:
assert isinstance(exc.value, operations.WrappedNetworkException)
assert exc.value.wrapped
else:
assert not isinstance(exc.value, operations.WrappedNetworkException)
assert isinstance(exc.value, operations.NetworkException)
assert TRANSFER_PROTOCOL == "rsync"