forked from Cephla-Lab/Squid
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathabc.py
217 lines (168 loc) · 5.2 KB
/
abc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
from abc import ABC, abstractmethod
from typing import Tuple
class LightSource(ABC):
    """Interface that every concrete light-source driver must implement.

    All methods (including ``__init__``) are abstract, so a subclass has to
    override each of them before it can be instantiated.
    """

    @abstractmethod
    def __init__(self):
        """Construct the light source and set up communication with it."""

    @abstractmethod
    def initialize(self):
        """Set up the connection and the settings of the light source.

        Returns:
            True on success, False otherwise.
        """

    @abstractmethod
    def set_intensity_control_mode(self, mode):
        """Select how intensity is controlled.

        Args:
            mode: IntensityControlMode(Enum)
        """

    @abstractmethod
    def get_intensity_control_mode(self):
        """Report the intensity control mode currently in effect.

        Returns:
            IntensityControlMode(Enum)
        """

    @abstractmethod
    def set_shutter_control_mode(self, mode):
        """Select how the shutter is controlled.

        Args:
            mode: ShutterControlMode(Enum)
        """

    @abstractmethod
    def get_shutter_control_mode(self):
        """Report the shutter control mode currently in effect.

        Returns:
            ShutterControlMode(Enum)
        """

    @abstractmethod
    def set_shutter_state(self, channel, state):
        """Switch one channel on or off.

        Args:
            channel: Channel ID
            state: True switches the channel on, False switches it off
        """

    @abstractmethod
    def get_shutter_state(self, channel):
        """Report whether one channel is currently on.

        Args:
            channel: Channel ID

        Returns:
            bool: True when the channel is on, False when it is off
        """

    @abstractmethod
    def set_intensity(self, channel, intensity):
        """Apply an intensity to one channel.

        Args:
            channel: Channel ID
            intensity: Intensity value (0-100)
        """

    @abstractmethod
    def get_intensity(self, channel) -> float:
        """Read back the intensity of one channel.

        Args:
            channel: Channel ID

        Returns:
            float: Current intensity value
        """

    @abstractmethod
    def shut_down(self):
        """Shut the light source down safely."""
import abc
import time
from typing import Optional
import pydantic
import squid.logging
from squid.config import AxisConfig, StageConfig
from squid.exceptions import SquidTimeout
class Pos(pydantic.BaseModel):
    """Stage position snapshot, as returned by ``AbstractStage.get_pos``."""

    # Linear axes; the field names indicate millimetres.
    x_mm: float
    y_mm: float
    z_mm: float

    # Rotational axis; the field name indicates radians. The value may be None.
    # NOTE/TODO(imo): If essentially none of our stages have a theta, this is
    # probably fine. But if it's a mix we probably want a better way of
    # handling the "maybe has theta" case.
    theta_rad: Optional[float]
class StageStage(pydantic.BaseModel):
    """Stage status snapshot, as returned by ``AbstractStage.get_state``.

    NOTE(review): the name looks like a typo for ``StageState``; it is kept
    unchanged because other modules may import it by this name.
    """

    # ``AbstractStage.wait_for_idle`` keeps polling while this is True.
    busy: bool
class AbstractStage(metaclass=abc.ABCMeta):
    """Abstract base class for motion stages.

    Concrete stages implement the abstract movement, homing, zeroing, limit
    and status methods. This base class stores the stage configuration,
    creates a logger named after the concrete class, and provides
    ``wait_for_idle`` on top of ``get_state``.
    """

    def __init__(self, stage_config: StageConfig):
        """Store the configuration and create a logger for this stage.

        Args:
            stage_config: Configuration later returned by ``get_config``.
        """
        self._config = stage_config
        self._log = squid.logging.get_logger(self.__class__.__name__)

    @abc.abstractmethod
    def move_x(self, rel_mm: float, blocking: bool = True):
        """Move the x axis by ``rel_mm`` (a relative move, per the name).

        NOTE(review): ``blocking`` presumably means "return only after the
        move has finished" -- confirm against the implementations.
        """

    @abc.abstractmethod
    def move_y(self, rel_mm: float, blocking: bool = True):
        """Move the y axis by ``rel_mm`` (a relative move, per the name)."""

    @abc.abstractmethod
    def move_z(self, rel_mm: float, blocking: bool = True):
        """Move the z axis by ``rel_mm`` (a relative move, per the name)."""

    @abc.abstractmethod
    def move_x_to(self, abs_mm: float, blocking: bool = True):
        """Move the x axis to ``abs_mm`` (an absolute move, per the name)."""

    @abc.abstractmethod
    def move_y_to(self, abs_mm: float, blocking: bool = True):
        """Move the y axis to ``abs_mm`` (an absolute move, per the name)."""

    @abc.abstractmethod
    def move_z_to(self, abs_mm: float, blocking: bool = True):
        """Move the z axis to ``abs_mm`` (an absolute move, per the name)."""

    # TODO(imo): We need a stop or halt or something along these lines
    # @abc.abstractmethod
    # def stop(self, blocking: bool=True):
    #     pass

    @abc.abstractmethod
    def get_pos(self) -> Pos:
        """Return the stage position as a ``Pos``."""

    @abc.abstractmethod
    def get_state(self) -> StageStage:
        """Return the stage status; ``wait_for_idle`` polls its ``busy`` flag."""

    @abc.abstractmethod
    def home(self, x: bool, y: bool, z: bool, theta: bool, blocking: bool = True):
        """Home the stage.

        NOTE(review): the boolean flags presumably select which axes take
        part -- confirm against the implementations.
        """

    @abc.abstractmethod
    def zero(self, x: bool, y: bool, z: bool, theta: bool, blocking: bool = True):
        """Zero the stage.

        NOTE(review): the boolean flags presumably select which axes take
        part -- confirm against the implementations.
        """

    @abc.abstractmethod
    def set_limits(
        self,
        x_pos_mm: Optional[float] = None,
        x_neg_mm: Optional[float] = None,
        y_pos_mm: Optional[float] = None,
        y_neg_mm: Optional[float] = None,
        z_pos_mm: Optional[float] = None,
        z_neg_mm: Optional[float] = None,
        theta_pos_rad: Optional[float] = None,
        theta_neg_rad: Optional[float] = None,
    ):
        """Set per-axis limits; every argument defaults to ``None``.

        NOTE(review): ``None`` presumably means "leave this limit
        unchanged" -- confirm against the implementations.
        """

    def get_config(self) -> StageConfig:
        """Return the configuration passed to the constructor."""
        return self._config

    def wait_for_idle(self, timeout_s: float):
        """Block until ``get_state().busy`` is false or the timeout elapses.

        The state is always polled at least once, so an idle stage returns
        immediately even when ``timeout_s`` is zero or negative.

        Args:
            timeout_s: Maximum time to wait, in seconds.

        Raises:
            SquidTimeout: If the stage is still busy once ``timeout_s``
                seconds have passed. The message is also logged as an error.
        """
        # A monotonic clock keeps the deadline unaffected by wall-clock
        # adjustments (NTP sync, manual changes) that happen while waiting.
        deadline = time.monotonic() + timeout_s
        while True:
            # Poll before checking the deadline so that the state observed
            # after the final sleep is never ignored.
            if not self.get_state().busy:
                return
            if time.monotonic() >= deadline:
                break
            # Sleep some small amount of time so we can yield to other threads
            # if needed while waiting.
            time.sleep(0.001)

        error_message = f"Timed out waiting after {timeout_s:0.3f} [s]"
        self._log.error(error_message)
        raise SquidTimeout(error_message)