-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlaser_egismos.py
395 lines (325 loc) · 12.5 KB
/
laser_egismos.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries
# SPDX-FileCopyrightText: Copyright (c) 2023 Phil Underwood for Underwood Underground
#
# SPDX-License-Identifier: MIT
"""
`laser_egismos`
================================================================================
Device driver for the egismos series of lasers, available at
https://www.egismos.com/laser-measuring-optoelectronics-module
* Author(s): Phil Underwood
Implementation Notes
--------------------
**Hardware:**
* `Egismos laser <https://www.egismos.com/laser-measuring-optoelectronics-module>`_
**Software and Dependencies:**
* Adafruit CircuitPython firmware for the supported boards:
https://circuitpython.org/downloads
"""
__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/furbrain/CircuitPython_laser_egismos.git"
import time
try:
from typing import Sequence, Tuple
except ImportError:
pass
import busio
# Default time, in seconds, to wait for a response from the laser before giving up
DEFAULT_TIMEOUT = 5.0
class LaserError(RuntimeError):
    """Base class for errors reported by the laser module."""
class LaserCommandFailedError(LaserError):
    """The laser did not recognise or did not acknowledge a command."""
class BadReadingError(LaserError):
    """Something went wrong while taking a reading."""
class TooDimError(LaserError):
    """The returned laser light was too dim to be interpreted."""
class TooBrightError(LaserError):
    """The laser return was too bright for an accurate reading."""
class LaserTimeOutError(LaserError):
    """The laser did not respond within the allowed time."""
class _LaserBase:
    """
    Frame handling shared by the laser drivers.

    Holds the protocol constants and the helpers that build, validate and
    interpret frames. A frame is laid out as ``FRAME_START, address, command,
    *data, checksum, FRAME_END``, where the checksum is the low 7 bits of the
    sum of the address, command and data bytes.
    """

    # pylint: disable=too-few-public-methods
    FRAME_END = 0xA8
    FRAME_START = 0xAA
    BUZZER_CONTROL = 0x47
    STOP_CONTINUOUS_MEASURE = 0x46
    CONTINUOUS_MEASURE = 0x45
    SINGLE_MEASURE = 0x44
    LASER_OFF = 0x43
    LASER_ON = 0x42
    READ_DEVICE_ERR = 0x08
    SET_SLAVE_ADDRESS = 0x41
    READ_SLAVE_ADDRESS = 0x04
    READ_DEV_TYPE = 0x02
    READ_SW_VERSION = 0x01

    # Shortest well-formed frame: start, address, command, checksum, end
    _MIN_FRAME_LENGTH = 5

    def __init__(self, uart: busio.UART, address=0x01, timeout=DEFAULT_TIMEOUT):
        """
        Access an Egismos Laser distance module v2

        :param ~busio.UART uart: uart to use to connect. Should have baud rate set to 9600
        :param address: address to use, default is 0x01; you should only change this if
          using multiple devices
        :param timeout: timeout to wait for a response from the device
        """
        self.uart: busio.UART = uart
        self.address: int = address
        self.timeout: float = timeout

    def _build_frame(
        self, command: int, address=None, data: Sequence[int] = None
    ) -> bytes:
        """
        Build a frame that represents the given command

        :param command: Command to send
        :param address: address to send to, default is the address stored on this object
        :param data: int or sequence of ints (list, tuple, bytes), represents data
          parts to send
        :return: the encoded frame
        """
        if address is None:
            address = self.address
        if data is None:
            payload = []
        elif isinstance(data, int):
            payload = [data]
        else:
            # Copy into a list so that tuples and bytes can be concatenated below
            payload = list(data)
        checksum = (command + address + sum(payload)) & 0x7F
        header = [self.FRAME_START, address, command]
        trailer = [checksum, self.FRAME_END]
        return bytes(header + payload + trailer)

    def _parse_frame(self, frame: bytes) -> Tuple[int, int, bytes]:
        """
        Parse a frame and return the contained data.

        :param bytes frame: The frame to be parsed
        :return: a tuple of ``(command, address, data)``
        :raises: `LaserCommandFailedError` if the frame is too short, has incorrect
          start or end bytes, or if the checksum is incorrect
        """
        # Reject truncated frames first: indexing them would either raise IndexError
        # or read the framing bytes as if they were address/command/checksum
        if len(frame) < self._MIN_FRAME_LENGTH:
            raise LaserCommandFailedError(
                f"Frame too short: expected at least {self._MIN_FRAME_LENGTH} bytes,"
                + f" got {len(frame)}"
            )
        if frame[0] != self.FRAME_START:
            raise LaserCommandFailedError(
                f"Frame does not start with {self.FRAME_START}"
            )
        if frame[-1] != self.FRAME_END:
            raise LaserCommandFailedError(f"Frame does not end with {self.FRAME_END}")
        # Checksum covers everything between the start byte and the checksum itself
        checksum = sum(frame[1:-2]) & 0x7F
        if frame[-2] != checksum:
            raise LaserCommandFailedError(
                f"Checksum should be {checksum}, was {frame[-2]}"
            )
        command = frame[2]
        address = frame[1]
        data = frame[3:-2]
        return command, address, data

    def _process_frame(self, address, command, frame):
        """
        Check a received frame against the request that was sent

        :param address: address the request was sent to; ``None`` means the address
          stored on this object
        :param command: command that was sent
        :param frame: the received frame
        :return: the data carried by the received frame
        :raises: `LaserCommandFailedError` if the frame cannot be parsed, or if its
          command or address differ from the ones sent
        """
        if address is None:
            address = self.address
        read_command, read_address, read_data = self._parse_frame(frame)
        if command != read_command:
            raise LaserCommandFailedError(
                f"Received command {read_command} does not match"
                + f" sent command {command}"
            )
        if address != read_address:
            raise LaserCommandFailedError(
                f"Received address {read_address} does not match"
                + f" sent address {address}"
            )
        return read_data

    @staticmethod
    def _check_measurement_for_errors(result):
        """
        Convert the data from a measurement response into a number

        :param result: data returned in response to a measurement command
        :return: the reading as an int
        :raises: `TooBrightError` for ``ERR256``, `TooDimError` for ``ERR255``,
          `BadReadingError` for ``ERR204``, `LaserCommandFailedError` if the data
          cannot be converted to an int
        """
        if result == b"ERR256":
            raise TooBrightError("Too much ambient light, or laser too close")
        if result == b"ERR255":
            raise TooDimError(
                "Laser spot too dim. Use reflective tape or shorter distance"
            )
        if result == b"ERR204":
            raise BadReadingError("Unable to measure - is the target moving?")
        try:
            result = int(result)
        except ValueError as exc:
            raise LaserCommandFailedError("Unexpected response from read") from exc
        return result
