forked from micropython/micropython
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathSerial.c
274 lines (241 loc) · 11.8 KB
/
Serial.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
// This file is part of the CircuitPython project: https://circuitpython.org
//
// SPDX-FileCopyrightText: 2025 rianadon
//
// SPDX-License-Identifier: MIT
#include "py/stream.h"
#include "py/objproperty.h"
#include "py/runtime.h"
#include "py/stream.h"
#include "shared-bindings/usb/cdc_host/Serial.h"
#include "tusb.h"
#include "class/cdc/cdc_host.h"
//| class Serial:
//| """Receives cdc commands over USB"""
//|
//| def __init__(self, device: Device) -> None:
//| """You cannot create an instance of `usb_cdc.Serial`.
//| The available instances are in the ``usb_cdc.serials`` tuple."""
//| ...
//|
//| def read(self, size: int = -1) -> bytes:
//| """Read at most ``size`` bytes. If ``size`` exceeds the internal buffer size,
//| only the bytes in the buffer will be read. If ``size`` is not specified or is ``-1``,
//| read as many bytes as possible, until the timeout expires.
//| If `timeout` is > 0 or ``None``, and fewer than ``size`` bytes are available,
//| keep waiting until the timeout expires or ``size`` bytes are available.
//|
//| If no bytes are read, return ``b''``. This is unlike, say, `busio.UART.read()`, which
//| would return ``None``.
//|
//| :return: Data read
//| :rtype: bytes"""
//| ...
//|
//| def readinto(self, buf: WriteableBuffer) -> int:
//| """Read bytes into the ``buf``. Read at most ``len(buf)`` bytes. If `timeout`
//| is > 0 or ``None``, keep waiting until the timeout expires or ``len(buf)``
//| bytes are available.
//|
//| :return: number of bytes read and stored into ``buf``
//| :rtype: int"""
//| ...
//|
//| def readline(self, size: int = -1) -> Optional[bytes]:
//| r"""Read a line ending in a newline character ("\\n"), including the newline.
//| Return everything readable if no newline is found and ``timeout`` is 0.
//| Return ``None`` in case of error.
//|
//| This is a binary stream: the newline character "\\n" cannot be changed.
//| If the host computer transmits "\\r" it will also be included as part of the line.
//|
//| :param int size: maximum number of characters to read. ``-1`` means as many as possible.
//| :return: the line read
//| :rtype: bytes or None"""
//| ...
//|
//| def readlines(self) -> List[Optional[bytes]]:
//| """Read multiple lines as a list, using `readline()`.
//|
//| .. warning:: If ``timeout`` is ``None``,
//| `readlines()` will never return, because there is no way to indicate end of stream.
//|
//| :return: a list of the line read
//| :rtype: list"""
//| ...
//|
//| def write(self, buf: ReadableBuffer) -> int:
//| """Write as many bytes as possible from the buffer of bytes.
//|
//| :return: the number of bytes written
//| :rtype: int"""
//| ...
//|
//| def flush(self) -> None:
//| """Force out any unwritten bytes, waiting until they are written."""
//| ...
//|
// Stream-protocol `read` hook.
// A zero-length request returns 0 right away without calling into common-hal;
// otherwise the request (including `errcode`) is forwarded unchanged to
// common_hal_usb_host_cdc_serial_read() and its result is returned.
static mp_uint_t usb_host_cdc_serial_read_stream(mp_obj_t self_in, void *buf_in, mp_uint_t size, int *errcode) {
    if (size == 0) {
        // Nothing was asked for, so there is nothing to fetch.
        return 0;
    }

    usb_cdc_host_serial_obj_t *serial = MP_OBJ_TO_PTR(self_in);
    return common_hal_usb_host_cdc_serial_read(serial, (byte *)buf_in, size, errcode);
}
// Stream-protocol `write` hook.
// Forwards the caller's buffer, length and `errcode` pointer unchanged to
// common_hal_usb_host_cdc_serial_write() and returns whatever it returns.
static mp_uint_t usb_host_cdc_serial_write_stream(mp_obj_t self_in, const void *buf_in, mp_uint_t size, int *errcode) {
    usb_cdc_host_serial_obj_t *serial = MP_OBJ_TO_PTR(self_in);
    return common_hal_usb_host_cdc_serial_write(serial, (const byte *)buf_in, size, errcode);
}
// Stream-protocol `ioctl` hook.
//
// MP_STREAM_POLL:  `arg` holds the flags being asked about. The result has
//                  MP_STREAM_POLL_RD set when it was requested and in_waiting > 0,
//                  and MP_STREAM_POLL_WR set when it was requested and
//                  out_waiting < CFG_TUH_CDC_TX_BUFSIZE.
// MP_STREAM_FLUSH: calls the common-hal flush and returns 0.
// anything else:   stores MP_EINVAL in *errcode and returns MP_STREAM_ERROR.
static mp_uint_t usb_host_cdc_serial_ioctl_stream(mp_obj_t self_in, mp_uint_t request, mp_uint_t arg, int *errcode) {
    usb_cdc_host_serial_obj_t *serial = MP_OBJ_TO_PTR(self_in);

    if (request == MP_STREAM_POLL) {
        mp_uint_t ready = 0;
        if ((arg & MP_STREAM_POLL_RD) && common_hal_usb_host_cdc_serial_get_in_waiting(serial) > 0) {
            ready |= MP_STREAM_POLL_RD;
        }
        if ((arg & MP_STREAM_POLL_WR) && common_hal_usb_host_cdc_serial_get_out_waiting(serial) < CFG_TUH_CDC_TX_BUFSIZE) {
            ready |= MP_STREAM_POLL_WR;
        }
        return ready;
    }

    if (request == MP_STREAM_FLUSH) {
        common_hal_usb_host_cdc_serial_flush(serial);
        return 0;
    }

    // Unsupported request.
    *errcode = MP_EINVAL;
    return MP_STREAM_ERROR;
}
// connected property
//| connected: bool
//| """True if this Serial object represents a mounted CDC device
//| and the remote device is asserting DTR (Data Terminal Ready). (read-only)
//| """
// Getter behind the read-only `connected` property: wraps the common-hal
// result in a Python bool.
static mp_obj_t usb_host_cdc_serial_get_connected(mp_obj_t self_in) {
    usb_cdc_host_serial_obj_t *serial = MP_OBJ_TO_PTR(self_in);

    return mp_obj_new_bool(
        common_hal_usb_host_cdc_serial_get_connected(serial));
}
MP_DEFINE_CONST_FUN_OBJ_1(usb_host_cdc_serial_get_connected_obj, usb_host_cdc_serial_get_connected);

