-
Notifications
You must be signed in to change notification settings - Fork 54
/
update_jade_fw.py
executable file
·280 lines (231 loc) · 10.5 KB
/
update_jade_fw.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
#!/usr/bin/env python
import os
import sys
import time
import json
import hashlib
import logging
import argparse
import requests
from jadepy import JadeAPI, JadeError
from tools import fwtools
FWSERVER_URL_ROOT = 'https://jadefw.blockstream.com/bin'
FWSERVER_INDEX_FILE = 'index.json'
# Enable jade logging
jadehandler = logging.StreamHandler()
logger = logging.getLogger('jadepy.jade')
logger.setLevel(logging.DEBUG)
logger.addHandler(jadehandler)
device_logger = logging.getLogger('jadepy.jade-device')
device_logger.setLevel(logging.DEBUG)
device_logger.addHandler(jadehandler)
# Parse the index file and select firmware to download
def get_fw_metadata(verinfo, release_data):
# Select firmware from list of available
def _full_fw_label(fw):
return f'{fw["version"]} - {fw["config"]}'
def _delta_fw_label(fw, width):
return _full_fw_label(fw).ljust(width) + f'FROM {fw["from_version"]} - {fw["from_config"]}'
def _delta_appropriate(fw):
return fw['from_version'] == verinfo['JADE_VERSION'] and \
fw['from_config'].lower() == verinfo['JADE_CONFIG'].lower()
print(f'Current Jade fw: {verinfo["JADE_VERSION"]} - {verinfo["JADE_CONFIG"].lower()}')
print('-')
i = 0
print('Delta patches (faster)')
deltas = list(filter(_delta_appropriate, release_data.get('delta', [])))
just = max(len(name) for name in map(_full_fw_label, deltas)) + 2 if deltas else 0
for i, label in enumerate((_delta_fw_label(fw, just) for fw in deltas), i + 1): # 1 based index
print(f'{i})'.ljust(4), label)
print('-')
print('Full firmware images')
fullfws = release_data.get('full', [])
for i, label in enumerate((_full_fw_label(fw) for fw in fullfws), i + 1): # continue numbering
print(f'{i})'.ljust(4), label)
print('-')
numdeltas = len(deltas)
while True:
selectedfw = input('Select firmware: ')
if selectedfw.isdigit():
selectedfw = int(selectedfw)
if selectedfw > 0 and selectedfw <= i:
selectedfw -= 1 # zero-based index
if selectedfw < numdeltas:
# delta firmware
selectedfw = deltas[selectedfw]
else:
selectedfw -= numdeltas
selectedfw = fullfws[selectedfw]
return selectedfw
# Download compressed firmware file from Firmware Server using 'requests'
def download_file(verinfo, release):
# Workout hw_target subdir
board_type = verinfo.get("BOARD_TYPE")
features = verinfo.get("JADE_FEATURES")
hw_target = {'JADE': 'jade',
'JADE_V1.1': 'jade1.1',
'JADE_V2': 'jade2.0'}.get(board_type if board_type else 'JADE')
build_type = {'SB': '', 'DEV': 'dev'}.get(features)
if hw_target is None or build_type is None:
logger.error(f'Unsupported hardware: {board_type} / {features}')
return None, None, None, None
hw_target += build_type
# GET the index file from the firmware server which lists the
# available firmwares
url = f'{FWSERVER_URL_ROOT}/{hw_target}/{FWSERVER_INDEX_FILE}'
logger.info(f'Downloading firmware index file {url}')
rslt = requests.get(url)
assert rslt.status_code == 200, f'Cannot download index file {url}: {rslt.status_code}'
# Get the filename of the firmware to download
release_data = json.loads(rslt.text).get(release)
if not release_data:
logger.warning(f'No suitable firmware for tag: {release}')
return None, None, None, None
fwdata = get_fw_metadata(verinfo, release_data)
fwname = fwdata['filename']
fwhash = fwdata.get('fwhash')
cmphash = fwdata.get('cmphash')
# GET the selected firmware from the server
url = f'{FWSERVER_URL_ROOT}/{hw_target}/{fwname}'
logger.info(f'Downloading firmware {url}')
rslt = requests.get(f'{FWSERVER_URL_ROOT}/{hw_target}/{fwname}')
assert rslt.status_code == 200, f'Cannot download firmware file {url}: {rslt.status_code}'
fwcmp = rslt.content
logger.info(f'Downloaded {len(fwcmp)} byte firmware')
# Check the downloaded file hash if available
if cmphash:
# Compute the sha256 hash of the downloaded file
cmphasher = hashlib.sha256()
cmphasher.update(fwcmp)
assert cmphasher.digest() == bytes.fromhex(cmphash)
logger.info(f'Downloaded file hash verified')
# Optionally save the file locally
write_file = input('Save local copy of downloaded firmware? [y/N]').strip()
if write_file == 'y' or write_file == 'Y':
fwtools.write(fwcmp, os.path.basename(fwname))
if fwhash:
fwtools.write(fwhash, os.path.basename(fwname) + ".hash", text=True)
# Return
return fwdata['fwsize'], fwdata.get('patch_size'), fwhash, fwcmp
# Use a local (previously downloaded) firmware file.
# Must have the name unchanged from download.
# Handles full firmwares and also compressed firmware patches.
def get_local_fwfile(fwfilename):
# Load the firmware file
assert os.path.exists(fwfilename) and os.path.isfile(
fwfilename), f'Compressed firmware file not found: {fwfilename}'
# Read the fw file
fwcmp = fwtools.read(fwfilename)
fwhash = None
try:
fwhash = fwtools.read(fwfilename + ".hash", text=True)
except Exception as e:
logger.warning('Hash file no present or not valid')
# Use fwtools to parse the filename and deduce whether this is
# a full firmware file or a firmware delta/patch.
fwtype, fwinfo, fwinfo2 = fwtools.parse_compressed_filename(fwfilename)
assert (fwtype == fwtools.FWFILE_TYPE_PATCH) == (fwinfo2 is not None)
return fwinfo.fwsize, fwinfo2.fwsize if fwinfo2 else None, fwhash, fwcmp
# Takes the compressed firmware data to upload, the expected length of the
# final (uncompressed) firmware, the length of the uncompressed diff/patch
# (if this is a patch to apply to the current running firmware), and whether
# to apply the test mnemonic rather than using normal pinserver authentication.
def ota(jade, verinfo, fwcompressed, fwlength, fwhash, patchlen=None):
logger.info(f'Running OTA on: {verinfo}')
chunksize = int(verinfo['JADE_OTA_MAX_CHUNK'])
assert chunksize > 0
if verinfo.get('JADE_STATE') not in ['READY', 'UNINIT']:
# The network to use is deduced from the version-info
print('Please ensure Jade is unlocked')
network = 'testnet' if verinfo.get('JADE_NETWORKS') == 'TEST' else 'mainnet'
while not jade.auth_user(network, epoch=int(time.time())):
print('Please try again')
start_time = time.time()
last_time = start_time
last_written = 0
# Callback to log progress
def _log_progress(written, compressed_size, extended_reply):
nonlocal last_time
nonlocal last_written
assert extended_reply is None
current_time = time.time()
secs = current_time - last_time
total_secs = current_time - start_time
bytes_ = written - last_written
last_rate = bytes_ / secs
avg_rate = written / total_secs
progress = (written / compressed_size) * 100
secs_remaining = (compressed_size - written) / avg_rate
template = '{0:.2f} b/s - progress {1:.2f}% - {2:.2f} seconds left'
print(template.format(last_rate, progress, secs_remaining))
print('Written {0}b in {1:.2f}s'.format(written, total_secs))
last_time = current_time
last_written = written
print('Please approve the firmware update on the Jade device')
try:
result = jade.ota_update(fwcompressed, fwlength, chunksize, fwhash,
patchlen=patchlen, cb=_log_progress, extended_replies=False)
assert result is True
print(f'Total OTA time: {time.time() - start_time}s')
except JadeError as err:
logger.error(f'OTA failed or abandoned: {err}')
print('OTA incomplete')
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--serialport',
action='store',
dest='serialport',
help='Serial port or device',
default=None)
srcgrp = parser.add_mutually_exclusive_group()
srcgrp.add_argument('--release',
action='store',
dest='release',
choices=['previous', 'stable', 'beta'],
help='Use previous or beta versions, if available. Defaults to stable.',
default=None)
srcgrp.add_argument('--fwfile',
action='store',
dest='fwfile',
help='Local (previously downloaded) file to OTA - full or patch',
default=None)
parser.add_argument('--log',
action='store',
dest='loglevel',
help='Jade logging level',
choices=['DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL'],
default='ERROR')
args = parser.parse_args()
jadehandler.setLevel(getattr(logging, args.loglevel))
logger.debug(f'args: {args}')
# Connect to Jade over serial
with JadeAPI.create_serial(device=args.serialport) as jade:
# Get the version info
verinfo = jade.get_version_info()
# Get the file to OTA
if args.fwfile:
# Can't check that local file is appropriate for connected hw
# OTA should reject/fail if not appropriate.
# File must have the name unchanged from download.
fwlen, patchlen, fwhash, fwcmp = get_local_fwfile(args.fwfile)
else:
# File download should only offer appropriate fw
# OTA should reject/fail if not appropriate.
release = args.release or 'stable' # defaults to latest/stable
fwlen, patchlen, fwhash, fwcmp = download_file(verinfo, release)
if fwcmp is None:
print('No firmware available')
sys.exit(2)
if fwhash is not None:
logger.info(f'Final fw hash: {fwhash}')
fwhash = bytes.fromhex(fwhash)
print(f'Got fw {"patch" if patchlen else "file"} of length {len(fwcmp)} '
f'with expected uncompressed final fw length {fwlen}')
# OTA file to connected Jade hw
upload = input('Upload fw to connected Jade [Y/n]').strip()
if upload == 'y' or upload == 'Y' or upload == '':
logger.info('Jade OTA over serial')
with JadeAPI.create_serial(device=args.serialport) as jade:
ota(jade, verinfo, fwcmp, fwlen, fwhash, patchlen)
else:
logger.info('Skipping upload')