-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathalfred.py
256 lines (218 loc) · 8.51 KB
/
alfred.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
#!/usr/bin/env python3
"""
Alfred is a test-suite for functional tests
you can use it as it is or create you own classes
Return:
0 on success,
1 if at least one test are broken or were skipped
"""
__author__ = 'coop'
import os
import sys
import logging as log
def parse_config(configfile):
    """Load *configfile* and return the resulting AlfredConfig object."""
    from alfred.config import AlfredConfig
    config = AlfredConfig(configfile)
    return config
def create_statistics():
    """Return a fresh TestStatistics collector for this run."""
    from alfred.statistics import TestStatistics
    stats = TestStatistics()
    return stats
def filter_test(test, test_filters):
    '''Decide whether *test* is selected by at least one filter.

    :param test: the test to check; must expose ``test_dir``, ``name``
        and ``cfg`` attributes
    :type test: TestClass()
    :param test_filters: filters to apply; each entry is a sequence whose
        first item is the filter type ('d' = directory, 'n' = name regex,
        'alfred' = built-in) followed by one or more filter values
    :type test_filters: list of tuples with type and value
    :return: True if no filters are configured or the test matches one
        of them, False otherwise
    :rtype: bool
    '''
    import alfred.misc as misc
    import re
    # no filters configured -> every test is selected
    if not test_filters:
        return True
    for test_filter in test_filters:
        if len(test_filter) < 2:
            misc.die(1, 'Filter option needs at least 2 arguments: '
                        'filter type and filter value')
        filter_type = test_filter[0]
        if filter_type == 'd':
            # filter by directory: exact match on the normalized path
            log.info('filter by directory {}, test_dir {}'.format(
                test_filter[1:], test.test_dir)
            )
            for directory in test_filter[1:]:
                if os.path.normpath(test.test_dir) == \
                        os.path.normpath(directory):
                    return True
        elif filter_type == 'n':
            # filter by test name; the value is treated as a regex
            log.info('filter by test-name {}, test.name {}'.format(
                test_filter[1:], test.name)
            )
            for name in test_filter[1:]:
                # check if the filter matches a part of the test name
                if re.search(name, test.name):
                    return True
        elif filter_type == 'alfred':
            # built-in filters; only 'smoke' is currently known
            if test_filter[1] == 'smoke':
                return test.cfg.get('smoke', None) == 'True'
            # NOTE(review): unknown built-in sub-filters are silently
            # ignored here -- consider dying as for unknown filter types
        else:
            misc.die(1, 'unknown filter type {}'.format(filter_type))
    return False
def get_tests(test_dir, test_module, test_class, cfg, test_filter=None):
    """Collect all '*.t' test files below the configured directories.

    Walks every directory in the whitespace-separated *test_dir*,
    instantiates the configured test class for each '*.t' file and keeps
    only the tests that pass *test_filter* (see filter_test).
    """
    import alfred.module_loader as module_loader
    import alfred.misc as misc
    loader = module_loader.Loader(cfg.get('module_path', default='alfred'))
    test_module = loader.load_class(test_module, test_class)
    git_root = misc.get_git_root()
    collected = []
    for base_dir in test_dir.split():
        for root, dirs, files in os.walk(base_dir, topdown=False):
            for filename in files:
                if not filename.endswith('.t'):
                    continue
                log.info('found {}'.format(filename))
                candidate = test_module(root, filename)
                # a test must provide a description and a name
                if not (candidate.description and candidate.name):
                    continue
                candidate.source = os.path.normpath(
                    os.path.join(root, filename))
                candidate.test_dir = root
                candidate.working_dir = os.path.normpath(
                    os.path.join(git_root, cfg.get('working_dir', root))
                )
                candidate.root_cfg = cfg
                if filter_test(candidate, test_filter):
                    log.debug('add test {}'.format(
                        os.path.normpath(os.path.join(candidate.test_dir,
                                                      candidate.name))
                    ))
                    collected.append(candidate)
    return collected
