|
| 1 | +# |
| 2 | +# Copyright (C) 2020-2024 Embedded AMS B.V. - All Rights Reserved |
| 3 | +# |
| 4 | +# This file is part of Embedded Proto. |
| 5 | +# |
| 6 | +# Embedded Proto is open source software: you can redistribute it and/or |
| 7 | +# modify it under the terms of the GNU General Public License as published |
| 8 | +# by the Free Software Foundation, version 3 of the license. |
| 9 | +# |
| 10 | +# Embedded Proto is distributed in the hope that it will be useful, |
| 11 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 12 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 13 | +# GNU General Public License for more details. |
| 14 | +# |
| 15 | +# You should have received a copy of the GNU General Public License |
| 16 | +# along with Embedded Proto. If not, see <https://www.gnu.org/licenses/>. |
| 17 | +# |
| 18 | +# For commercial and closed source application please visit: |
| 19 | +# <https://EmbeddedProto.com/license/>. |
| 20 | +# |
| 21 | +# Embedded AMS B.V. |
| 22 | +# Info: |
| 23 | +# info at EmbeddedProto dot com |
| 24 | +# |
| 25 | +# Postal address: |
| 26 | +# Atoomweg 2 |
| 27 | +# 1627 LE, Hoorn |
| 28 | +# the Netherlands |
| 29 | +# |
| 30 | + |
| 31 | +import argparse |
| 32 | +import can |
| 33 | +import subprocess |
| 34 | +import time |
| 35 | + |
| 36 | +from generated import netbooting_pb2 as pb |
| 37 | + |
| 38 | + |
| 39 | +DEF_CAN_INTERFACE = 'can0' |
| 40 | +DEF_CAN_BITRATE = 1000000 |
| 41 | +DEF_TARGET_CAN_ID = 0x801 |
| 42 | + |
| 43 | + |
| 44 | +class RemoteDeviceNotHookedError(Exception): |
| 45 | + def __init__(self, msg='Remote device is not hooked', *args, **kwargs): |
| 46 | + super().__init__(msg, *args, **kwargs) |
| 47 | + |
| 48 | + |
| 49 | +class CanNetBooting: |
| 50 | + PAYLOAD_SIZE = 8 |
| 51 | + FLASH_SECTOR_SIZE = 512 |
| 52 | + FLASH_WRITE_MAX_SIZE = FLASH_SECTOR_SIZE // 2 |
| 53 | + FLASH_AVAILABLE_SIZE = 0x10000 |
| 54 | + |
| 55 | + def __init__(self, interface: str, bitrate: int, remote_id: int): |
| 56 | + self._interface = interface |
| 57 | + self._bitrate = bitrate |
| 58 | + self._remote_id = remote_id |
| 59 | + self._bus = None |
| 60 | + self._hooked_up = False |
| 61 | + |
| 62 | + def __enter__(self): |
| 63 | + if not self._can_interface_is_up(): |
| 64 | + self._set_can_interface() |
| 65 | + |
| 66 | + self._bus = can.Bus(interface="socketcan", channel=self._interface, bitrate=self._bitrate) |
| 67 | + return self |
| 68 | + |
| 69 | + def __exit__(self, exc_type, exc_value, exc_tb): |
| 70 | + self._bus.shutdown() |
| 71 | + if exc_type is not None: |
| 72 | + raise exc_type(exc_value) |
| 73 | + return True |
| 74 | + |
| 75 | + def _receive_packet(self, timeout : float = 1.0): |
| 76 | + if self._bus is None: |
| 77 | + return |
| 78 | + |
| 79 | + msg = self._bus.recv(timeout) |
| 80 | + if msg is None: |
| 81 | + raise TimeoutError(f"No message received after '{timeout}' secs") |
| 82 | + #print(type(msg), msg) |
| 83 | + return msg |
| 84 | + |
| 85 | + def _send_packet(self, payload: bytes): |
| 86 | + if len(payload) > self.PAYLOAD_SIZE: |
| 87 | + raise ValueError(f"CAN can send up to {self.PAYLOAD_SIZE} bytes") |
| 88 | + |
| 89 | + msg = can.Message( |
| 90 | + arbitration_id=self._remote_id, |
| 91 | + data=payload, |
| 92 | + is_extended_id=True, |
| 93 | + is_remote_frame=False |
| 94 | + ) |
| 95 | + |
| 96 | + try: |
| 97 | + self._bus.send(msg) |
| 98 | + except can.CanError as e: |
| 99 | + raise e |
| 100 | + |
| 101 | + def _can_interface_is_up(self): |
| 102 | + # Run 'ip -details link show {interface}' to get CAN interface details |
| 103 | + result = subprocess.run(['ip', '-details', 'link', 'show', self._interface], capture_output=True, text=True) |
| 104 | + |
| 105 | + if result.returncode != 0: |
| 106 | + raise RuntimeError(f"CAN interface '{self._interface}' is not available or not up. Please connect the USB to can device") |
| 107 | + |
| 108 | + # Check if the output contains the bitrate and state |
| 109 | + output = result.stdout |
| 110 | + |
| 111 | + if 'state UP' in output and f'bitrate {self._bitrate}' in output: |
| 112 | + print(f"CAN self._interface '{self._interface}' is already up with bitrate {self._bitrate}.") |
| 113 | + return True |
| 114 | + else: |
| 115 | + print(f"CAN interface '{self._interface}' is either down or set with a different bitrate.") |
| 116 | + return False |
| 117 | + |
| 118 | + def _set_can_interface(self): |
| 119 | + try: |
| 120 | + print(f"Setting up the '{self._interface}' interface with bitrate '{self._bitrate}'") |
| 121 | + # Define the command to set the CAN interface |
| 122 | + command = ['sudo', 'ip', 'link', 'set', self._interface, 'up', 'type', 'can', 'bitrate', str(self._bitrate)] |
| 123 | + subprocess.run(command, check=True) |
| 124 | + print(f"CAN interface '{self._interface}' set up successfully.") |
| 125 | + except subprocess.CalledProcessError as e: |
| 126 | + print(f"Failed to set up CAN interface: {e}") |
| 127 | + raise e |
| 128 | + |
| 129 | + def _receive_reply(self, timeout: float = 1.0): |
| 130 | + data = bytes() |
| 131 | + start = time.time() |
| 132 | + |
| 133 | + while time.time() - start < timeout: |
| 134 | + msg = self._receive_packet() |
| 135 | + |
| 136 | + if msg.arbitration_id != self._remote_id: |
| 137 | + continue |
| 138 | + |
| 139 | + # Empty message means EOF |
| 140 | + if msg.dlc == 0: |
| 141 | + break |
| 142 | + |
| 143 | + data += msg.data |
| 144 | + |
| 145 | + reply = pb.Reply() |
| 146 | + reply.ParseFromString(data) |
| 147 | + return reply |
| 148 | + |
| 149 | + def _send_command(self, command: pb.Command, timeout: float = 1.0): |
| 150 | + serialized_command = command.SerializeToString() |
| 151 | + for i in range(0, len(serialized_command), self.PAYLOAD_SIZE): |
| 152 | + chunk = serialized_command[i:i + self.PAYLOAD_SIZE] |
| 153 | + self._send_packet(chunk) |
| 154 | + time.sleep(0.02) |
| 155 | + |
| 156 | + # Send an empty message as EOF |
| 157 | + self._send_packet(bytes([])) |
| 158 | + time.sleep(0.02) |
| 159 | + |
| 160 | + reply = self._receive_reply(timeout=timeout) |
| 161 | + |
| 162 | + if reply.status != pb.Reply.Status.Succeed: |
| 163 | + command_str = str(command).replace("\n", "").replace("\r", "") |
| 164 | + raise RuntimeError(f"Command {command_str} failed") |
| 165 | + elif reply.action != command.action: |
| 166 | + raise RuntimeError(f"Action missmatch, received {reply.action} while expecting {command.action}") |
| 167 | + |
| 168 | + return reply |
| 169 | + |
| 170 | + def hook_up(self, attempts: int = 10, timeout: float = 1.0): |
| 171 | + command = pb.Command() |
| 172 | + command.action = pb.Action.HookUp |
| 173 | + |
| 174 | + for i in range(attempts): |
| 175 | + try: |
| 176 | + _ = cnb._send_command(command, timeout=timeout) |
| 177 | + except TimeoutError: |
| 178 | + print(f"Timeout for hook up in the {i+1} attempt") |
| 179 | + continue |
| 180 | + except RuntimeError as e: |
| 181 | + print(e) |
| 182 | + continue |
| 183 | + |
| 184 | + self._hooked_up = True |
| 185 | + break |
| 186 | + else: |
| 187 | + raise TimeoutError(f"Couldn't hook up after {attempts} attempts") |
| 188 | + |
| 189 | + def quit(self, timeout: float = 1.0): |
| 190 | + if not self._hooked_up: |
| 191 | + raise RemoteDeviceNotHookedError() |
| 192 | + |
| 193 | + command = pb.Command() |
| 194 | + command.action = pb.Action.Quit |
| 195 | + _ = cnb._send_command(command, timeout=timeout) |
| 196 | + self._hooked_up = False |
| 197 | + |
| 198 | + def jump(self, timeout: float = 1.0): |
| 199 | + if not self._hooked_up: |
| 200 | + raise RemoteDeviceNotHookedError() |
| 201 | + |
| 202 | + command = pb.Command() |
| 203 | + command.action = pb.Action.Jump |
| 204 | + _ = cnb._send_command(command, timeout=timeout) |
| 205 | + self._hooked_up = False |
| 206 | + |
| 207 | + def erase_sector(self, address: int, timeout: float = 1.0): |
| 208 | + if not self._hooked_up: |
| 209 | + raise RemoteDeviceNotHookedError() |
| 210 | + |
| 211 | + if address % self.FLASH_SECTOR_SIZE: |
| 212 | + address = address // self.FLASH_SECTOR_SIZE |
| 213 | + |
| 214 | + print("erasing 0x{}".format(hex(address))) |
| 215 | + command = pb.Command() |
| 216 | + command.action = pb.Action.Erase |
| 217 | + command.address = address |
| 218 | + _ = cnb._send_command(command, timeout=timeout) |
| 219 | + |
| 220 | + def write(self, address: int, data: bytes, timeout: float = 1.0): |
| 221 | + if not self._hooked_up: |
| 222 | + raise RemoteDeviceNotHookedError() |
| 223 | + |
| 224 | + data_len = len(data) |
| 225 | + if data_len > self.FLASH_WRITE_MAX_SIZE: |
| 226 | + raise ValueError("Data too long ({data_len}), max_size is {self.FLASH_WRITE_MAX_SIZE}") |
| 227 | + |
| 228 | + print("writting 0x{}".format(hex(address))) |
| 229 | + command = pb.Command() |
| 230 | + command.action = pb.Action.Write |
| 231 | + command.address = address |
| 232 | + command.data.len = data_len |
| 233 | + command.data.buf = data |
| 234 | + _ = cnb._send_command(command, timeout=timeout) |
| 235 | + |
| 236 | + def flash_binary(self, binary: bytes): |
| 237 | + binary_len = len(binary) |
| 238 | + if binary_len > self.FLASH_AVAILABLE_SIZE: |
| 239 | + raise ValueError(f"Binary too long ({binary_len}), max size is {self.FLASH_AVAILABLE_SIZE}") |
| 240 | + |
| 241 | + for address in range(0, binary_len, self.FLASH_WRITE_MAX_SIZE): |
| 242 | + if address % self.FLASH_SECTOR_SIZE == 0: |
| 243 | + self.erase_sector(address) |
| 244 | + |
| 245 | + data = binary[address:address + self.FLASH_WRITE_MAX_SIZE] |
| 246 | + self.write(address, data) |
| 247 | + |
| 248 | + |
| 249 | +def commandline(): |
| 250 | + parser = argparse.ArgumentParser(description='Set up CAN interface with a specific bitrate.') |
| 251 | + |
| 252 | + parser.add_argument('binary', |
| 253 | + type=str, |
| 254 | + help="Path to the firmware binary") |
| 255 | + |
| 256 | + parser.add_argument('-i', '--interface', |
| 257 | + type=str, |
| 258 | + default=DEF_CAN_INTERFACE, |
| 259 | + help=f'CAN interface to use (default: {DEF_CAN_INTERFACE})') |
| 260 | + |
| 261 | + parser.add_argument('-b', '--bitrate', |
| 262 | + type=str, |
| 263 | + default=DEF_CAN_BITRATE, |
| 264 | + help=f'Bitrate to set on the CAN interface (default: {DEF_CAN_BITRATE})') |
| 265 | + |
| 266 | + parser.add_argument('-t', '--target-id', |
| 267 | + type=str, |
| 268 | + default=DEF_TARGET_CAN_ID, |
| 269 | + help=f'Target device CAN ID (default: {DEF_TARGET_CAN_ID})') |
| 270 | + |
| 271 | + return parser.parse_args() |
| 272 | + |
| 273 | + |
| 274 | +if __name__ == '__main__': |
| 275 | + args = commandline() |
| 276 | + |
| 277 | + with CanNetBooting(args.interface, args.bitrate, args.target_id) as cnb: |
| 278 | + print("Hooking up...") |
| 279 | + cnb.hook_up() |
| 280 | + print("Device hooked") |
| 281 | + |
| 282 | + try: |
| 283 | + with open(args.binary, 'rb') as f: |
| 284 | + binary_data = f.read() |
| 285 | + |
| 286 | + print(f"Starting to flash {args.binary} ({len(binary_data)} bytes)...") |
| 287 | + cnb.flash_binary(binary_data) |
| 288 | + print(f"{args.binary} Flashed") |
| 289 | + |
| 290 | + print("Jumping to the app...") |
| 291 | + cnb.jump() |
| 292 | + print("Bootloader jumped!") |
| 293 | + |
| 294 | + except KeyboardInterrupt: |
| 295 | + print("Quiting the net booting...") |
| 296 | + cnb.quit() |
| 297 | + print("Quit succeed") |
0 commit comments