-
Notifications
You must be signed in to change notification settings - Fork 11
/
meshtastic_utils.py
499 lines (433 loc) · 18.3 KB
/
meshtastic_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
import asyncio
import threading
import time
from typing import List
import meshtastic.ble_interface
import meshtastic.serial_interface
import meshtastic.tcp_interface
import serial # For serial port exceptions
import serial.tools.list_ports # Import serial tools for port listing
from bleak.exc import BleakDBusError, BleakError
from pubsub import pub
from config import relay_config
from db_utils import (
get_longname,
get_message_map_by_meshtastic_id,
get_shortname,
save_longname,
save_shortname,
)
from log_utils import get_logger
# Do not import plugin_loader here to avoid circular imports
# Extract matrix rooms configuration
matrix_rooms: List[dict] = relay_config["matrix_rooms"]
# Initialize logger for Meshtastic
logger = get_logger(name="Meshtastic")
# Global variables for the Meshtastic connection and event loop management
meshtastic_client = None
event_loop = None # Will be set from main.py
meshtastic_lock = (
threading.Lock()
) # To prevent race conditions on meshtastic_client access
reconnecting = False
shutting_down = False
reconnect_task = None # To keep track of the reconnect task
def serial_port_exists(port_name):
"""
Check if the specified serial port exists.
This prevents attempting connections on non-existent ports.
"""
ports = [port.device for port in serial.tools.list_ports.comports()]
return port_name in ports
def connect_meshtastic(force_connect=False):
"""
Establish a connection to the Meshtastic device.
Attempts a connection based on connection_type (serial/ble/network).
Retries until successful or shutting_down is set.
If already connected and not force_connect, returns the existing client.
"""
global meshtastic_client, shutting_down
if shutting_down:
logger.debug("Shutdown in progress. Not attempting to connect.")
return None
with meshtastic_lock:
if meshtastic_client and not force_connect:
return meshtastic_client
# Close previous connection if exists
if meshtastic_client:
try:
meshtastic_client.close()
except Exception as e:
logger.warning(f"Error closing previous connection: {e}")
meshtastic_client = None
# Determine connection type and attempt connection
connection_type = relay_config["meshtastic"]["connection_type"]
retry_limit = 0 # 0 means infinite retries
attempts = 1
successful = False
while (
not successful
and (retry_limit == 0 or attempts <= retry_limit)
and not shutting_down
):
try:
if connection_type == "serial":
# Serial connection
serial_port = relay_config["meshtastic"]["serial_port"]
logger.info(f"Connecting to serial port {serial_port} ...")
# Check if serial port exists before connecting
if not serial_port_exists(serial_port):
logger.warning(
f"Serial port {serial_port} does not exist. Waiting..."
)
time.sleep(5)
attempts += 1
continue
meshtastic_client = meshtastic.serial_interface.SerialInterface(
serial_port
)
elif connection_type == "ble":
# BLE connection
ble_address = relay_config["meshtastic"].get("ble_address")
if ble_address:
logger.info(f"Connecting to BLE address {ble_address} ...")
meshtastic_client = meshtastic.ble_interface.BLEInterface(
address=ble_address,
noProto=False,
debugOut=None,
noNodes=False,
)
else:
logger.error("No BLE address provided.")
return None
else:
# Network (TCP) connection
target_host = relay_config["meshtastic"]["host"]
logger.info(f"Connecting to host {target_host} ...")
meshtastic_client = meshtastic.tcp_interface.TCPInterface(
hostname=target_host
)
successful = True
nodeInfo = meshtastic_client.getMyNodeInfo()
logger.info(
f"Connected to {nodeInfo['user']['shortName']} / {nodeInfo['user']['hwModel']}"
)
# Subscribe to message and connection lost events
pub.subscribe(on_meshtastic_message, "meshtastic.receive")
pub.subscribe(
on_lost_meshtastic_connection, "meshtastic.connection.lost"
)
except (
serial.SerialException,
BleakDBusError,
BleakError,
Exception,
) as e:
if shutting_down:
logger.debug("Shutdown in progress. Aborting connection attempts.")
break
attempts += 1
if retry_limit == 0 or attempts <= retry_limit:
wait_time = min(
attempts * 2, 30
) # Exponential backoff capped at 30s
logger.warning(
f"Attempt #{attempts - 1} failed. Retrying in {wait_time} secs: {e}"
)
time.sleep(wait_time)
else:
logger.error(f"Could not connect after {retry_limit} attempts: {e}")
return None
return meshtastic_client
def on_lost_meshtastic_connection(interface=None):
"""
Callback invoked when the Meshtastic connection is lost.
Initiates a reconnect sequence unless shutting_down is True.
"""
global meshtastic_client, reconnecting, shutting_down, event_loop, reconnect_task
with meshtastic_lock:
if shutting_down:
logger.debug("Shutdown in progress. Not attempting to reconnect.")
return
if reconnecting:
logger.info(
"Reconnection already in progress. Skipping additional reconnection attempt."
)
return
reconnecting = True
logger.error("Lost connection. Reconnecting...")
if meshtastic_client:
try:
meshtastic_client.close()
except OSError as e:
if e.errno == 9:
# Bad file descriptor, already closed
pass
else:
logger.warning(f"Error closing Meshtastic client: {e}")
except Exception as e:
logger.warning(f"Error closing Meshtastic client: {e}")
meshtastic_client = None
if event_loop:
reconnect_task = asyncio.run_coroutine_threadsafe(reconnect(), event_loop)
async def reconnect():
"""
Asynchronously attempts to reconnect with exponential backoff.
Stops if shutting_down is set.
"""
global meshtastic_client, reconnecting, shutting_down
backoff_time = 10
try:
while not shutting_down:
try:
logger.info(
f"Reconnection attempt starting in {backoff_time} seconds..."
)
await asyncio.sleep(backoff_time)
if shutting_down:
logger.debug(
"Shutdown in progress. Aborting reconnection attempts."
)
break
meshtastic_client = connect_meshtastic(force_connect=True)
if meshtastic_client:
logger.info("Reconnected successfully.")
break
except Exception as e:
if shutting_down:
break
logger.error(f"Reconnection attempt failed: {e}")
backoff_time = min(backoff_time * 2, 300) # Cap backoff at 5 minutes
except asyncio.CancelledError:
logger.info("Reconnection task was cancelled.")
finally:
reconnecting = False
def on_meshtastic_message(packet, interface):
"""
Handle incoming Meshtastic messages. For reaction messages, if relay_reactions is False,
we do not store message maps and thus won't be able to relay reactions back to Matrix.
If relay_reactions is True, message maps are stored inside matrix_relay().
"""
# Apply reaction filtering based on config
relay_reactions = relay_config["meshtastic"].get("relay_reactions", False)
# If relay_reactions is False, filter out reaction/tapback packets to avoid complexity
if packet.get("decoded", {}).get("portnum") == "TEXT_MESSAGE_APP":
decoded = packet.get("decoded", {})
if not relay_reactions and ("emoji" in decoded or "replyId" in decoded):
logger.debug(
"Filtered out reaction/tapback packet due to relay_reactions=false."
)
return
from matrix_utils import matrix_relay
global event_loop
if shutting_down:
logger.debug("Shutdown in progress. Ignoring incoming messages.")
return
if event_loop is None:
logger.error("Event loop is not set. Cannot process message.")
return
loop = event_loop
sender = packet.get("fromId") or packet.get("from")
toId = packet.get("to")
decoded = packet.get("decoded", {})
text = decoded.get("text")
replyId = decoded.get("replyId")
emoji_flag = "emoji" in decoded and decoded["emoji"] == 1
# Determine if this is a direct message to the relay node
from meshtastic.mesh_interface import BROADCAST_NUM
myId = interface.myInfo.my_node_num
if toId == myId:
is_direct_message = True
elif toId == BROADCAST_NUM:
is_direct_message = False
else:
# Message to someone else; ignoring for broadcasting logic
is_direct_message = False
meshnet_name = relay_config["meshtastic"]["meshnet_name"]
# Reaction handling (Meshtastic -> Matrix)
# If replyId and emoji_flag are present and relay_reactions is True, we relay as text reactions in Matrix
if replyId and emoji_flag and relay_reactions:
longname = get_longname(sender) or str(sender)
shortname = get_shortname(sender) or str(sender)
orig = get_message_map_by_meshtastic_id(replyId)
if orig:
# orig = (matrix_event_id, matrix_room_id, meshtastic_text, meshtastic_meshnet)
matrix_event_id, matrix_room_id, meshtastic_text, meshtastic_meshnet = orig
abbreviated_text = (
meshtastic_text[:40] + "..."
if len(meshtastic_text) > 40
else meshtastic_text
)
# Ensure that meshnet_name is always included, using our own meshnet for accuracy.
full_display_name = f"{longname}/{meshnet_name}"
reaction_symbol = text.strip() if (text and text.strip()) else "⚠️"
reaction_message = f'\n [{full_display_name}] reacted {reaction_symbol} to "{abbreviated_text}"'
# Relay the reaction as emote to Matrix, preserving the original meshnet name
asyncio.run_coroutine_threadsafe(
matrix_relay(
matrix_room_id,
reaction_message,
longname,
shortname,
meshnet_name,
decoded.get("portnum"),
meshtastic_id=packet.get("id"),
meshtastic_replyId=replyId,
meshtastic_text=meshtastic_text,
emote=True,
emoji=True,
),
loop=loop,
)
else:
logger.debug("Original message for reaction not found in DB.")
return
# Normal text messages or detection sensor messages
if text:
# Determine the channel for this message
channel = packet.get("channel")
if channel is None:
# If channel not specified, deduce from portnum
if (
decoded.get("portnum") == "TEXT_MESSAGE_APP"
or decoded.get("portnum") == 1
):
channel = 0
elif decoded.get("portnum") == "DETECTION_SENSOR_APP":
channel = 0
else:
logger.debug(
f"Unknown portnum {decoded.get('portnum')}, cannot determine channel"
)
return
# Check if channel is mapped to a Matrix room
channel_mapped = False
for room in matrix_rooms:
if room["meshtastic_channel"] == channel:
channel_mapped = True
break
if not channel_mapped:
logger.debug(f"Skipping message from unmapped channel {channel}")
return
# If detection_sensor is disabled and this is a detection sensor packet, skip it
if decoded.get("portnum") == "DETECTION_SENSOR_APP" and not relay_config[
"meshtastic"
].get("detection_sensor", False):
logger.debug(
"Detection sensor packet received, but detection sensor processing is disabled."
)
return
# Attempt to get longname/shortname from database or nodes
longname = get_longname(sender)
shortname = get_shortname(sender)
if not longname or not shortname:
node = interface.nodes.get(sender)
if node:
user = node.get("user")
if user:
if not longname:
longname = user.get("longName")
if longname:
save_longname(sender, longname)
if not shortname:
shortname = user.get("shortName")
if shortname:
save_shortname(sender, shortname)
else:
logger.debug(f"Node info for sender {sender} not available yet.")
# If still not available, fallback to sender ID
if not longname:
longname = str(sender)
if not shortname:
shortname = str(sender)
formatted_message = f"[{longname}/{meshnet_name}]: {text}"
# Plugin functionality - Check if any plugin handles this message before relaying
from plugin_loader import load_plugins
plugins = load_plugins()
found_matching_plugin = False
for plugin in plugins:
if not found_matching_plugin:
result = asyncio.run_coroutine_threadsafe(
plugin.handle_meshtastic_message(
packet, formatted_message, longname, meshnet_name
),
loop=loop,
)
found_matching_plugin = result.result()
if found_matching_plugin:
logger.debug(f"Processed by plugin {plugin.plugin_name}")
# If message is a DM or handled by plugin, do not relay further
if is_direct_message:
logger.debug(
f"Received a direct message from {longname}. Not relaying to Matrix."
)
return
if found_matching_plugin:
logger.debug("Message was handled by a plugin. Not relaying to Matrix.")
return
# Relay the message to all Matrix rooms mapped to this channel
logger.info(
f"Processing inbound radio message from {sender} on channel {channel}"
)
logger.info(f"Relaying Meshtastic message from {longname} to Matrix")
for room in matrix_rooms:
if room["meshtastic_channel"] == channel:
# Storing the message_map (if enabled) occurs inside matrix_relay() now,
# controlled by relay_reactions.
asyncio.run_coroutine_threadsafe(
matrix_relay(
room["id"],
formatted_message,
longname,
shortname,
meshnet_name,
decoded.get("portnum"),
meshtastic_id=packet.get("id"),
meshtastic_text=text,
),
loop=loop,
)
else:
# Non-text messages via plugins
portnum = decoded.get("portnum")
from plugin_loader import load_plugins
plugins = load_plugins()
found_matching_plugin = False
for plugin in plugins:
if not found_matching_plugin:
result = asyncio.run_coroutine_threadsafe(
plugin.handle_meshtastic_message(
packet,
formatted_message=None,
longname=None,
meshnet_name=None,
),
loop=loop,
)
found_matching_plugin = result.result()
if found_matching_plugin:
logger.debug(
f"Processed {portnum} with plugin {plugin.plugin_name}"
)
async def check_connection():
"""
Periodically checks the Meshtastic connection by sending a ping.
If an error occurs, it attempts to reconnect.
"""
global meshtastic_client, shutting_down
connection_type = relay_config["meshtastic"]["connection_type"]
while not shutting_down:
if meshtastic_client:
try:
meshtastic_client.sendPing()
except Exception as e:
logger.error(f"{connection_type.capitalize()} connection lost: {e}")
on_lost_meshtastic_connection(meshtastic_client)
await asyncio.sleep(5) # Check connection every 5 seconds
if __name__ == "__main__":
# If running this standalone (normally the main.py does the loop), just try connecting and run forever.
meshtastic_client = connect_meshtastic()
loop = asyncio.get_event_loop()
event_loop = loop # Set the event loop for use in callbacks
loop.create_task(check_connection())
loop.run_forever()