-
Notifications
You must be signed in to change notification settings - Fork 11
/
shelly_firmware.py
executable file
·306 lines (268 loc) · 11.4 KB
/
shelly_firmware.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
#!/usr/bin/env python3
import io
import sh
import os
import re
import sys
import json
import tempfile
import hashlib
import zipfile
import argparse
import requests
import logging
# Version string of this tool; it is reported in the startup log message.
VERSION = '0.1'
# Endpoint queried for the JSON catalogue of available firmware packages.
# NOTE(review): this is plain HTTP, so the catalogue (and the firmware URLs
# taken from it) is not protected in transit - confirm whether the endpoint
# can be reached over HTTPS instead.
cloud_url = 'http://api.shelly.cloud/files/firmware'
# Size in bytes (2 MiB) of the blank flash image the firmware parts are
# written into.
flash_size = 2097152
# Module-level logger. It accepts everything from DEBUG upwards; the handlers
# attached in main() decide what is actually shown or stored.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# External helper programs used to repack / unpack SPIFFS images. The paths
# are relative, so they are looked up from the current working directory.
tool_mkspiffs = sh.Command('./tools/mkspiffs8')
tool_unspiffs = sh.Command('./tools/unspiffs8')
def list_dev_from_cloud():
    """Fetch the firmware catalogue from the Shelly cloud.

    Returns:
        The ``data`` member of the JSON response. The other functions in
        this file use it as a mapping of model name to an info dict
        (``data.items()``, ``data[model]``).

    The process exits with status 1 when the request fails, the response is
    not valid JSON, or the response does not carry a truthy ``isok`` flag
    together with a ``data`` member. Previously those cases either raised a
    ``TypeError`` from the log call itself or returned ``None`` and crashed
    later in the caller.
    """
    logger.debug("Fetching data from URL: {}".format(cloud_url))
    try:
        # A timeout keeps the tool from hanging forever on a dead connection.
        cloud_resp = requests.get(cloud_url, timeout=30)
    except requests.RequestException as err:
        logger.error('An error occurred while fetching device list: {}'.format(err))
        sys.exit(1)
    logger.debug('Got response {} for URL: {}'.format(cloud_resp.status_code, cloud_url))
    try:
        cloud_json = cloud_resp.json()
    except ValueError as err:
        # requests raises a ValueError subclass when the body is not JSON.
        logger.error('Device list response is not valid JSON: {}'.format(err))
        sys.exit(1)
    if not (isinstance(cloud_json, dict)
            and cloud_json.get('isok')
            and 'data' in cloud_json):
        logger.error('Device list response from {} is not usable '
                     '(isok flag or data missing)'.format(cloud_url))
        sys.exit(1)
    logger.debug('Data JSON received and it looks sane. isok = True')
    return cloud_json['data']
def print_devices(data, beta):
    """Print a table of device models and their firmware release to stdout.

    Args:
        data: mapping of model name to an info dict.
        beta: when truthy the ``beta_ver`` entry of each info dict is shown,
            otherwise the ``version`` entry.

    Models whose info dict lacks the selected entry are left out of the table
    and reported through a debug log message instead.
    """
    row_format = "{0:<16}{1:<40}"
    version_key = "beta_ver" if beta else "version"

    print('#' * 56)
    print("The following devices were found in Shelly cloud\n")
    print(row_format.format("Model", "Release"))
    print('=' * 56)
    for model, info in data.items():
        try:
            release = info[version_key]
        except KeyError:
            logger.debug("No firmware verion available for model {}".format(model))
            continue
        print(row_format.format(model, release))
    print('#' * 56)
def get_firmware_url(data, model):
    """Return the firmware download URL for *model*.

    Args:
        data: mapping of model name to an info dict, as returned by
            list_dev_from_cloud().
        model: model name to look up.

    Returns:
        The ``url`` entry of the model's info dict.

    The process exits with status 1 when the model is unknown or its info
    dict has no ``url`` entry. Previously an unknown model was only logged
    and execution continued into an unbound local variable.
    """
    try:
        dev_info = data[model]
    except KeyError:
        logger.error("Model {} not found in Shelly cloud".format(model))
        sys.exit(1)
    logger.debug("Model {} found!".format(model))
    try:
        return dev_info["url"]
    except KeyError:
        logger.error("No firmware URL available for model {}".format(model))
        sys.exit(1)