// Property object exposing the getter above (no setter).
MP_PROPERTY_GETTER(usb_host_cdc_serial_connected_obj,
    (mp_obj_t)&usb_host_cdc_serial_get_connected_obj);
// in_waiting property
//| in_waiting: int
//| """Returns the number of bytes waiting to be read from the
//| CDC device's input buffer. (read-only)"""
// Getter behind the read-only `in_waiting` property: returns the common-hal
// in-waiting count as a Python int.
static mp_obj_t usb_host_cdc_serial_get_in_waiting(mp_obj_t self_in) {
    usb_cdc_host_serial_obj_t *serial = MP_OBJ_TO_PTR(self_in);

    return mp_obj_new_int(
        common_hal_usb_host_cdc_serial_get_in_waiting(serial));
}
MP_DEFINE_CONST_FUN_OBJ_1(usb_host_cdc_serial_get_in_waiting_obj, usb_host_cdc_serial_get_in_waiting);

// Property object exposing the getter above (no setter).
MP_PROPERTY_GETTER(usb_host_cdc_serial_in_waiting_obj,
    (mp_obj_t)&usb_host_cdc_serial_get_in_waiting_obj);
// out_waiting property
//| out_waiting: int
//| """Returns the number of bytes waiting to be written to the
//| CDC device's output buffer. (read-only)"""
// Getter behind the read-only `out_waiting` property: returns the common-hal
// out-waiting count as a Python int.
static mp_obj_t usb_host_cdc_serial_get_out_waiting(mp_obj_t self_in) {
    usb_cdc_host_serial_obj_t *serial = MP_OBJ_TO_PTR(self_in);

    return mp_obj_new_int(
        common_hal_usb_host_cdc_serial_get_out_waiting(serial));
}
MP_DEFINE_CONST_FUN_OBJ_1(usb_host_cdc_serial_get_out_waiting_obj, usb_host_cdc_serial_get_out_waiting);

// Property object exposing the getter above (no setter).
MP_PROPERTY_GETTER(usb_host_cdc_serial_out_waiting_obj,
    (mp_obj_t)&usb_host_cdc_serial_get_out_waiting_obj);
// reset_input_buffer method
//| def reset_input_buffer(self) -> None:
//| """Clears any unread bytes from the input buffer."""
//| ...
// Python-callable `reset_input_buffer()`: delegates to the common-hal
// implementation and always returns None.
static mp_obj_t usb_host_cdc_serial_reset_input_buffer(mp_obj_t self_in) {
    usb_cdc_host_serial_obj_t *serial = MP_OBJ_TO_PTR(self_in);

    common_hal_usb_host_cdc_serial_reset_input_buffer(serial);

    return mp_const_none;
}
MP_DEFINE_CONST_FUN_OBJ_1(usb_host_cdc_serial_reset_input_buffer_obj, usb_host_cdc_serial_reset_input_buffer);
// reset_output_buffer method
//| def reset_output_buffer(self) -> None:
//| """Clears any unwritten bytes from the output buffer."""
//| ...
// Python-callable `reset_output_buffer()`: delegates to the common-hal
// implementation and always returns None.
static mp_obj_t usb_host_cdc_serial_reset_output_buffer(mp_obj_t self_in) {
    usb_cdc_host_serial_obj_t *serial = MP_OBJ_TO_PTR(self_in);

    common_hal_usb_host_cdc_serial_reset_output_buffer(serial);

    return mp_const_none;
}
MP_DEFINE_CONST_FUN_OBJ_1(usb_host_cdc_serial_reset_output_buffer_obj, usb_host_cdc_serial_reset_output_buffer);
// timeout property
//| timeout: Optional[float]
//| """The read timeout value in seconds. ``None`` means wait indefinitely.
//| 0 means non-blocking. Positive value is the timeout in seconds."""
// Getter for the `timeout` property.
// A negative value from common-hal is reported to Python as None; any other
// value is returned as a Python float.
static mp_obj_t usb_host_cdc_serial_get_timeout(mp_obj_t self_in) {
    usb_cdc_host_serial_obj_t *serial = MP_OBJ_TO_PTR(self_in);
    mp_float_t seconds = common_hal_usb_host_cdc_serial_get_timeout(serial);

    if (seconds < 0.0f) {
        return mp_const_none;
    }
    return mp_obj_new_float(seconds);
}
MP_DEFINE_CONST_FUN_OBJ_1(usb_host_cdc_serial_get_timeout_obj, usb_host_cdc_serial_get_timeout);
// Setter for the `timeout` property.
// Python None is stored as -1.0; anything else is converted with
// mp_obj_get_float() and handed to common-hal as-is. Always returns None.
static mp_obj_t usb_host_cdc_serial_set_timeout(mp_obj_t self_in, mp_obj_t timeout_in) {
    usb_cdc_host_serial_obj_t *serial = MP_OBJ_TO_PTR(self_in);

    if (timeout_in == mp_const_none) {
        common_hal_usb_host_cdc_serial_set_timeout(serial, -1.0f);
    } else {
        common_hal_usb_host_cdc_serial_set_timeout(serial, mp_obj_get_float(timeout_in));
    }
    return mp_const_none;
}
MP_DEFINE_CONST_FUN_OBJ_2(usb_host_cdc_serial_set_timeout_obj, usb_host_cdc_serial_set_timeout);

// Read/write property pairing the timeout getter and setter.
MP_PROPERTY_GETSET(usb_host_cdc_serial_timeout_obj,
    (mp_obj_t)&usb_host_cdc_serial_get_timeout_obj,
    (mp_obj_t)&usb_host_cdc_serial_set_timeout_obj);
// write_timeout property
//| write_timeout: Optional[float]
//| """The write timeout value in seconds. ``None`` means wait indefinitely.
//| 0 means non-blocking. Positive value is the timeout in seconds."""
// Getter for the `write_timeout` property.
// A negative value from common-hal is reported to Python as None; any other
// value is returned as a Python float.
static mp_obj_t usb_host_cdc_serial_get_write_timeout(mp_obj_t self_in) {
    usb_cdc_host_serial_obj_t *serial = MP_OBJ_TO_PTR(self_in);
    mp_float_t seconds = common_hal_usb_host_cdc_serial_get_write_timeout(serial);

    if (seconds < 0.0f) {
        return mp_const_none;
    }
    return mp_obj_new_float(seconds);
}
MP_DEFINE_CONST_FUN_OBJ_1(usb_host_cdc_serial_get_write_timeout_obj, usb_host_cdc_serial_get_write_timeout);
// Setter for the `write_timeout` property.
// Python None is stored as -1.0; anything else is converted with
// mp_obj_get_float() and handed to common-hal as-is. Always returns None.
static mp_obj_t usb_host_cdc_serial_set_write_timeout(mp_obj_t self_in, mp_obj_t write_timeout_in) {
    usb_cdc_host_serial_obj_t *serial = MP_OBJ_TO_PTR(self_in);

    if (write_timeout_in == mp_const_none) {
        common_hal_usb_host_cdc_serial_set_write_timeout(serial, -1.0f);
    } else {
        common_hal_usb_host_cdc_serial_set_write_timeout(serial, mp_obj_get_float(write_timeout_in));
    }
    return mp_const_none;
}
MP_DEFINE_CONST_FUN_OBJ_2(usb_host_cdc_serial_set_write_timeout_obj, usb_host_cdc_serial_set_write_timeout);

