forked from ianepperson/midnight_museum
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmqtt.py
146 lines (110 loc) · 3.79 KB
/
mqtt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from threading import Lock
import logging
from tasmota import Light, Discover
import paho.mqtt.client as mqtt
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# The shared MQTTHandler instance; stays None until get_mqtt_handler()
# creates it.
GLOBAL_HANDLER = None
# Lock taken by get_mqtt_handler() around creation of GLOBAL_HANDLER.
HANDLER_LOCK = Lock()
def get_mqtt_handler(setup):
    '''Get the singleton of the handler.

    The handler is created from ``setup`` on the first call; later calls
    return the same instance and ignore ``setup``.

    The creation step is guarded by HANDLER_LOCK, which is waited on for at
    most 0.1 seconds. If the lock cannot be obtained in that time the current
    value of GLOBAL_HANDLER is returned as-is, which may still be None.
    '''
    global GLOBAL_HANDLER
    if HANDLER_LOCK.acquire(timeout=0.1):
        try:
            if not GLOBAL_HANDLER:
                GLOBAL_HANDLER = MQTTHandler(setup)
        finally:
            # Always hand the lock back. Without this every later call
            # would stall for the full timeout, and a constructor failure
            # would leave the lock held forever.
            HANDLER_LOCK.release()
    return GLOBAL_HANDLER
class MQTTHandler:
    '''Owns the MQTT client connection and the Tasmota device discovery.

    Connection settings are read from (and written back to) the ``setup``
    object via its ``mqtt_host`` / ``mqtt_port`` attributes and ``save()``.

    Attributes:
        status: Human-readable connection state string.
        on_connect: Optional callable invoked from the client's connect
            callback as ``on_connect(client, userdata, flags, rc)``.
        on_message: Optional callable invoked for every ``stat/`` message as
            ``on_message(client, userdata, msg)``.
    '''

    def __init__(self, setup):
        '''Create the client and discovery objects; does not connect.'''
        self.setup = setup
        self._lights = {}
        # Used as a flag: locked means start() has been called successfully
        # and neither stop() nor restart() has cleared it since.
        self._started = Lock()
        self._userdata = None

        self.client = mqtt.Client()
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message

        self.discover = Discover(self.client)
        self.discover.on_new_device = self._on_new_device

        # for creating callbacks
        self.on_connect = None
        self.on_message = None

        self.status = 'Disconnected'

    def user_data_set(self, value):
        '''Remember ``value`` on the handler.

        NOTE(review): the value is only stored on this object; nothing in
        this class passes it on to the underlying client, so the
        ``userdata`` argument of the callbacks is not affected. Confirm
        whether forwarding was intended.
        '''
        self._userdata = value

    @property
    def started(self):
        '''True while the handler is marked as started.'''
        return self._started.locked()

    @property
    def host(self):
        '''MQTT broker host, as stored in the setup object.'''
        return self.setup.mqtt_host

    @property
    def port(self):
        '''MQTT broker port, as stored in the setup object.'''
        return self.setup.mqtt_port

    def update_connection_settings(self, host, port):
        '''Persist new broker settings and reconnect using them.'''
        log.info(f'Updating connection settings to {host=} {port=}')
        self.setup.mqtt_host = host
        self.setup.mqtt_port = port
        self.setup.save()
        self.restart()

    def publish(self, *args, **kwargs):
        '''Publish through the underlying client.

        All arguments are passed straight to ``client.publish`` and its
        result is returned to the caller.
        '''
        return self.client.publish(*args, **kwargs)

    def start(self):
        '''Connect to the configured broker and start the network loop.

        Does nothing if the handler is already started. If connecting fails
        the error text is placed in ``status`` and the handler is left in the
        not-started state so that ``start()`` can be tried again.
        '''
        if not self._started.acquire(blocking=False):
            log.info('MQTT server already started')
            return

        log.info(f'Connecting to MQTT server at {self.host}:{self.port}')
        self.status = 'Connecting...'
        try:
            self.client.connect(self.host, self.port)
        except Exception as e:
            # Any failure here is reported through `status` rather than
            # raised; keep `status` a string like every other state.
            log.warning(f'MQTT connection failed: {e!r}')
            self.status = str(e) or repr(e)
            # No loop is running, so do not stay marked as started.
            self._release_started()
        else:
            self.client.loop_start()

    def restart(self):
        '''Drop the current connection (if any) and connect again.

        Safe to call even when the handler was never started.
        '''
        log.info('Restarting MQTT connection')
        self.status = 'Restarting...'
        self.client.disconnect()
        # Stop the loop started by loop_start() before start() issues a
        # fresh loop_start(); the two calls are meant to be paired.
        self.client.loop_stop()
        self.status = 'Disconnected, reconnecting...'
        self._release_started()
        self.start()

    def stop(self):
        '''Stop the network loop and clear the started flag.

        Safe to call when the handler is not started.
        '''
        self.client.loop_stop()
        self._release_started()

    def _release_started(self):
        '''Clear the started flag; a no-op if it is already clear.'''
        try:
            self._started.release()
        except RuntimeError:
            # Releasing an unlocked Lock raises RuntimeError; that simply
            # means we were not started.
            pass

    def _on_connect(self, client, userdata, flags, rc):
        '''Client connect callback: record the outcome and subscribe.

        A non-zero ``rc`` means the broker refused the connection; in that
        case no subscription is made. The user ``on_connect`` callback, when
        set, is invoked in both cases with the same arguments.
        '''
        if rc != 0:
            log.warning(f'MQTT connection refused ({rc=})')
            self.status = f'Connection refused (rc={rc})'
        else:
            # subscribe to all channels
            log.info('MQTT Connected. Listening for any changes')
            self.status = 'Connected'
            client.subscribe('#')

        if self.on_connect is not None:
            self.on_connect(client, userdata, flags, rc)

    def _on_message(self, client, userdata, msg):
        '''Client message callback: forward ``stat/`` messages.

        Messages without a topic are ignored. Every other message is logged
        at debug level, and those whose topic starts with ``stat/`` are
        handed to the user ``on_message`` callback when one is set.
        '''
        # Guard first: the log line below dereferences msg.
        if not msg or not msg.topic:
            return

        # All status messages start with "stat/"
        # topic="stat/tasmota_197CD7/POWER1" payload="ON"
        log.debug(f'MQTT Received {msg.topic} :: {msg.payload}')
        if not msg.topic.startswith('stat/'):
            return

        if self.on_message is not None:
            self.on_message(client, userdata, msg)

        try:
            _prefix, topic, _command = msg.topic.split('/')
        except ValueError:
            # If we don't have the expected count of values
            return

        if not userdata:
            return

        # NOTE(review): the looked-up entry is not used for anything yet;
        # per-device dispatch looks unfinished here.
        instance = userdata.get(topic)
        if not instance:
            return

    def _on_new_device(self, device):
        '''Discovery callback: log the newly seen device.'''
        log.info(f'new device {device.sn}')

    def get_light(self, light_id) -> 'Light | None':
        '''Get a light by its ID.

        Returns a tasmota.Light object, or None when ``light_id`` is empty
        or no discovered device has that ID. Light objects are created on
        first request and reused afterwards.
        '''
        if not light_id:
            return None

        device = self.discover.devices.get(light_id)
        if not device:
            return None

        if light_id not in self._lights:
            # The handler itself is passed as the client object.
            self._lights[light_id] = Light(
                mqtt_client=self, topic=device.topic
            )

        return self._lights[light_id]