def build_firmware(input_data, output_file):
    """Assemble a flat flash image from a firmware package and write it out.

    Args:
        input_data: raw bytes of the firmware ``.zip`` package.
        output_file: path of the binary image to create.

    The package manifest supplies the platform ``name`` and a ``parts``
    mapping. Each part has an ``addr`` and ``size`` plus either a ``fill``
    byte value or a ``src`` file inside the package; an optional ``cs_sha1``
    is verified against the part data. Parts whose key contains ``fs`` are
    passed through fs_inject_hwinfo() before being placed. All parts are
    written at their address into an image pre-filled with 0xFF bytes of
    ``flash_size`` length.

    The process exits with status 1 when the package is not a zip archive,
    the manifest has no platform name, a part has no data source, or a
    checksum does not match.
    """
    try:
        fw_zip = zipfile.ZipFile(io.BytesIO(input_data))
    except zipfile.BadZipFile:
        # E.g. an HTML error page was downloaded instead of the package.
        logger.error("Firmware package is not a valid zip archive")
        sys.exit(1)

    manifest = fw_get_manifest(fw_zip)
    try:
        platform_name = manifest['name']
    except KeyError:
        # Stop here: the name is required below to build the hwinfo record.
        logger.error("Platform name not found in firmware package")
        sys.exit(1)
    logger.info("Found platform {} in firmware package".format(platform_name))

    part_list = []
    logger.debug('Iterating over firmware parts...')
    for key, part in manifest['parts'].items():
        start_addr = part["addr"]
        part_size = part["size"]

        # A package file takes precedence over a fill pattern when a part
        # declares both.
        if "src" in part:
            part_data = fw_get_part(fw_zip, part["src"])
        elif "fill" in part:
            part_data = bytearray([part["fill"]] * part_size)
        else:
            logger.error('Data missing for part {}.'.format(key))
            sys.exit(1)

        if "cs_sha1" in part:
            if not fw_verify_part(part_data, part["cs_sha1"]):
                logger.error("Verification failed. Invalid data for part {}".format(key))
                sys.exit(1)

        logger.debug('Found part {}:\n'.format(key) +
                     '\tStart address: {}\n'.format(hex(int(start_addr))) +
                     '\tSize: {}\n'.format(hex(int(part_size))) +
                     '\tData: {}...'.format(''.join(format(x, '02x') for x in part_data[:32])))

        if 'fs' in key:
            logger.debug('Found SPIFFS data partition of size: {}'.format(part_size))
            part_data = fs_inject_hwinfo(part_data, platform_name)
            part_size = len(part_data)
            logger.debug('New SPIFFS data partition size: {}'.format(part_size))

        part_list.append({
            'start': start_addr,
            'size': part_size,
            'data': part_data
        })

    empty_image = create_flash_image(flash_size)
    flash_image = io.BytesIO(empty_image)
    for part in part_list:
        logger.debug('Writing {} bytes at address {}...'.format(part['size'],
                                                               hex(int(part['start']))))
        flash_image.seek(part['start'])
        flash_image.write(part['data'])

    with open(output_file, "wb") as outfile:
        logger.info('Writing file {}'.format(output_file))
        outfile.write(flash_image.getbuffer())
def download_and_build_firmware(url, output_file):
    """Download a firmware package and build a flash image from it.

    Args:
        url: location of the firmware ``.zip`` package.
        output_file: path of the binary image to create.

    The process exits with status 1 when the download fails or the server
    answers with an HTTP error status. Previously a failed download raised a
    ``TypeError`` from the log call and an error page would have been handed
    to the zip parser.
    """
    try:
        # A timeout keeps the tool from hanging forever on a dead connection.
        fw_pkg = requests.get(url, timeout=60)
        fw_pkg.raise_for_status()
    except requests.RequestException as err:
        logger.error("An error occurred while fetching firmware: {}".format(err))
        sys.exit(1)
    build_firmware(fw_pkg.content, output_file)
def build_firmware_from_file(input_file, output_file):
    """Build a flash image from a firmware package stored on disk.

    Args:
        input_file: path of the firmware ``.zip`` package to read.
        output_file: path of the binary image to create.

    The process exits with status 1 when the input file cannot be read.
    Previously a read failure raised a ``TypeError`` from the log call and
    the file handle was never closed.
    """
    try:
        with open(input_file, "rb") as infile:
            file_contents = infile.read()
    except OSError as err:
        logger.error("An error occurred while reading input: {}".format(err))
        sys.exit(1)
    build_firmware(file_contents, output_file)