class Laser(_LaserBase):
    """
    Blocking driver for the Laser Module 2, produced by Egismos
    """

    def _read_frame(self):
        """
        Read one frame from the uart

        Bytes are discarded until FRAME_START is seen, then collected until FRAME_END.
        A single deadline of ``self.timeout`` seconds covers the whole frame.

        :return: the received bytes, from FRAME_START to FRAME_END inclusive
        :raises: `LaserTimeOutError` if the deadline passes first
        """
        deadline = time.monotonic() + self.timeout
        # Hunt for the start byte; an empty read is replaced by a non-matching byte
        while True:
            received = self.uart.read(1) or b"\x00"
            if time.monotonic() > deadline:
                raise LaserTimeOutError("Timed Out waiting for FRAME_START")
            if received[0] == self.FRAME_START:
                break
        # Accumulate the rest of the frame up to and including the end byte
        while True:
            received += self.uart.read(1) or b""
            if time.monotonic() > deadline:
                raise LaserTimeOutError("Timed Out waiting for FRAME_END")
            if received[-1] == self.FRAME_END:
                break
        return received

    def _send_and_receive(
        self, command: int, data: int = None, address: int = None
    ) -> bytes:
        """
        Send a command and return the data from the matching response

        :param int command: command to send
        :param data: Optional data byte or sequence for the command
        :param address: address to send to, default is the address stored on this object
        :return: data bytes of the response
        """
        request = self._build_frame(command, address, data)
        # Drop anything already waiting so it is not mistaken for the reply
        self.uart.reset_input_buffer()
        self.uart.write(request)
        response = self._read_frame()
        return self._process_frame(address, command, response)

    def _send_command_and_raise_on_failure(self, command, data=None):
        """
        Send a command, and raise `LaserCommandFailedError` unless the response data
        starts with 0x01

        :param int command:
        :param data: Optional data byte or sequence for the command
        """
        reply = self._send_and_receive(command, data)
        if reply and reply[0] == 0x01:
            return
        raise LaserCommandFailedError(f"Tried to send {command} but it failed")

    def set_laser(self, value: bool):
        """
        Switch the laser pointer on or off

        :param bool value: If ``True``, turn on laser, turn off if ``False``
        """
        command = self.LASER_ON if value else self.LASER_OFF
        self._send_command_and_raise_on_failure(command)

    def stop_measuring(self):
        """
        Stop measuring when in continuous mode

        :return:
        """
        self._send_command_and_raise_on_failure(self.STOP_CONTINUOUS_MEASURE)

    def set_buzzer(self, value: bool):
        """
        Enable or disable the beeps made when commands are received

        :param bool value: If ``True``, turn on beeps, turn off if ``False``
        :return:
        """
        beep_flag = int(value)
        self._send_command_and_raise_on_failure(self.BUZZER_CONTROL, beep_flag)

    def set_slave_address(self, address):
        """
        Change the address of the laser, and use the new address from now on

        :param int address: Address to use - between 1 and 255
        """
        self._send_command_and_raise_on_failure(self.SET_SLAVE_ADDRESS, address)
        # Only reached if the device acknowledged the change
        self.address = address

    def measure(self) -> int:
        """
        Make a single reading.

        :return: distance in mm
        :raises: `LaserError`; can be one of

          `TooDimError`
            Can't see the laser spot properly. Use reflective tape or a shorter distance
          `TooBrightError`
            Laser spot is too bright (maybe too close to the device or there may be too much
            ambient light)
          `BadReadingError`
            Measurement failed, often due to movement
          `LaserCommandFailedError`
            Return value from laser was garbled
        """
        raw_reading = self._send_and_receive(self.SINGLE_MEASURE)
        return self._check_measurement_for_errors(raw_reading)

    @property
    def distance(self) -> float:
        """
        Get the distance in cm

        :return: Distance in cm
        :raises: Same as `measure`
        """
        millimetres = self.measure()
        return millimetres / 10.0
