-
Notifications
You must be signed in to change notification settings - Fork 11
/
MachInfoMockup.py
119 lines (89 loc) · 3.41 KB
/
MachInfoMockup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""
[Name] MachInfoMockup
[Description]
MachInfo hardware objects are used to obtain information from the accelerator
control system.
This is a mockup hardware object. It simulates the behaviour of an accelerator
information source by:
- producing a current value that varies with time
- simulating a control room message that changes when the next top-up is
less than 60 seconds away
- simulating a top-up countdown (300 second cycle) and a constant lifetime value
[Emitted signals]
machInfoChanged
pars: values (dict)
mandatory fields:
values['current'] type: str; desc: synchrotron radiation current in milli-amps
values['message'] type: str; desc: message from control room
values['attention'] type: boolean; desc: False (if no special attention is required)
True (if attention should be raised to the user)
optional fields:
any number of optional fields can be sent over with this signal by adding them in the
values dictionary
for example:
values['lifetime']
values['topup_remaining']
"""
import logging
import gevent
import time
from HardwareRepository import HardwareRepository
from HardwareRepository.BaseHardwareObjects import Equipment
class MachInfoMockup(Equipment):
    """Mockup hardware object simulating machine (accelerator) information.

    Once ``init()`` has been called, a background greenlet recomputes the
    simulated values every ``update_interval`` seconds and emits the
    ``machInfoChanged`` signal with a dict holding the keys ``current``,
    ``message``, ``lifetime``, ``topup_remaining`` and ``attention``.
    """

    default_current = 200  # milliamps
    default_lifetime = 45  # hours
    default_message = "Beam Delivered"
    default_topup_remaining = 70  # seconds

    update_interval = 5  # seconds between two simulated updates
    topup_period = 300  # seconds, length of one simulated top-up cycle
    attention_threshold = 60  # seconds, raise attention below this countdown

    def __init__(self, *args):
        """Set every simulated value to its default.

        ``attention`` is initialised here as well, so the attribute exists
        before the first background update has run.
        """
        Equipment.__init__(self, *args)

        self.current = self.default_current
        self.lifetime = self.default_lifetime
        self.message = self.default_message
        self.topup_remaining = self.default_topup_remaining
        self.attention = False

    def init(self):
        """Start the background simulation."""
        self._run()

    def _run(self):
        """Spawn the greenlet that keeps the simulated values up to date."""
        gevent.spawn(self._update_me)

    def _update_me(self):
        """Greenlet body: refresh the values and emit them, forever."""
        self.t0 = time.time()

        while True:
            gevent.sleep(self.update_interval)
            self._refresh(time.time() - self.t0)
            self.emit('machInfoChanged', self._build_values())

    def _refresh(self, elapsed):
        """Recompute the simulated state for ``elapsed`` seconds of run time."""
        # Countdown to the next top-up, wrapping around every topup_period.
        # Python's % with a positive divisor never yields a negative result,
        # so no abs() is needed.
        self.topup_remaining = (
            (self.default_topup_remaining - elapsed) % self.topup_period
        )

        if self.topup_remaining < self.attention_threshold:
            self.message = (
                "ATTENTION: topup in %3d secs" % int(self.topup_remaining)
            )
            self.attention = True
        else:
            self.message = self.default_message
            self.attention = False

        # The current is at default_current when a full cycle remains and
        # drops linearly by 15 mA as the countdown reaches zero.
        decay = (3 - self.topup_remaining / 100.0) * 5
        self.current = "%3.2f mA" % (self.default_current - decay)

    def _build_values(self):
        """Return the dict sent along with the ``machInfoChanged`` signal."""
        return {
            'current': self.current,
            'message': self.message,
            'lifetime': "%3.2f hours" % self.lifetime,
            'topup_remaining': "%3.0f secs" % self.topup_remaining,
            'attention': self.attention,
        }

    def getCurrent(self):
        """Return the machine current.

        This is the numeric default until the first background update, and a
        formatted string such as ``"190.00 mA"`` afterwards.
        """
        return self.current

    def getLifeTime(self):
        """Return the beam lifetime in hours."""
        return self.lifetime

    def getTopUpRemaining(self):
        """Return the number of seconds left until the next top-up."""
        return self.topup_remaining

    def getMessage(self):
        """Return the current control room message."""
        return self.message
def test():
    """Command-line smoke test for a machine-info hardware object.

    Reads the hardware repository directory from the ``XML_FILES_PATH``
    environment variable, loads the hardware object named by the first
    command-line argument, prints its current values and then keeps the
    gevent loop running so that background greenlets can execute.

    Raises:
        KeyError: if ``XML_FILES_PATH`` is not set.
        IndexError: if no hardware object name is given on the command line.
    """
    import os
    import sys

    hwr_directory = os.environ["XML_FILES_PATH"]

    hwr = HardwareRepository.HardwareRepository(os.path.abspath(hwr_directory))
    hwr.connect()

    conn = hwr.getHardwareObject(sys.argv[1])

    # Single-argument print() calls are valid on both Python 2 and Python 3.
    # The double space reproduces the output of the former
    # `print "label: ", value` statements.
    print("Machine current:  %s" % (conn.getCurrent(),))
    print("Life time:  %s" % (conn.getLifeTime(),))
    print("TopUp remaining:  %s" % (conn.getTopUpRemaining(),))
    print("Message:  %s" % (conn.getMessage(),))

    # Keep yielding to the gevent hub so spawned greenlets get to run.
    while True:
        gevent.wait(timeout=0.1)
# Run the smoke test only when this file is executed as a script.
if __name__ == "__main__":
    test()