def _run_spiffs_tool(tool, args, action, failure_hint):
    """Run a SPIFFS helper tool, log its output and return its stderr text.

    Args:
        tool: the ``sh.Command`` to invoke.
        args: sequence of command-line arguments for the tool.
        action: ``'unpack'`` or ``'repack'``; used in the log messages.
        failure_hint: sentence appended to the failure log message.

    The process exits with status 1 when the tool reports a non-zero exit
    status.
    """
    try:
        result = tool(*args)
        failed = bool(result.exit_code)
    except sh.ErrorReturnCode as err:
        # sh raises instead of returning when the exit status is non-zero;
        # the exception carries the captured stdout and stderr as well.
        result = err
        failed = True

    out_text = result.stdout.decode(sys.stdout.encoding or 'utf-8')
    err_text = result.stderr.decode(sys.stderr.encoding or 'utf-8')
    if failed:
        logger.error('SPIFFS {}ing failed! {}'.format(action, failure_hint) +
                     '\n\tCommand output:\n\t{}'.format(out_text) +
                     '\n\tError message:\n\t{}'.format(err_text))
        sys.exit(1)
    logger.debug('SPIFFS {} success!'.format(action) +
                 '\n\tCommand output\n\t{}'.format(out_text) +
                 '\n\t{}'.format(err_text))
    return err_text