class AsyncLaser(_LaserBase):
    """
    Same as `Laser`, but with async methods, requires the `asyncio` module
    """

    def __init__(self, uart: busio.UART, address=0x01, timeout=DEFAULT_TIMEOUT):
        """
        Access an Egismos Laser distance module v2 using async methods

        :param ~busio.UART uart: uart to use to connect; its ``timeout`` is set to 0
        :param address: address to use, default is 0x01
        :param timeout: timeout to wait for a response from the device
        """
        # pylint: disable=import-outside-toplevel
        import asyncio

        # NOTE(review): presumably a zero timeout makes uart reads return immediately,
        # leaving the waiting to the StreamReader - confirm against busio.UART
        uart.timeout = 0
        super().__init__(uart, address, timeout)
        self.async_reader = asyncio.StreamReader(uart)

    async def _read_frame(self):
        """
        Read one frame from the uart

        Bytes are discarded until FRAME_START is seen, then collected until FRAME_END.
        There is no timeout here; the caller is expected to bound the wait.

        :return: the received bytes, from FRAME_START to FRAME_END inclusive
        """
        buffer = b"\x00"
        while buffer[0] != self.FRAME_START:
            # An empty read must become a non-matching byte, not b"", otherwise
            # buffer[0] in the loop condition raises IndexError
            buffer = await self.async_reader.read(1) or b"\x00"
        while buffer[-1] != self.FRAME_END:
            buffer += await self.async_reader.read(1) or b""
        return buffer

    async def _send_and_receive(
        self, command: int, data: int = None, address: int = None
    ) -> bytes:
        """
        Send a command and return the data from the matching response

        :param int command: command to send
        :param data: Optional data byte or sequence for the command
        :param address: address to send to, default is the address stored on this object
        :return: data bytes of the response
        :raises: `LaserTimeOutError` if no complete frame arrives within ``self.timeout``
        """
        # pylint: disable=import-outside-toplevel
        import asyncio

        frame = self._build_frame(command, address, data)
        # Clear input before writing, as `Laser` does, so that a late reply to an
        # earlier (timed out) request is not taken as the answer to this one
        self.uart.reset_input_buffer()
        self.uart.write(frame)
        try:
            frame = await asyncio.wait_for(self._read_frame(), self.timeout)
        except asyncio.TimeoutError as exc:
            raise LaserTimeOutError("Did not receive response within timeout") from exc
        read_data = self._process_frame(address, command, frame)
        return read_data

    async def _send_command_and_raise_on_failure(self, command, data=None):
        """
        Send a command, and raise `LaserCommandFailedError` if it does not succeed

        :param int command:
        :param data: Optional data byte or sequence for the command
        """
        result = await self._send_and_receive(command, data)
        # Success is signalled by response data starting with 0x01
        if not result or result[0] != 0x01:
            raise LaserCommandFailedError(f"Tried to send {command} but it failed")

    async def set_laser(self, value: bool):
        """
        Turn the laser pointer on or off

        :param bool value: If ``True``, turn on laser, turn off if ``False``
        """
        if value:
            await self._send_command_and_raise_on_failure(self.LASER_ON)
        else:
            await self._send_command_and_raise_on_failure(self.LASER_OFF)

    async def stop_measuring(self):
        """
        Stop measuring when in continuous mode

        :return:
        """
        await self._send_command_and_raise_on_failure(self.STOP_CONTINUOUS_MEASURE)

    async def set_buzzer(self, value: bool):
        """
        Turn on or off beeps when receiving commands

        :param bool value: If ``True``, turn on beeps, turn off if ``False``
        :return:
        """
        await self._send_command_and_raise_on_failure(self.BUZZER_CONTROL, int(value))

    async def set_slave_address(self, address):
        """
        Set the address of the laser pointer

        :param int address: Address to use - between 1 and 255
        """
        await self._send_command_and_raise_on_failure(self.SET_SLAVE_ADDRESS, address)
        # Only reached if the device acknowledged the change
        self.address = address

    async def measure(self) -> int:
        """
        Make a single reading.

        :return: distance in mm
        :raises: `LaserError`; can be one of

          `TooDimError`
            Can't see the laser spot properly. Use reflective tape or a shorter distance
          `TooBrightError`
            Laser spot is too bright (maybe too close to the device or there may be too much
            ambient light)
          `BadReadingError`
            Measurement failed, often due to movement
          `LaserCommandFailedError`
            Return value from laser was garbled
        """
        result = await self._send_and_receive(self.SINGLE_MEASURE)
        result = self._check_measurement_for_errors(result)
        return result