def main():
    """Run the alfred functional test suite.

    Parses CLI options and the config file, collects (and optionally
    sorts or shuffles) the tests, runs the setup class, executes every
    test printing a one-line status per test, runs the teardown class,
    then exits with 0 on success (or only skipped tests) and 1 if at
    least one test failed.
    """
    from alfred.config import get_cli_options
    from alfred import returncodes
    from alfred import bcolors
    import alfred.misc as misc
    import alfred.module_loader as module_loader
    import time
    opts = get_cli_options()
    # configure logger: drop any handlers installed by earlier imports
    if log.root:
        del log.root.handlers[:]
    formatstring = '[%(levelname)s]: alfred: %(message)s'
    loglevel = log.getLevelName(opts.loglevel.upper())
    # capture warnings to logging
    # https://urllib3.readthedocs.org/en/latest/security.html#insecureplatformwarning
    log.captureWarnings(True)
    test_filter = opts.filter
    # parse config
    configfile = opts.configfile
    cfg = parse_config(configfile)
    if opts.overwrite:
        # '--overwrite key:value,key:value' wins over the config file
        for option in opts.overwrite.split(','):
            key, value = option.split(':')
            cfg.set(key, value)
    logfile = cfg.get('logfile')
    working_dir = cfg.get('working_dir')
    git_root = misc.get_git_root()
    if working_dir:
        if git_root:
            working_dir = os.path.normpath(os.path.join(git_root, working_dir))
    if not opts.debug and logfile and working_dir:
        try:
            os.makedirs(working_dir)
        except OSError as ex:
            # directory already exists -- best effort, keep going
            log.debug('unable to create working dir: {}'.format(ex))
        logfile = os.path.normpath(os.path.join(working_dir, logfile))
        # remove the main logfile if there is an old version
        try:
            os.remove(logfile)
        except OSError as ex:
            # the file did not exist
            log.debug('unable to remove logfile: {}'.format(ex))
        # create the base path of the logfile
        try:
            os.makedirs(os.path.dirname(logfile))
        except OSError as ex:
            # directory already exists
            log.debug('unable to create logfile dir: {}'.format(ex))
        log.basicConfig(format=formatstring, level=loglevel, filename=logfile)
    else:
        log.basicConfig(format=formatstring, level=loglevel)
    stats = create_statistics()
    test_module = cfg.get('test_module', default='alfred')
    test_class = cfg.get('test_class', default='TestClass')
    setup_class = cfg.get('setup_class', default='SetupClass')
    teardown_class = cfg.get('teardown_class', default='TearDownClass')
    loader = module_loader.Loader(cfg.get('module_path', default='alfred'))
    log.debug('get tests')
    # the test_dir from the config is used as-is (relative to the cwd);
    # a previous computation relative to __file__ was dead code and removed
    abs_test_dir = cfg.get('test_dir', default='example')
    tests = get_tests(abs_test_dir, test_module, test_class, cfg, test_filter)
    if not tests:
        misc.die(0, 'no tests found')
    if opts.sorted:
        tests.sort(key=lambda elem: elem.source)
    elif opts.random:
        import random
        random.shuffle(tests)
    log.debug('setup tests')
    setup_module = loader.load_class(test_module, setup_class)
    setup = setup_module(cfg)
    setup.run()
    main_start_time = 0
    # lets test
    try:
        print('run tests\n')
        main_start_time = time.time()
        for test in tests:
            sys.stdout.write('* %-50s ' % test.name)
            sys.stdout.flush()
            start_time = time.time()
            test.run()
            rc = test.rc
            stats.update(rc, name=test.name)
            if rc == returncodes.SUCCESS:
                log.info('\'{}\' finished successful'.format(test.name))
                sys.stdout.write(bcolors.BLUE + 'OK\t\t' + bcolors.ENDC)
            elif rc == returncodes.SKIPPED:
                log.warning('\'{}\' skipped: {}'.format(test.name, test.skip))
                sys.stdout.write(bcolors.YELLOW + 'SKIPPED\t' + bcolors.ENDC)
            elif rc == returncodes.FAILURE:
                log.error('\'{}\' failed at cmd(s) \'{}\''.format(
                    test.name, test.failed_command)
                )
                # write the status BEFORE a possible early stop so the
                # failing test is visibly marked on the console
                sys.stdout.write(bcolors.RED + 'FAIL\t' + bcolors.ENDC)
                if cfg.get('stop_on_error') == "True":
                    break
            end_time = time.time()
            print(' in {} s'.format(end_time - start_time))
    except KeyboardInterrupt:
        log.debug('aborted by user')
    finally:
        print('')
    main_end_time = time.time()
    log.debug('teardown tests')
    teardown_module = loader.load_class(test_module, teardown_class)
    teardown = teardown_module(cfg)
    teardown.run()
    if opts.verbose:
        stats.write_verbose()
    else:
        stats.write()
    print('test time: {} s'.format(main_end_time - main_start_time))
    if stats.all_success():
        print('All tests finished successful')
        sys.exit(0)
    elif stats.get(returncodes.FAILURE):
        misc.die(1, 'At least one test is broken')
    else:
        log.warning('At least one test is skipped')
        sys.exit(0)
# script entry point: run the suite when executed directly
if __name__ == '__main__':
    main()