|
| 1 | +# python library to interface with panda |
| 2 | +import os |
| 3 | +import struct |
| 4 | +from functools import wraps |
| 5 | +from typing import Optional |
| 6 | + |
| 7 | +from panda import Panda, PandaDFU |
| 8 | +from panda.python.constants import McuType |
| 9 | + |
| 10 | +BASEDIR = os.path.dirname(os.path.realpath(__file__)) |
| 11 | +FW_PATH = os.path.join(BASEDIR, "obj/") |
| 12 | + |
| 13 | + |
| 14 | +def ensure_jungle_health_packet_version(fn): |
| 15 | + @wraps(fn) |
| 16 | + def wrapper(self, *args, **kwargs): |
| 17 | + if self.health_version != self.HEALTH_PACKET_VERSION: |
| 18 | + raise RuntimeError(f"Jungle firmware ({self.health_version}) doesn't match the \ |
| 19 | + library's health packet version ({self.HEALTH_PACKET_VERSION}). \ |
| 20 | + Reflash jungle.") |
| 21 | + return fn(self, *args, **kwargs) |
| 22 | + return wrapper |
| 23 | + |
| 24 | + |
| 25 | +class PandaJungleDFU(PandaDFU): |
| 26 | + def recover(self): |
| 27 | + fn = os.path.join(FW_PATH, self._mcu_type.config.bootstub_fn.replace("panda", "panda_jungle")) |
| 28 | + with open(fn, "rb") as f: |
| 29 | + code = f.read() |
| 30 | + self.program_bootstub(code) |
| 31 | + self.reset() |
| 32 | + |
| 33 | + |
| 34 | +class PandaJungle(Panda): |
| 35 | + USB_PIDS = (0xddef, 0xddcf) |
| 36 | + |
| 37 | + HW_TYPE_UNKNOWN = b'\x00' |
| 38 | + HW_TYPE_V1 = b'\x01' |
| 39 | + HW_TYPE_V2 = b'\x02' |
| 40 | + |
| 41 | + F4_DEVICES = [HW_TYPE_V1, ] |
| 42 | + H7_DEVICES = [HW_TYPE_V2, ] |
| 43 | + |
| 44 | + HEALTH_PACKET_VERSION = 1 |
| 45 | + HEALTH_STRUCT = struct.Struct("<IffffffHHHHHHHHHHHH") |
| 46 | + |
| 47 | + HARNESS_ORIENTATION_NONE = 0 |
| 48 | + HARNESS_ORIENTATION_1 = 1 |
| 49 | + HARNESS_ORIENTATION_2 = 2 |
| 50 | + |
| 51 | + def flash(self, fn=None, code=None, reconnect=True): |
| 52 | + if not fn: |
| 53 | + fn = os.path.join(FW_PATH, self._mcu_type.config.app_fn.replace("panda", "panda_jungle")) |
| 54 | + super().flash(fn=fn, code=code, reconnect=reconnect) |
| 55 | + |
| 56 | + def recover(self, timeout: Optional[int] = 60, reset: bool = True) -> bool: |
| 57 | + dfu_serial = self.get_dfu_serial() |
| 58 | + |
| 59 | + if reset: |
| 60 | + self.reset(enter_bootstub=True) |
| 61 | + self.reset(enter_bootloader=True) |
| 62 | + |
| 63 | + if not self.wait_for_dfu(dfu_serial, timeout=timeout): |
| 64 | + return False |
| 65 | + |
| 66 | + dfu = PandaJungleDFU(dfu_serial) |
| 67 | + dfu.recover() |
| 68 | + |
| 69 | + # reflash after recover |
| 70 | + self.connect(True, True) |
| 71 | + self.flash() |
| 72 | + return True |
| 73 | + |
| 74 | + def get_mcu_type(self) -> McuType: |
| 75 | + hw_type = self.get_type() |
| 76 | + if hw_type in PandaJungle.F4_DEVICES: |
| 77 | + return McuType.F4 |
| 78 | + elif hw_type in PandaJungle.H7_DEVICES: |
| 79 | + return McuType.H7 |
| 80 | + raise ValueError(f"unknown HW type: {hw_type}") |
| 81 | + |
| 82 | + # ******************* health ******************* |
| 83 | + |
| 84 | + @ensure_jungle_health_packet_version |
| 85 | + def health(self): |
| 86 | + dat = self._handle.controlRead(PandaJungle.REQUEST_IN, 0xd2, 0, 0, self.HEALTH_STRUCT.size) |
| 87 | + a = self.HEALTH_STRUCT.unpack(dat) |
| 88 | + return { |
| 89 | + "uptime": a[0], |
| 90 | + "ch1_power": a[1], |
| 91 | + "ch2_power": a[2], |
| 92 | + "ch3_power": a[3], |
| 93 | + "ch4_power": a[4], |
| 94 | + "ch5_power": a[5], |
| 95 | + "ch6_power": a[6], |
| 96 | + "ch1_sbu1_voltage": a[7] / 1000.0, |
| 97 | + "ch1_sbu2_voltage": a[8] / 1000.0, |
| 98 | + "ch2_sbu1_voltage": a[9] / 1000.0, |
| 99 | + "ch2_sbu2_voltage": a[10] / 1000.0, |
| 100 | + "ch3_sbu1_voltage": a[11] / 1000.0, |
| 101 | + "ch3_sbu2_voltage": a[12] / 1000.0, |
| 102 | + "ch4_sbu1_voltage": a[13] / 1000.0, |
| 103 | + "ch4_sbu2_voltage": a[14] / 1000.0, |
| 104 | + "ch5_sbu1_voltage": a[15] / 1000.0, |
| 105 | + "ch5_sbu2_voltage": a[16] / 1000.0, |
| 106 | + "ch6_sbu1_voltage": a[17] / 1000.0, |
| 107 | + "ch6_sbu2_voltage": a[18] / 1000.0, |
| 108 | + } |
| 109 | + |
| 110 | + # ******************* control ******************* |
| 111 | + |
| 112 | + # Returns tuple with health packet version and CAN packet/USB packet version |
| 113 | + def get_packets_versions(self): |
| 114 | + dat = self._handle.controlRead(PandaJungle.REQUEST_IN, 0xdd, 0, 0, 3) |
| 115 | + if dat and len(dat) == 3: |
| 116 | + a = struct.unpack("BBB", dat) |
| 117 | + return (a[0], a[1], a[2]) |
| 118 | + return (-1, -1, -1) |
| 119 | + |
| 120 | + # ******************* jungle stuff ******************* |
| 121 | + |
| 122 | + def set_panda_power(self, enabled): |
| 123 | + self._handle.controlWrite(PandaJungle.REQUEST_OUT, 0xa0, int(enabled), 0, b'') |
| 124 | + |
| 125 | + def set_harness_orientation(self, mode): |
| 126 | + self._handle.controlWrite(PandaJungle.REQUEST_OUT, 0xa1, int(mode), 0, b'') |
| 127 | + |
| 128 | + def set_ignition(self, enabled): |
| 129 | + self._handle.controlWrite(PandaJungle.REQUEST_OUT, 0xa2, int(enabled), 0, b'') |
| 130 | + |
| 131 | + # ******************* serial ******************* |
| 132 | + |
| 133 | + def debug_read(self): |
| 134 | + ret = [] |
| 135 | + while 1: |
| 136 | + lret = bytes(self._handle.controlRead(PandaJungle.REQUEST_IN, 0xe0, 0, 0, 0x40)) |
| 137 | + if len(lret) == 0: |
| 138 | + break |
| 139 | + ret.append(lret) |
| 140 | + return b''.join(ret) |
0 commit comments