forked from OxfordIonTrapGroup/stabilizer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstabilizer.py
107 lines (88 loc) · 3.14 KB
/
stabilizer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import json
import asyncio
from collections import OrderedDict as OD
import logging
import numpy as np
logger = logging.getLogger()
class StabilizerError(Exception):
pass
class StabilizerConfig:
async def connect(self, host, port=1235):
self.reader, self.writer = await asyncio.open_connection(host, port)
async def set(self, channel, iir):
up = OD([("channel", channel), ("iir", iir.as_dict())])
s = json.dumps(up, separators=(",", ":"))
assert "\n" not in s
logger.debug("send %s", s)
self.writer.write(s.encode() + b"\n")
r = (await self.reader.readline()).decode()
logger.debug("recv %s", r)
ret = json.loads(r, object_pairs_hook=OD)
if ret["code"] != 200:
raise StabilizerError(ret)
return ret
class IIR:
t_update = 2e-6
full_scale = float((1 << 15) - 1)
def __init__(self):
self.ba = np.zeros(5, np.float32)
self.y_offset = 0.
self.y_min = -self.full_scale - 1
self.y_max = self.full_scale
def as_dict(self):
iir = OD()
iir["ba"] = [float(_) for _ in self.ba]
iir["y_offset"] = self.y_offset
iir["y_min"] = self.y_min
iir["y_max"] = self.y_max
return iir
def configure_pi(self, kp, ki, g=0.):
ki = np.copysign(ki, kp)*self.t_update*2
g = np.copysign(g, kp)
eps = np.finfo(np.float32).eps
if abs(ki) < eps:
a1, b0, b1 = 0., kp, 0.
else:
if abs(g) < eps:
c = 1.
else:
c = 1./(1. + ki/g)
a1 = 2*c - 1.
b0 = ki*c + kp
b1 = ki*c - a1*kp
if abs(b0 + b1) < eps:
raise ValueError("low integrator gain and/or gain limit")
self.ba[0] = b0
self.ba[1] = b1
self.ba[2] = 0.
self.ba[3] = a1
self.ba[4] = 0.
def set_x_offset(self, o):
b = self.ba[:3].sum()*self.full_scale
self.y_offset = b*o
if __name__ == "__main__":
import argparse
p = argparse.ArgumentParser()
p.add_argument("-s", "--stabilizer", default="10.0.16.99")
p.add_argument("-c", "--channel", default=0, type=int,
help="Stabilizer channel to configure")
p.add_argument("-o", "--offset", default=0., type=float,
help="input offset, in units of full scale")
p.add_argument("-p", "--proportional-gain", default=1., type=float,
help="Proportional gain, in units of 1")
p.add_argument("-i", "--integral-gain", default=0., type=float,
help="Integral gain, in units of Hz, "
"sign taken from proportional-gain")
args = p.parse_args()
loop = asyncio.get_event_loop()
# loop.set_debug(True)
logging.basicConfig(level=logging.DEBUG)
async def main():
i = IIR()
i.configure_pi(args.proportional_gain, args.integral_gain)
i.set_x_offset(args.offset)
s = StabilizerConfig()
await s.connect(args.stabilizer)
assert args.channel in range(2)
r = await s.set(args.channel, i)
loop.run_until_complete(main())