forked from lammps/lammps-testing
-
Notifications
You must be signed in to change notification settings - Fork 0
/
run_tests.py
executable file
·109 lines (77 loc) · 3.15 KB
/
run_tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#!/bin/env python3
import os
import nose
import xml.etree.ElementTree as ET
import argparse
import subprocess
import glob
from multiprocessing import Pool
__author__ = 'Richard Berger'
__email__ = "[email protected]"
def load_tests(filename):
print("Loading tests from %s..." % filename)
tests = []
loader = nose.loader.TestLoader()
ctx = loader.loadTestsFromName(filename)
for tc in list(ctx):
for testname in list(tc):
parts = testname.id().split('.')
classname = '.'.join(parts[0:-1])
name = parts[-1]
selector = "{0}.{1}:{2}.{3}".format(*parts)
tests.append({'classname': classname, 'name': name, 'time': 1.0, 'selector' : selector})
return tests
def load_test_timing(tests, filename):
""" load test runtimes from xunit result file and save to corresponding test """
print("Loading timing information from %s..." % filename)
def find_test(classname, name):
for x in tests:
if x['classname'] == classname and x['name'] == name:
return x
return None
tree = ET.parse(filename)
testcases = [x.attrib for x in tree.getroot()]
for test_result in testcases:
test = find_test(test_result['classname'], test_result['name'])
if test:
test['time'] = float(test_result['time'])
def create_job_queues(tests, nprocesses):
queues = []
for q in range(nprocesses):
queues.append([])
sorted_tests = sorted(tests, key=lambda x: float(x['time']), reverse=True)
for t in sorted_tests:
work_assignment = sorted([(sum([float(x['time']) for x in q]), i) for i,q in enumerate(queues)])
# select queue with lowest work assignment
selected_queue = work_assignment[0][1]
queues[selected_queue].append(t)
return queues
parser = argparse.ArgumentParser(description='run nosetests in parallel and load balance using timning information')
parser.add_argument('files', metavar='FILE', nargs='+', help='python module file(s) containing tests')
parser.add_argument('-p', '--processes', type=int, help='number of parallel processes', default=1)
parser.add_argument('-d', '--dry-run', action='store_true', help='do not run nosetests, but output schedule', default=False)
args = parser.parse_args()
# load tests
tests = []
for f in args.files:
tests += load_tests(f)
timing_files = glob.glob('nosetests-*.xml')
for tf in timing_files:
load_test_timing(tests, tf)
queues = create_job_queues(tests, args.processes)
def print_list(x):
for elem in x:
print(" - ", elem['classname'], elem['name'], elem['time'])
for i, q in enumerate(queues):
print(i, len(q), sum([float(x['time']) for x in q]))
print_list(q)
def run_nose(args):
pid, tests = args
xunitfile = "nosetests-{0:02}.xml".format(pid)
print(pid, ": Running nose with", len(tests), "tests...", xunitfile)
selected_tests = [x['selector'] for x in tests]
call = ['nosetests', '-v', '--with-xunit', '--xunit-file=' + xunitfile] + selected_tests
subprocess.call(call)
if not args.dry_run:
pool = Pool(processes=args.processes)
pool.map(run_nose, enumerate(queues))