forked from ivanfmartinez/juicepassproxy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
juicebox_mitm.py
374 lines (324 loc) · 17.3 KB
/
juicebox_mitm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
import asyncio
import errno
import logging
import time
import asyncio_dgram
from const import (
ERROR_LOOKBACK_MIN,
MAX_ERROR_COUNT,
MAX_RETRY_ATTEMPT,
MITM_HANDLER_TIMEOUT,
MITM_RECV_TIMEOUT,
MITM_SEND_DATA_TIMEOUT,
)
from juicebox_message import JuiceboxCommand, JuiceboxStatusMessage, JuiceboxEncryptedMessage, JuiceboxDebugMessage, juicebox_message_from_bytes
# Began with https://github.com/rsc-dev/pyproxy and rewrote when moving to async.
_LOGGER = logging.getLogger(__name__)
class JuiceboxMITM:
def __init__(
self,
jpp_addr,
enelx_addr,
local_mitm_handler=None,
ignore_enelx=False,
remote_mitm_handler=None,
mqtt_handler=None,
loglevel=None,
reuse_port=True,
):
if loglevel is not None:
_LOGGER.setLevel(loglevel)
self._jpp_addr = jpp_addr
self._enelx_addr = enelx_addr
self._juicebox_addr = None
self._ignore_enelx = ignore_enelx
self._local_mitm_handler = local_mitm_handler
self._remote_mitm_handler = remote_mitm_handler
self._mqtt_handler = mqtt_handler
self._reuse_port = reuse_port
self._loop = asyncio.get_running_loop()
self._mitm_loop_task: asyncio.Task = None
self._sending_lock = asyncio.Lock()
self._dgram = None
self._error_count = 0
self._error_timestamp_list = []
# Last command sent to juicebox device
self._last_command = None
# Last message received from juicebox device
self._last_status_message = None
self._first_status_message_timestamp = None
self._boot_timestamp = None
async def start(self) -> None:
_LOGGER.info(f"Starting JuiceboxMITM at {self._jpp_addr[0]}:{self._jpp_addr[1]} reuse_port={self._reuse_port}")
_LOGGER.debug(f"EnelX: {self._enelx_addr[0]}:{self._enelx_addr[1]}")
await self._connect()
async def close(self):
if self._dgram is not None:
self._dgram.close()
self._dgram = None
await asyncio.sleep(3)
async def _connect(self):
connect_attempt = 1
while (
self._dgram is None
and connect_attempt <= MAX_RETRY_ATTEMPT
and self._error_count < MAX_ERROR_COUNT
):
if connect_attempt != 1:
_LOGGER.debug(
"Retrying UDP Server Startup. Attempt "
f"{connect_attempt} of {MAX_RETRY_ATTEMPT}"
)
connect_attempt += 1
try:
if self._sending_lock.locked():
self._dgram = await asyncio_dgram.bind(
self._jpp_addr, reuse_port=self._reuse_port
)
else:
async with self._sending_lock:
self._dgram = await asyncio_dgram.bind(
self._jpp_addr, reuse_port=self._reuse_port
)
except OSError as e:
_LOGGER.warning(
"JuiceboxMITM UDP Server Startup Error. Reconnecting. "
f"({e.__class__.__qualname__}: {e})"
)
await self._add_error()
self._dgram = None
pass
await asyncio.sleep(5)
if self._dgram is None:
raise ChildProcessError("JuiceboxMITM: Unable to start MITM UDP Server.")
if self._mitm_loop_task is None or self._mitm_loop_task.done():
self._mitm_loop_task = await self._mitm_loop()
self._loop.create_task(self._mitm_loop_task)
_LOGGER.debug(f"JuiceboxMITM Connected. {self._jpp_addr}")
async def _mitm_loop(self) -> None:
_LOGGER.debug("Starting JuiceboxMITM Loop")
while self._error_count < MAX_ERROR_COUNT:
if self._dgram is None:
_LOGGER.warning("JuiceboxMITM Reconnecting.")
await self._add_error()
await self._connect()
continue
# _LOGGER.debug("Listening")
try:
async with asyncio.timeout(MITM_RECV_TIMEOUT):
data, remote_addr = await self._dgram.recv()
except asyncio_dgram.TransportClosed:
_LOGGER.warning("JuiceboxMITM Connection Lost.")
await self._add_error()
self._dgram = None
continue
except TimeoutError as e:
_LOGGER.warning(
f"No Message Received after {MITM_RECV_TIMEOUT} sec. "
f"({e.__class__.__qualname__}: {e})"
)
await self._add_error()
self._dgram = None
continue
try:
async with asyncio.timeout(MITM_HANDLER_TIMEOUT):
await self._main_mitm_handler(data, remote_addr)
except TimeoutError as e:
_LOGGER.warning(
f"MITM Handler timeout after {MITM_HANDLER_TIMEOUT} sec. "
f"({e.__class__.__qualname__}: {e})"
)
await self._add_error()
self._dgram = None
raise ChildProcessError(
f"JuiceboxMITM: More than {self._error_count} errors in the last "
f"{ERROR_LOOKBACK_MIN} min."
)
def _booted_in_less_than(self, seconds):
return self._boot_timestamp and ((time.time() - self._boot_timestamp) < seconds)
async def _message_decode(self, data : bytes):
decoded_message = None
try:
decoded_message = juicebox_message_from_bytes(data)
if isinstance(decoded_message, JuiceboxEncryptedMessage):
# encrypted are not supported now
# directory server can set the encripted mode, to disable the JuiceBox must be blocked to access the Directory Server
_LOGGER.error("Encrypted messages are not supported yet, please restart yout Juicebox device without internet connection to be able to use unencrypted messages")
elif isinstance(decoded_message, JuiceboxStatusMessage):
self._last_status_message = decoded_message
if self._first_status_message_timestamp is None:
self._first_status_message_timestamp = time.time()
elapsed = int(time.time() - self._first_status_message_timestamp)
# Try to initialize the set entities with safe values from the juicebox device
# This is not the best way to do, but can be made without need to store somewhere the data as config is not available here
# TODO: better/safer way
if not self.is_mqtt_numeric_entity_defined("current_max_online_set"):
if decoded_message.has_value("current_max_online"):
_LOGGER.info("setting current_max_online_set with current_max_online")
await self._mqtt_handler.get_entity("current_max_online_set").set_state(self._last_status_message.get_processed_value("current_max_online"))
# Apparently all messages came with current_max_online then, this code will never be executed
elif ((elapsed > 600) or self._booted_in_less_than(30)) and decoded_message.has_value("current_rating"):
_LOGGER.info("setting current_max_online_set with current_rating")
await self._mqtt_handler.get_entity("current_max_online_set").set_state(self._last_status_message.get_processed_value("current_rating"))
#TODO now the MQTT is storing previous data on config, this can be used to get initialize theses values from previous JPP execution
if not self.is_mqtt_numeric_entity_defined("current_max_offline_set"):
if decoded_message.has_value("current_max_offline"):
_LOGGER.info("setting current_max_offline_set with current_max_offline")
await self._mqtt_handler.get_entity("current_max_offline_set").set_state(self._last_status_message.get_processed_value("current_max_offline"))
# After a reboot of device, the device that does not send offline will start with online value defined with offline setting
# as the device will start to use the offline current after 5 minutes without responses from server, we can consider that after this time
# we got the offline value from the online parameter, use the parameter after 6 minutes from first status message
elif (self._booted_in_less_than(30) or (elapsed > 6*60) ) and decoded_message.has_value("current_max_online"):
_LOGGER.info(f"setting current_max_offline_set with current_max_online after reboot or more than 5 minutes (elapsed={elapsed})")
await self._mqtt_handler.get_entity("current_max_offline_set").set_state(self._last_status_message.get_processed_value("current_max_online"))
#TODO we still have a problem on v07 protocol that does not send the current_max_offline
# the entity will not be updated
elif isinstance(decoded_message, JuiceboxDebugMessage):
if decoded_message.is_boot():
self._boot_timestamp = time.time()
else:
_LOGGER.exception(f"Unexpected juicebox message type {decoded_message}")
except Exception as e:
_LOGGER.exception(f"Not a valid juicebox message |{data}| {e}")
return decoded_message
async def _main_mitm_handler(self, data: bytes, from_addr: tuple[str, int]):
if data is None or from_addr is None:
return
# _LOGGER.debug(f"JuiceboxMITM Recv: {data} from {from_addr}")
if from_addr[0] != self._enelx_addr[0]:
self._juicebox_addr = from_addr
if from_addr == self._juicebox_addr:
# Must decode message to give correct command response based on version
# Also this decoded message can will passed to the mqtt handler to skip a new decoding
decoded_message = await self._message_decode(data)
data = await self._local_mitm_handler(data, decoded_message)
if self._ignore_enelx:
# Keep sending responses to local juicebox like the enelx servers using last values
# the responses should be send only to valid JuiceboxStatusMessages
if isinstance(decoded_message, JuiceboxStatusMessage):
await self.send_cmd_message_to_juicebox(new_values=False)
else:
try:
await self.send_data(data, self._enelx_addr)
except OSError as e:
_LOGGER.warning(
f"JuiceboxMITM OSError {errno.errorcode[e.errno]} "
f"[{self._enelx_addr}]: {e}"
)
await self._local_mitm_handler(
f"JuiceboxMITM_OSERROR|server|{self._enelx_addr}|"
f"{errno.errorcode[e.errno]}|{e}"
)
await self._add_error()
elif self._juicebox_addr is not None and from_addr == self._enelx_addr:
if not self._ignore_enelx:
data = await self._remote_mitm_handler(data)
try:
await self.send_data(data, self._juicebox_addr)
except OSError as e:
_LOGGER.warning(
f"JuiceboxMITM OSError {errno.errorcode[e.errno]} "
f"[{self._juicebox_addr}]: {e}"
)
await self._local_mitm_handler(
f"JuiceboxMITM_OSERROR|client|{self._juicebox_addr}|"
f"{errno.errorcode[e.errno]}|{e}"
)
await self._add_error()
else:
_LOGGER.info(f"JuiceboxMITM Ignoring From EnelX: {data}")
else:
_LOGGER.warning(f"JuiceboxMITM Unknown address: {from_addr}")
async def send_data(
self, data: bytes, to_addr: tuple[str, int], blocking_time: int = 0.1
):
sent = False
send_attempt = 1
while not sent and send_attempt <= MAX_RETRY_ATTEMPT:
if send_attempt != 1:
_LOGGER.warning(
f"JuiceboxMITM Resending (Attempt: {send_attempt} of "
f"{MAX_RETRY_ATTEMPT}): {data} to {to_addr}"
)
send_attempt += 1
if self._dgram is None:
_LOGGER.warning("JuiceboxMITM Reconnecting.")
await self._connect()
try:
async with asyncio.timeout(MITM_SEND_DATA_TIMEOUT):
async with self._sending_lock:
try:
await self._dgram.send(data, to_addr)
except asyncio_dgram.TransportClosed:
_LOGGER.warning(
"JuiceboxMITM Connection Lost while Sending."
)
await self._add_error()
self._dgram = None
else:
sent = True
except TimeoutError as e:
_LOGGER.warning(
f"Send Data timeout after {MITM_SEND_DATA_TIMEOUT} sec. "
f"({e.__class__.__qualname__}: {e})"
)
await self._add_error()
await asyncio.sleep(max(blocking_time, 0.1))
if not sent:
raise ChildProcessError("JuiceboxMITM: Unable to send data.")
# _LOGGER.debug(f"JuiceboxMITM Sent: {data} to {to_addr}")
async def send_data_to_juicebox(self, data: bytes):
await self.send_data(data, self._juicebox_addr)
def is_mqtt_numeric_entity_defined(self, entity_name):
entity = self._mqtt_handler.get_entity(entity_name)
# TODO: not clear why sometimes "0" came at this point as string instead of numeric
# Using same way on HA dashboard sometimes came 0.0 float and sometimes "0" str
# _LOGGER.debug(f"is_mqtt_entity_defined {entity_name} {entity} {entity.state}")
defined = entity and (isinstance(entity.state, int | float) or (isinstance(entity.state, str) and entity.state.isnumeric()))
return defined
async def __build_cmd_message(self, new_values):
if type(self._last_status_message) is JuiceboxEncryptedMessage:
_LOGGER.info("Responses for encrypted protocol not supported yet")
return None
# TODO: check which other versions can be considered as new_version of protocol
# packet captures indicate that v07 uses old version
new_version = self._last_status_message and (self._last_status_message.get_value("v") == "09u")
if self._last_command:
message = JuiceboxCommand(previous=self._last_command, new_version=new_version)
else:
message = JuiceboxCommand(new_version=new_version)
# Should start with values
new_values = True
if new_values:
if (not self.is_mqtt_numeric_entity_defined("current_max_offline_set")) or (not self.is_mqtt_numeric_entity_defined("current_max_online_set")):
_LOGGER.error("Must have both current_max(online|offline) defined to send command message")
return None
message.offline_amperage = int(self._mqtt_handler.get_entity("current_max_offline_set").state)
message.instant_amperage = int(self._mqtt_handler.get_entity("current_max_online_set").state)
_LOGGER.info(f"command message = {message} new_values={new_values} new_version={new_version}")
self._last_command = message;
return message.build()
# Send a new message using values on mqtt entities
async def send_cmd_message_to_juicebox(self, new_values):
if not self._ignore_enelx:
_LOGGER.warning("To send commands to juicebox you have to ignore ENEL X servers, please set ignore_enelx option")
elif self._mqtt_handler.get_entity("act_as_server").is_on():
cmd_message = await self.__build_cmd_message(new_values)
if cmd_message:
_LOGGER.info(f"Sending command to juicebox {cmd_message} new_values={new_values}")
await self.send_data(cmd_message.encode('utf-8'), self._juicebox_addr)
async def set_mqtt_handler(self, mqtt_handler):
self._mqtt_handler = mqtt_handler
async def set_local_mitm_handler(self, local_mitm_handler):
self._local_mitm_handler = local_mitm_handler
async def set_remote_mitm_handler(self, remote_mitm_handler):
self._remote_mitm_handler = remote_mitm_handler
async def _add_error(self):
self._error_timestamp_list.append(time.time())
time_cutoff = time.time() - (ERROR_LOOKBACK_MIN * 60)
temp_list = list(
filter(lambda el: el > time_cutoff, self._error_timestamp_list)
)
self._error_timestamp_list = temp_list
self._error_count = len(self._error_timestamp_list)
_LOGGER.debug(f"Errors in last {ERROR_LOOKBACK_MIN} min: {self._error_count}")