-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathAIOSerial.py
188 lines (163 loc) · 6.87 KB
/
AIOSerial.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import serial
import asyncio as aio
import logging
# import the queue class
from .AIOExtensions.ClosableQueue import ClosableQueue, ClosableQueueClosed
# module logger
log = logging.getLogger(__name__)
# aio serial port exception
class AIOSerialException(Exception):
pass
# unable to open the port
class AIOSerialNotOpenException(AIOSerialException):
pass
# port is already closed, no communication will take place
class AIOSerialClosedException(AIOSerialException):
pass
# port fatal error
class AIOSerialErrorException(AIOSerialException):
pass
# serial port asyncio implementation
class AIOSerial:
# create the serial port
def __init__(self, *args, line_mode=False, **kwargs):
# this may fail due to port not being present
try:
# open the serial port connection
self.sp = serial.Serial(*args, **kwargs)
# port was not opened
if not self.sp.is_open:
raise AIOSerialException()
# re-raise the exception as AioSerialException
except (AIOSerialException, serial.SerialException) as e:
# log message
log.error("Error during port opening")
# re-raise exception
raise AIOSerialNotOpenException("Unable to open the port") from e
# are we working with the line mode? This will cause the read
# functionality to return full text lines which is often desired
# behavior for text protocols
self.line_mode = line_mode
# reception/transmission queue
self._rxq = ClosableQueue()
self._txq = ClosableQueue()
# get current event loop
self.loop = aio.get_running_loop()
# create receive and transmission tasks
self._rx_thread_fut = self.loop.run_in_executor(None, self._rx_thread)
self._tx_thread_fut = self.loop.run_in_executor(None, self._tx_thread)
# log information
log.info('Serial Port is now opened')
# called when entering 'async with' block
async def __aenter__(self):
# all was done in the constructor, so we can simply return the opened
# port
return self
# called when exiting 'async with' block
async def __aexit__(self, exc_type, exc_val, exc_tb):
# close the port
await self.close()
# close the serial port, do the cleanup
async def close(self):
# port is open?
if self.sp.is_open:
# cancel ongoing read-write operation to ensure that rx thread is
# not stuck inside the read() function
self.sp.cancel_read()
# close both queues
self._txq.close()
self._rxq.close()
# wait for the rx/tx thread to end, these need to be gathered to
# collect all the exceptions
await aio.gather(self._tx_thread_fut, self._rx_thread_fut)
# ensure that we call the close function no matter what
self.sp.close()
# log information
log.info('Serial Port is now closed')
# reception thread
def _rx_thread(self):
# get the proper read function according to mode
read_func = self.sp.readline if self.line_mode else self.sp.read
# putting into the rx queue may fail, read may fail as well
try:
# this loop can only be broken by exceptions
while True:
# read from the port
data = read_func()
# try putting the data to the queue, this will raise an
# exception when queue is closed which is in turn caused by the
# port itself getting closed. we use the result to
# raise the exception if any was thrown by the _rxq.put()
aio.run_coroutine_threadsafe(self._rxq.put(data),
self.loop).result()
# log information
log.debug(f'Serial Port RX Thread: {data}')
# queue closed exception raised? exit the loop gracefully (no
# exceptions) as this can only happen when the port is getting
# intentionally closed
except ClosableQueueClosed:
pass
# serial port exceptions, all of these notify that we are in some
# serious trouble
except serial.SerialException:
# log message
log.error('Serial Port RX error')
# create the exception of our own
e = AIOSerialErrorException("Serial Port Reception Error")
# close the queue
aio.run_coroutine_threadsafe(aio.coroutine(self._rxq.close)(e),
self.loop)
# log information
log.info('Serial Port RX Thread has ended')
# transmission thread
def _tx_thread(self):
# this may fail due to serial port or queue
try:
# this loop can only be broken by exceptions
while True:
# try getting data from the queue, this will raise an
# exception when queue is closed due to the fact that port is
# getting closed
data = aio.run_coroutine_threadsafe(self._txq.get(),
self.loop).result()
# write the data to the serial port
self.sp.write(data)
# log information
log.debug(f'Serial Port TX Thread: {data}')
# queue closed exception raised? exit the loop gracefully (no
# exceptions) as this can only happen when the port is getting
# intentionally closed
except ClosableQueueClosed:
pass
# serial port related exceptions
except serial.SerialException:
# log message
log.error('Serial Port TX error')
# create the exception of our own
e = AIOSerialErrorException("Serial Port Transmission Error")
# close the queue
aio.run_coroutine_threadsafe(aio.coroutine(self._txq.close)(e),
self.loop)
# log information
log.info('Serial Port RX Thread has ended')
# read from serial port
async def read(self):
# port might get closed
try:
# get data from the queue
return await self._rxq.get()
# closed queue means closed port
except ClosableQueueClosed:
raise AIOSerialClosedException("Serial Port is closed")
# write to serial port
async def write(self, data):
# unsupported type of data
if not isinstance(data, (bytes, bytearray)):
raise TypeError("Data must be of type bytes or bytearray")
# port might get closed
try:
# put data to port
await self._txq.put(data)
# closed queue means closed port
except ClosableQueueClosed:
raise AIOSerialClosedException("Serial Port is closed")