-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsunit.py
executable file
·324 lines (286 loc) · 12.8 KB
/
sunit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
"""This plugin takes over all tests output to generate subunit-formated
output.
"""
from new import instancemethod
from datetime import datetime
from unittest import TestSuite, TestResult
from testtools.content import Content, ContentType, TracebackContent
#can't name this file as subunit, otherwise the following line fails
from subunit import TestProtocolClient, iso8601, PROGRESS_CUR
from nose.suite import LazySuite
from nose.plugins import Plugin
from nose.util import isclass
from nose.result import _exception_detail
from nose.failure import Failure as NoseFailure
class TextContent(Content):
def __init__(self, value, acontenttype=None):
if acontenttype is None:
acontenttype = ContentType("text","plain")
Content.__init__(self, acontenttype, lambda:value.encode("utf8"))
def fixTestCase(test):
if not hasattr(test, 'id'):
def idfunc(*args): # pylint: disable-msg=W0613
if hasattr(test, 'context'):
cont = test.context
if hasattr(cont, '__module__'):
return cont.__module__+"."+cont.__name__
elif hasattr(cont, '__name__'):
return cont.__name__
else:
return str(test.context)
return str(test)
test.id = idfunc
class SubunitTestResult(TestProtocolClient):
def __init__(self, stream, descriptions, config=None,
errorClasses=None,
#kwargs capture all other arguments, including unused
#ones: verbosity
**kwargs):
if errorClasses is None:
errorClasses = {}
self.errorClasses = errorClasses
#if config is None:
# config = Config()
self.config = config
self.descriptions = descriptions
self.stream = stream #this is to make multiprocess plugin happy
self.useDetails = kwargs.get("useDetails", False)
self._wassuccess = True
TestProtocolClient.__init__(self, stream)
def _getArgs(self, test, err):
if self.useDetails:
details = {"traceback":TracebackContent(err, test)}
error = None
else:
error = err
details = None
return error, details
#properly assign an id() function to nose.failure.Failure (caused
#by import error (or other errors which prevents loading of a file)
#so in such a case, subunit won't print nose.failure.Failure.runTest
#as the test case name
def beforeTest(self, test): # pylint: disable-msg=R0201
if getattr(test, 'test', None) and isinstance(test.test, NoseFailure):
def newid(*args): # pylint: disable-msg=W0613
#test.address() returns a 3 item tuple, the first one
#is the abspath to the py file, the second one is
#the python module name
#the default output would use test.__str__ as the id
#of a nose.failure.Failure, but I think the module
#name is more useful
return test.address()[1]
test.id = newid #test.__str__
#in the case of import module failure, startTest is not called by
#nose runner, we have to detect that case and call it manually in
#addError to have wellformed subunit output
def startTest(self, test):
test._subunit_started = True
#instead of calling TestProtocolClient.startTest(self, test),
#which uses test.id(), we follow what vanilla unittest does
self._stream.write("test: %s\n" % self.getDescription(test))
TestResult.startTest(self, test)
#copied from unittest._TextTestResult (changed str(test) to test.id())
def getDescription(self, test):
if self.descriptions:
return test.shortDescription() or test.id()
else:
return test.id()
#modified from nose/result.addError
def addError(self, test, error): # pylint: disable-msg=W0221
"""Overrides normal addError to add support for
errorClasses. If the exception is a registered class, the
error will be added to the list for that class, not errors.
"""
fixTestCase(test)
#manually call startTest if it's not already called
if not getattr(test, '_subunit_started', False):
self.startTest(test)
ecls, evt, tbk = error # pylint: disable-msg=W0612
# pylint: disable-msg=W0612
for cls, (storage, label, isfail) in self.errorClasses.items():
if isclass(ecls) and issubclass(ecls, cls):
if not isfail:
reason = _exception_detail(evt)
if reason and self.useDetails:
details = {"reason":TextContent(reason)}
reason = None
else:
details = None
self._addNonFailOutcome(label.lower(), test,
reason=reason, details=details)
return
self._wassuccess = False
error, details = self._getArgs(test, error)
test.passed = False
TestProtocolClient.addError(self, test, error, details=details)
def addFailure(self, test, error): # pylint: disable-msg=W0221
self._wassuccess = False
#TestProtocolClient does not call TestResult.addFailure
test.passed = False
fixTestCase(test)
error, details = self._getArgs(test, error)
TestProtocolClient.addFailure(self, test, error, details=details)
def wasSuccessful(self):
return self._wassuccess
def _addNonFailOutcome(self, outcome, test, reason=None, details=None):
"""Report a non-failure error (such as skip)"""
if reason is None:
self._addOutcome(outcome, test, error=None, details=details)
else:
self._stream.write(outcome+": %s [\n" % self.getDescription(test))
self._stream.write("%s\n" % reason)
self._stream.write("]\n")
#subunit.TestProtocolClient _addNonFailOutcome always uses test.id()
#while the vanilla unittest uses getDescription() if self.descriptions
#TODO: report this bug to subunit
def _addOutcome(self, outcome, test, *args, **kwargs):
if self.descriptions:
if not hasattr(test, '_id_') and test.shortDescription():
test._id_ = test.id
def fakeid():
return test.shortDescription() or test.id()
test.id = fakeid
TestProtocolClient._addOutcome(self, outcome, test, *args, **kwargs)
if self.descriptions and hasattr(test, '_id_'):
test.id = test._id_
#use _addOutcome so it will properly use our own test id
def addSuccess(self, test, details=None):
"""Report a success in a test."""
self._stream.write("successful: %s" % self.getDescription(test))
if not details:
self._stream.write("\n")
else:
self._write_details(details)
self._stream.write("]\n")
def addTime(self):
self.time(datetime.now(iso8601.UTC))
#the nose testrunner would call these two functions
def printErrors(self, *args):
pass
def printSummary(self, *args): # pylint: disable-msg=W0613
self.addTime()
class Subunit(Plugin):
"""Output test results in subunit format
"""
name = 'subunit'
#run before multiprocess plugin, otherwise prepareTestRunner
#won't be able to properly monkey patch runner
score = 1100
useDetails = False
multiprocess_workers = 0
config = None
loaderClass = None
def options(self, parser, env):
"""Register commandline options
"""
Plugin.options(self, parser, env)
parser.add_option(
"--no-preload", action="store_false",
default=not env.get("NOSE_NO_PRELOAD"), dest="preload",
help="Don't preload any nose LazySuite so instead of a single "
"subunit progress output, this will generate "
"more than one. Useful if preloading all tests is not "
"feasible [NOSE_NO_PRELOAD]")
def configure(self, options, conf):
if not self.can_configure:
return
Plugin.configure(self, options, conf)
self.config = conf
self.preload = options.preload
#detailedErrors is defined in failuredetail plugin
self.useDetails = getattr(options,
"detailedErrors", self.useDetails)
#multiprocess_workers defined in multiprocess plugin
self.multiprocess_workers = getattr(options,
"multiprocess_workers", self.multiprocess_workers)
#copied from multiprocess plugin
def prepareTestLoader(self, loader):
"""Remember loader class so MultiProcessTestRunner can instantiate
the right loader.
"""
self.loaderClass = loader.__class__
#monkey patch SuiteFactory.__call__ function to count how many tests there are
sC = loader.suiteClass
plugin = self
def __call__(tests=(), **kw):
is_suite = isinstance(tests, TestSuite)
if callable(tests) and not is_suite:
tests = list(tests())
elif is_suite:
tests = [tests]
suite = sC(tests, **kw)
if not is_suite:
if not getattr(tests, '__len__', None):
lt = tests.tests
else:
lt = tests
#preload all lazy load suite, so we can count totalTests properly
if self.preload:
for t in lt:
plugin.preloadLazySuite(t)
plugin.totalTests += sum(1 for i in lt if not isinstance(i, TestSuite))
#plugin.totalTests += sum(i.countTestCases() for i in lt)
#import pdb;pdb.set_trace()
#print "totalTests", plugin.totalTests, '*'*60
return suite
#assigning to suiteClass.__call__ won't make it work: calling suiteClass() won't trigger the __call__ attribute
loader.suiteClass = __call__
def preloadLazySuite(self, t):
if isinstance(t, LazySuite):
tests = [i for i in t]
t._tests = tests
#so we stick with prepareTestResult for now
def prepareTestRunner(self, runner):
#replace _makeResult in the default nose TestRunner to return
#our implementation SubunitTestResult
if not hasattr(runner, "_makeResult"):
raise Exception('''runner does not have _makeResult method,\
don't know how to attach to it.''')
#this plugin runs before multiprocess plugin, and this function
#return a runner, so it will prevent multiprocess.prepareTestRunner
#from executing. so if multiprocess is enabled, we need to create
#MultiProcessTestRunner
if self.multiprocess_workers:
#with MultiProcessTestRunner.run, beforeTest is called too
#early, so it will never print any progress info
#instead, we have to monkey patch nextBatch which is
#called by MultiProcessTestRunner.run to collect all tests
from nose.plugins.multiprocess import MultiProcessTestRunner
class SubunitMultiProcessTestRunner(MultiProcessTestRunner):
_top = 1
def nextBatch(self, test):
top = self._top
if top:
self._top = 0
for batch in MultiProcessTestRunner.nextBatch(self, test):
yield batch
if top:
plugin.printProgress()
runner = SubunitMultiProcessTestRunner(stream=runner.stream,
verbosity=self.config.verbosity,
config=self.config,
loaderClass=self.loaderClass)
runner.useDetails = self.useDetails
plugin = self
def _makeResult(self):
result = SubunitTestResult(self.stream, self.descriptions,
self.config, useDetails=self.useDetails)
result.addTime()
#save the result so it can be used in beforeTest below
plugin.result = result
#plugin.printProgress()
#if plugin.totalTests:
# result.progress(plugin.totalTests, PROGRESS_CUR)
# result.addTime()
return result
runner._makeResult = instancemethod(_makeResult,
runner, runner.__class__)
return runner
totalTests = 0
def printProgress(self):
if self.totalTests:
self.result.progress(self.totalTests, PROGRESS_CUR)
self.totalTests = 0
def beforeTest(self, *args):
self.printProgress()
# vi: set filetype=python tabstop=4 softtabstop=4 shiftwidth=4 expandtab: