-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbluetoothcli
executable file
·108 lines (83 loc) · 3.86 KB
/
bluetoothcli
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#!/usr/bin/env python3
"""
Bluetooth CLI
-------------
An application that scans for and connects to a Bluetooth device advertising
the Nordic UART Service (NUS).
"""
import asyncio
import sys
import termios
import tty
from itertools import count, takewhile
from typing import Iterator
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
# UUIDs of the Nordic UART Service (NUS) and its two characteristics.
# As used in this file: the program subscribes to notifications on the TX
# characteristic (data coming from the device) and writes to the RX
# characteristic (data going to the device).
UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
# TIP: you can get this function and more from the ``more-itertools`` package.
def sliced(data: bytes, n: int) -> Iterator[bytes]:
    """Slice *data* into consecutive chunks of at most *n* bytes.

    Every chunk has exactly *n* bytes except possibly the last one, which
    holds whatever remains. Empty *data* produces no chunks.

    Args:
        data: The bytes to split.
        n: Maximum chunk size; must be a positive integer.

    Returns:
        A lazy iterator over the chunks, in order.

    Raises:
        ValueError: If *n* is not positive. A zero step would silently yield
            nothing and a negative step would yield a truncated slice, so
            both are rejected up front instead of losing data.
    """
    # Validate eagerly (not inside a generator) so the error surfaces at the
    # call site rather than on first iteration.
    if n <= 0:
        raise ValueError(f"chunk size must be a positive integer, got {n!r}")

    # count(0, n) produces the start offsets 0, n, 2n, ...; slicing past the
    # end of data gives an empty chunk, which is where takewhile(len, ...) stops.
    return takewhile(len, (data[i : i + n] for i in count(0, n)))
async def uart_terminal():
    """Run a simple "terminal" over the Nordic Semiconductor (nRF) UART service.

    Scans for a device that advertises the UART service UUID, connects to it,
    and then forwards every byte read from stdin to the device's RX
    characteristic. Any data notified on the TX characteristic is printed to
    stdout.

    While connected, stdin is switched to raw mode (when it is a TTY) so that
    each keystroke is sent immediately and control characters such as CTRL+C
    reach the remote device. The session ends on CTRL+D or end of input. The
    original terminal settings are restored on every exit path, including
    cancellation after a disconnect.

    Exits the process with status 1 if no matching device is found.
    """

    def match_nus_uuid(device: BLEDevice, adv: AdvertisementData):
        # This assumes that the device includes the UART service UUID in the
        # advertising data. This test may need to be adjusted depending on the
        # actual advertising data supplied by the device.
        return UART_SERVICE_UUID.lower() in adv.service_uuids

    device = await BleakScanner.find_device_by_filter(match_nus_uuid)

    if device is None:
        print("no matching device found, you may need to edit match_nus_uuid().")
        sys.exit(1)

    def handle_disconnect(_: BleakClient):
        print("Device was disconnected, goodbye.")
        # cancelling all tasks effectively ends the program
        for task in asyncio.all_tasks():
            task.cancel()

    def handle_rx(_: BleakGATTCharacteristic, data: bytearray):
        # A notification may carry arbitrary bytes or end in the middle of a
        # multi-byte character; substitute undecodable bytes instead of
        # raising inside the notification callback.
        print(data.decode("utf-8", errors="replace"), end="", flush=True)

    async with BleakClient(device, disconnected_callback=handle_disconnect) as client:
        await client.start_notify(UART_TX_CHAR_UUID, handle_rx)

        print("Connected, start typing and press ENTER...")

        loop = asyncio.get_running_loop()

        # Look up the write target before touching the terminal, so a failure
        # here is reported on a terminal that is still in its normal mode.
        nus = client.services.get_service(UART_SERVICE_UUID)
        rx_char = nus.get_characteristic(UART_RX_CHAR_UUID)

        # Raw mode only makes sense (and termios only works) on a real TTY;
        # with piped or redirected input the bytes are forwarded as they are.
        fd = sys.stdin.fileno()
        fd_settings = termios.tcgetattr(fd) if sys.stdin.isatty() else None
        if fd_settings is not None:
            tty.setraw(fd)

        try:
            while True:
                # Read one byte at a time in a worker thread so the event
                # loop keeps servicing notifications while we wait for input.
                data = await loop.run_in_executor(None, sys.stdin.buffer.read, 1)

                # Exit on end of input (empty read) or CTRL+D
                if not data or data[0] == 0x04:
                    break

                # some devices, like devices running MicroPython, expect Windows
                # line endings (uncomment line below if needed)
                # data = data.replace(b"\n", b"\r\n")

                # Writing without response requires that the data can fit in a
                # single BLE packet. We can use the max_write_without_response_size
                # property to split the data into chunks that will fit.
                # TODO: Buffer data until new-line or special character is received.
                for s in sliced(data, rx_char.max_write_without_response_size):
                    await client.write_gatt_char(rx_char, s, response=False)
        finally:
            # Restore stdin settings back to normal, even when the loop is
            # left through cancellation (disconnect) or a write error.
            if fd_settings is not None:
                termios.tcsetattr(fd, termios.TCSADRAIN, fd_settings)
if __name__ == "__main__":
    # Script entry point: run the terminal session to completion.
    try:
        asyncio.run(uart_terminal())
    except asyncio.CancelledError:
        # task is cancelled on disconnect, so we ignore this error
        pass