// Read/write property pairing the write_timeout getter and setter.
MP_PROPERTY_GETSET(usb_host_cdc_serial_write_timeout_obj,
    (mp_obj_t)&usb_host_cdc_serial_get_write_timeout_obj,
    (mp_obj_t)&usb_host_cdc_serial_set_write_timeout_obj);
// Attribute table for the Serial type: maps each Python-visible name to the
// object that implements it.
static const mp_rom_map_elem_t usb_host_cdc_serial_locals_dict_table[] = {
// Stream methods, served by the shared mp_stream_* objects.
{ MP_ROM_QSTR(MP_QSTR_flush), MP_ROM_PTR(&mp_stream_flush_obj) },
{ MP_ROM_QSTR(MP_QSTR_read), MP_ROM_PTR(&mp_stream_read_obj) },
{ MP_ROM_QSTR(MP_QSTR_readinto), MP_ROM_PTR(&mp_stream_readinto_obj) },
{ MP_ROM_QSTR(MP_QSTR_readline), MP_ROM_PTR(&mp_stream_unbuffered_readline_obj)},
{ MP_ROM_QSTR(MP_QSTR_readlines), MP_ROM_PTR(&mp_stream_unbuffered_readlines_obj)},
{ MP_ROM_QSTR(MP_QSTR_write), MP_ROM_PTR(&mp_stream_write_obj) },
// Properties and methods defined earlier in this file.
{ MP_ROM_QSTR(MP_QSTR_in_waiting), MP_ROM_PTR(&usb_host_cdc_serial_in_waiting_obj) },
{ MP_ROM_QSTR(MP_QSTR_out_waiting), MP_ROM_PTR(&usb_host_cdc_serial_out_waiting_obj) },
{ MP_ROM_QSTR(MP_QSTR_reset_input_buffer), MP_ROM_PTR(&usb_host_cdc_serial_reset_input_buffer_obj) },
{ MP_ROM_QSTR(MP_QSTR_reset_output_buffer), MP_ROM_PTR(&usb_host_cdc_serial_reset_output_buffer_obj) },
{ MP_ROM_QSTR(MP_QSTR_timeout), MP_ROM_PTR(&usb_host_cdc_serial_timeout_obj) },
{ MP_ROM_QSTR(MP_QSTR_write_timeout), MP_ROM_PTR(&usb_host_cdc_serial_write_timeout_obj) },
{ MP_ROM_QSTR(MP_QSTR_connected), MP_ROM_PTR(&usb_host_cdc_serial_connected_obj) },
// TODO: Add baudrate, data_bits, parity, stop_bits properties/methods.
};
// Wraps the table above as the constant dict referenced by the type definition.
static MP_DEFINE_CONST_DICT(usb_host_cdc_serial_locals_dict, usb_host_cdc_serial_locals_dict_table);
// Stream protocol descriptor for Serial: binary (non-text) stream backed by
// the read/write/ioctl hooks defined in this file.
static const mp_stream_p_t usb_host_cdc_serial_stream_p = {
    // Mode flags.
    // NOTE(review): the pyserial_* flags presumably select pyserial-like
    // read/readinto/return-value semantics in the shared stream code; their
    // exact effect is not visible from this file -- confirm in py/stream.
    .is_text = false,
    .pyserial_dont_return_none_compatibility = true,
    .pyserial_readinto_compatibility = true,
    .pyserial_read_compatibility = true,

    // I/O callbacks.
    .ioctl = usb_host_cdc_serial_ioctl_stream,
    .write = usb_host_cdc_serial_write_stream,
    .read = usb_host_cdc_serial_read_stream,
};
// Type object for the Python class named `Serial`. It attaches the attribute
// table, the unbuffered stream iterator, and the stream protocol descriptor
// defined above. No constructor slot is supplied here.
MP_DEFINE_CONST_OBJ_TYPE(
usb_cdc_host_serial_type,
MP_QSTR_Serial,
MP_TYPE_FLAG_ITER_IS_ITERNEXT | MP_TYPE_FLAG_HAS_SPECIAL_ACCESSORS,
locals_dict, &usb_host_cdc_serial_locals_dict,
iter, mp_stream_unbuffered_iter,
protocol, &usb_host_cdc_serial_stream_p
);