def fs_inject_hwinfo(data, name):
    """Add a ``hwinfo_struct.json`` file to a SPIFFS filesystem image.

    The image is unpacked with the unspiffs tool, the hwinfo record for
    platform *name* is written next to the unpacked files, and the directory
    is packed again with the mkspiffs tool using the parameters the unpack
    step reported.

    Args:
        data: bytes of the original SPIFFS image.
        name: platform name stored in the hwinfo record.

    Returns:
        A ``bytearray`` with the repacked SPIFFS image.

    The process exits with status 1 when either tool fails or the filesystem
    parameters cannot be found in the unpack output. The temporary working
    directory is removed on every exit path.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        fs_dir = os.path.join(temp_dir, 'out')
        fs_old = os.path.join(temp_dir, 'old.bin')
        fs_new = os.path.join(temp_dir, 'new.bin')
        os.mkdir(fs_dir)
        logger.debug('Created temporary directory {} for unpacking SPIFFS data'.format(fs_dir))

        with open(fs_old, 'wb') as f:
            f.write(data)

        # Unpack SPIFFS. The unspiffs tool prints the filesystem info on
        # stderr during unpack.
        cmd_info = _run_spiffs_tool(tool_unspiffs, ('-d', fs_dir, fs_old),
                                    'unpack', 'Cannot unpack!')

        # fs / bs / ps / es are the sizes later handed to mkspiffs through
        # its -s / -b / -p / -e options.
        fs_params = {}
        for key in ('fs', 'bs', 'ps', 'es'):
            found = re.search(r'\(.*' + key + r'\s(\d+).*\)', cmd_info)
            if found is None:
                logger.error('SPIFFS parameter "{}" not found in unpack output'.format(key))
                sys.exit(1)
            fs_params[key] = found.group(1)

        hwinfo = mk_hwinfo_for_platform(name)
        logger.debug('Created hwinfo struct:' +
                     '\n\t{}'.format(hwinfo))
        with open(os.path.join(fs_dir, 'hwinfo_struct.json'), 'w') as f:
            f.write(hwinfo)

        # Repack SPIFFS with the same geometry as the original image.
        _run_spiffs_tool(tool_mkspiffs,
                         ('-s', fs_params['fs'],
                          '-b', fs_params['bs'],
                          '-p', fs_params['ps'],
                          '-e', fs_params['es'],
                          '-f', fs_new,
                          fs_dir),
                         'repack', 'Cannot create SPIFFS!')

        with open(fs_new, 'rb') as f:
            return bytearray(f.read())
def mk_hwinfo_for_platform(name):
    """Return the hwinfo record for platform *name* as JSON text.

    Only the ``model`` field depends on the argument; every other field is a
    fixed value.
    """
    fields = (
        ("selftest", True),
        ("hwinfo_ver", 1),
        ("batch_id", 1),
        ("model", name),
        ("hw_revision", "prod-unknown"),
        ("manufacturer", "device_recovery"),
    )
    return json.dumps(dict(fields))
def fw_get_manifest(fw_zip):
    """Locate and parse the manifest of a firmware package.

    Args:
        fw_zip: an open ``zipfile.ZipFile`` holding the firmware package.

    Returns:
        The decoded JSON content of the first archive member whose name
        contains ``manifest``.

    The process exits with status 1 when no such member exists or its
    content cannot be decoded as JSON. Previously a missing manifest raised
    ``StopIteration`` and a bad manifest fell through to an unbound local
    variable.
    """
    firmware_files = fw_zip.namelist()
    logger.debug('The following files were found in downloaded firmware package'
                 '\n\t{}'.format('\n\t'.join(firmware_files)))

    # The None default makes the not-found case reachable below.
    manifest_name = next((entry for entry in firmware_files if "manifest" in entry), None)
    if not manifest_name:
        logger.error("Manifest file was not found in firmware package!")
        sys.exit(1)
    logger.debug('The manifest seems to be named {}'.format(manifest_name))

    manifest_file = fw_zip.read(manifest_name)
    try:
        return json.loads(manifest_file)
    except ValueError:
        # Covers json.JSONDecodeError as well as undecodable bytes.
        logger.error("Cannot decode JSON. Bad manifest format!")
        sys.exit(1)
def fw_get_part(fw_zip, part):
    """Read the data of one firmware part from the package.

    Args:
        fw_zip: an open ``zipfile.ZipFile`` holding the firmware package.
        part: file name (or fragment of it) to look for among the archive
            member names.

    Returns:
        A ``bytearray`` with the content of the first archive member whose
        name contains *part*.

    The process exits with status 1 when no member matches. Previously that
    case raised ``StopIteration`` and the error branch was unreachable.
    """
    logger.debug('Searching for part {} in firmware package'.format(part))
    firmware_files = fw_zip.namelist()
    logger.debug('The following files were found in downloaded firmware package'
                 '\n\t{}'.format('\n\t'.join(firmware_files)))

    # The None default makes the not-found case reachable below.
    part_name = next((entry for entry in firmware_files if part in entry), None)
    if not part_name:
        logger.error("Error occurred trying to read data for part {}".format(part))
        sys.exit(1)
    logger.debug('The file for part {} seems to be named {}'.format(part, part_name))
    return bytearray(fw_zip.read(part_name))
def fw_verify_part(data, chksum):
    """Check part data against the SHA-1 checksum from the manifest.

    Args:
        data: bytes-like content of the part.
        chksum: expected SHA-1 digest as a hexadecimal string.

    Returns:
        True when the hex digest of *data* equals *chksum*, else False.
    """
    logger.debug('Part data verification requested')
    digest = hashlib.sha1(data).hexdigest()
    logger.debug('The following checksums were calculated:\n' +
                 '\tData\t\t{}\n'.format(digest) +
                 '\tManifest\t{}'.format(chksum))
    if chksum != digest:
        return False
    logger.debug('Checksums match. Success!')
    return True
def create_flash_image(size):
    """Return a ``bytearray`` of *size* bytes, each set to 0xFF."""
    logger.debug('Generating empty flash image of {} bytes'.format(size))
    blank = bytearray(b'\xff' * size)
    return blank
def main():
    """Command-line entry point.

    Parses the arguments, attaches a console and a file log handler, then
    runs the first requested action: list the cloud catalogue, download and
    build a model's firmware, or build from a local package. When no action
    is requested the usage help is printed.
    """
    # Command-line options, declared as (flags, keyword arguments) pairs.
    option_table = (
        (("-l", "--list"),
         dict(action="store_true",
              help="List available devices from shelly.cloud")),
        (("-b", "--beta"),
         dict(action="store_true",
              help="List beta versions from shelly.cloud")),
        (("-d", "--download"),
         dict(dest="model",
              help="Download binary for specified device")),
        (("-i", "--input"),
         dict(dest="input_file",
              help="Use the provided .zip file as input, instead of downloading.")),
        (("-o", "--output"),
         dict(default="firmware.bin",
              help="Output file name")),
        (("-v", "--verbose"),
         dict(action="store_true",
              help="Enable debug output to console")),
    )
    parser = argparse.ArgumentParser()
    for flags, options in option_table:
        parser.add_argument(*flags, **options)
    args = parser.parse_args()

    # Console logging: DEBUG only when --verbose was given.
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG if args.verbose else logging.INFO)
    console_handler.setFormatter(logging.Formatter('%(levelname)s:\t%(message)s'))
    logger.addHandler(console_handler)

    # File logging: always at DEBUG level.
    file_handler = logging.FileHandler('shelly_firmware.log')
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(logging.Formatter('%(asctime)s\t[%(levelname)s]: %(message)s'))
    logger.addHandler(file_handler)

    logger.info('Shelly firmware binary download tool. Version {}'.format(VERSION))

    # Only the first matching action runs; the order gives the priority.
    if args.list:
        logger.info('Getting list of available firmware packages from shelly.cloud')
        print_devices(list_dev_from_cloud(), args.beta)
    elif args.model:
        logger.info('Downloading firmware binary file for device {}'.format(args.model))
        logger.info('Output file is set to: {}'.format(args.output))
        catalogue = list_dev_from_cloud()
        download_and_build_firmware(get_firmware_url(catalogue, args.model), args.output)
    elif args.input_file:
        build_firmware_from_file(args.input_file, args.output)
    else:
        parser.print_help()
        return
    exit(0)


if __name__ == "__main__":
    main()