forked from mxcube/mxcubecore
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTaskUtils.py
121 lines (103 loc) · 3.33 KB
/
TaskUtils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import sys
import types
import logging
import gevent
class cleanup:
    """Context manager that runs callbacks whenever the ``with`` block exits.

    Positional arguments are the candidate callbacks; keyword arguments are
    stored and forwarded to every callback as ``callback(**keys)``. The
    callbacks run on both normal and exceptional exit, in the order given.
    An exception raised inside the ``with`` body is not suppressed.
    """

    def __init__(self, *args, **keys):
        self.cleanup_funcs = args
        self.keys = keys

    def __enter__(self):
        # Nothing to set up; ``with cleanup(...) as x`` binds ``x`` to None.
        return None

    def __exit__(self, *args):
        for callback in self.cleanup_funcs:
            # Non-callable entries are silently skipped.
            if callable(callback):
                try:
                    callback(**self.keys)
                except:
                    # Catch everything, log it, and carry on so that the
                    # remaining callbacks still get their turn.
                    logging.exception(
                        "Exception while calling cleanup callback %s", callback
                    )
class error_cleanup:
    """Context manager that runs callbacks only when the ``with`` block fails.

    Positional arguments are the candidate callbacks; keyword arguments are
    stored and forwarded to every callback as ``callback(**keys)``. Nothing
    happens on a clean exit. The exception that triggered the callbacks is
    not suppressed.
    """

    def __init__(self, *args, **keys):
        self.error_funcs = args
        self.keys = keys

    def __enter__(self):
        # Nothing to set up; ``with error_cleanup(...) as x`` binds ``x`` to None.
        return None

    def __exit__(self, *args):
        # args[0] is the exception type, or None when the body completed
        # without raising.
        if args[0] is None or not self.error_funcs:
            return

        logging.debug("Doing error cleanup")
        for callback in self.error_funcs:
            # Non-callable entries are silently skipped.
            if callable(callback):
                try:
                    callback(**self.keys)
                except:
                    # Catch everything, log it, and carry on so that the
                    # remaining callbacks still get their turn.
                    logging.exception(
                        "Exception while calling error cleanup callback %s",
                        callback,
                    )
class TaskException:
    """Plain record describing an exception captured while running a task.

    This is a data holder, not an exception class. When built by
    ``wrap_errors`` in this file the three fields are the
    ``sys.exc_info()`` triple, so ``exception`` is the exception type and
    ``error_string`` is the exception value.
    """

    def __init__(self, exception, error_string, tb):
        self.exception, self.error_string, self.tb = exception, error_string, tb
class wrap_errors(object):
    """Callable proxy around ``func`` that returns failures instead of raising.

    Calling the wrapper calls ``func``. If ``func`` raises anything at all,
    the error is reported through ``sys.excepthook`` and handed back to the
    caller as a ``TaskException`` return value. ``str``, ``repr`` and unknown
    attribute lookups are forwarded to ``func``.
    """

    def __init__(self, func):
        """Remember `func`, the callable whose exceptions are to be
        converted into TaskException return values.
        """
        self.func = func

    def __call__(self, *args, **kwargs):
        target = self.func
        try:
            result = target(*args, **kwargs)
        except:
            # Catch-all: whatever was raised is reported and then returned
            # as data rather than propagated.
            exc_details = sys.exc_info()
            sys.excepthook(*exc_details)
            return TaskException(*exc_details)
        return result

    def __str__(self):
        return str(self.func)

    def __repr__(self):
        return repr(self.func)

    def __getattr__(self, item):
        # Only reached for attributes not found on the wrapper itself.
        return getattr(self.func, item)
def _task_result(ret):
    """Return `ret`, or re-raise the error it carries if it is a TaskException.

    The carried error is first reported through ``sys.excepthook`` and then
    raised in the caller's context.
    """
    if not isinstance(ret, TaskException):
        return ret

    sys.excepthook(ret.exception, ret.error_string, ret.tb)
    exc_value = ret.error_string
    # ``raise value`` is valid on both Python 2 and 3, unlike the Python 2-only
    # ``raise type, value`` statement. When the record was built from
    # sys.exc_info() the value is already an exception instance.
    if isinstance(exc_value, BaseException):
        raise exc_value
    # Otherwise build the exception from its type, as ``raise type, value``
    # would have done.
    raise ret.exception(exc_value)


def task(func):
    """Decorator that runs `func` in a gevent greenlet.

    The decorated function accepts two extra keyword arguments, which are
    consumed here and never passed on to `func`:

    * ``wait`` (default True): when true, block until the greenlet finishes
      and return the value `func` returned, or raise the exception `func`
      raised. When false, return the greenlet immediately; its ``get``
      method is replaced so that it also raises the exception `func` raised
      (the original method stays available as ``_get``).
    * ``timeout`` (default None): forwarded to the greenlet's ``get`` when
      waiting.

    If anything goes wrong after the greenlet has been spawned (including an
    exception raised out of ``get``), the greenlet is killed and the error is
    re-raised.
    """

    def start_task(*args, **kwargs):
        # types.InstanceType (the type of old-style class instances) exists
        # only on Python 2; elsewhere the first argument is always logged.
        old_style_instance = getattr(types, "InstanceType", None)
        if (
            args
            and old_style_instance is not None
            and isinstance(args[0], old_style_instance)
        ):
            # Leave the leading old-style instance out of the log line.
            logging.debug("Starting %s%s", func.__name__, args[1:])
        else:
            logging.debug("Starting %s%s", func.__name__, args)

        wait = kwargs.pop("wait", True)
        timeout = kwargs.pop("timeout", None)

        # Spawn outside the try block: if spawning itself fails there is no
        # greenlet to kill, and the handler below must not mask the real
        # error with an unbound-variable error on `t`.
        t = gevent.spawn(wrap_errors(func), *args, **kwargs)
        try:
            if wait:
                return _task_result(t.get(timeout=timeout))

            # Keep the original getter and install one that unwraps
            # TaskException results the same way the blocking path does.
            t._get = t.get

            def special_get(self, *args, **kwargs):
                return _task_result(self._get(*args, **kwargs))

            setattr(t, "get", types.MethodType(special_get, t))
            return t
        except:
            # Catch-all on purpose: whatever interrupted us, stop the
            # greenlet before propagating the error unchanged.
            t.kill()
            raise

    return start_task