diff --git a/isotp/address.py b/isotp/address.py index a20e205..abe3837 100755 --- a/isotp/address.py +++ b/isotp/address.py @@ -53,11 +53,17 @@ class Address: :param source_address: Source address (N_SA) used in ``NormalFixed_29bits`` and ``Mixed_29bits`` addressing mode. :type source_address: int or None + :param physical_id: The CAN ID for physical (unicast) messages. Only bits 28-16 are used. Used for these addressing modes: ``NormalFixed_29bits``, ``Mixed_29bits``. Set to standard mandated value if None. + :type: int or None + + :param functional_id: The CAN ID for functional (multicast) messages. Only bits 28-16 are used. Used for these addressing modes: ``NormalFixed_29bits``, ``Mixed_29bits``. Set to standard mandated value if None. + :type: int or None + :param address_extension: Address extension (N_AE) used in ``Mixed_11bits``, ``Mixed_29bits`` addressing mode :type address_extension: int or None """ - def __init__(self, addressing_mode = AddressingMode.Normal_11bits, txid=None, rxid=None, target_address=None, source_address=None, address_extension=None, **kwargs): + def __init__(self, addressing_mode = AddressingMode.Normal_11bits, txid=None, rxid=None, target_address=None, source_address=None, physical_id=None, functional_id=None, address_extension=None, **kwargs): self.addressing_mode = addressing_mode self.target_address = target_address @@ -67,6 +73,14 @@ def __init__(self, addressing_mode = AddressingMode.Normal_11bits, txid=None, rx self.rxid = rxid self.is_29bits = True if self.addressing_mode in [ AddressingMode.Normal_29bits, AddressingMode.NormalFixed_29bits, AddressingMode.Extended_29bits, AddressingMode.Mixed_29bits] else False + if self.addressing_mode == AddressingMode.NormalFixed_29bits: + self.physical_id = 0x18DA0000 if physical_id is None else physical_id & 0x1FFF0000 + self.functional_id = 0x18DB0000 if functional_id is None else functional_id & 0x1FFF0000 + + if self.addressing_mode == AddressingMode.Mixed_29bits: + self.physical_id = 0x18CE0000 if physical_id is None else physical_id & 0x1FFF0000 + self.functional_id = 0x18CD0000 if functional_id is None else functional_id & 0x1FFF0000 + self.validate() # From here, input is good. Do some precomputing for speed optimization without bothering about types or values @@ -86,12 +100,6 @@ def __init__(self, addressing_mode = AddressingMode.Normal_11bits, txid=None, rx self.tx_payload_prefix.extend(bytearray([self.address_extension])) self.rx_prefix_size = 1 - self.rxmask = None - if self.addressing_mode == AddressingMode.NormalFixed_29bits: - self.rxmask = 0x18DA0000 # This should ignore variant between Physical and Functional addressing - elif self.addressing_mode == AddressingMode.Mixed_29bits: - self.rxmask = 0x18CD0000 # This should ignore variant between Physical and Functional addressing - if self.addressing_mode in [AddressingMode.Normal_11bits, AddressingMode.Normal_29bits]: self.is_for_me = self._is_for_me_normal elif self.addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: @@ -186,36 +194,30 @@ def _get_tx_arbitraton_id(self, address_type): return self.txid elif self.addressing_mode == AddressingMode.Normal_29bits: return self.txid - elif self.addressing_mode == AddressingMode.NormalFixed_29bits: - bits23_16 = 0xDA0000 if address_type==TargetAddressType.Physical else 0xDB0000 - return 0x18000000 | bits23_16 | (self.target_address << 8) | self.source_address elif self.addressing_mode == AddressingMode.Extended_11bits: return self.txid elif self.addressing_mode == AddressingMode.Extended_29bits: return self.txid elif self.addressing_mode == AddressingMode.Mixed_11bits: return self.txid - elif self.addressing_mode == AddressingMode.Mixed_29bits: - bits23_16 = 0xCE0000 if address_type==TargetAddressType.Physical else 0xCD0000 - return 0x18000000 | bits23_16 | (self.target_address << 8) | self.source_address + elif self.addressing_mode in [AddressingMode.Mixed_29bits, AddressingMode.NormalFixed_29bits]: + bits28_16 = self.physical_id if address_type==TargetAddressType.Physical else self.functional_id + return bits28_16 | (self.target_address << 8) | self.source_address def _get_rx_arbitration_id(self, address_type=TargetAddressType.Physical): if self.addressing_mode == AddressingMode.Normal_11bits: return self.rxid elif self.addressing_mode == AddressingMode.Normal_29bits: return self.rxid - elif self.addressing_mode == AddressingMode.NormalFixed_29bits: - bits23_16 = 0xDA0000 if address_type==TargetAddressType.Physical else 0xDB0000 - return 0x18000000 | bits23_16 | (self.source_address << 8) | self.target_address elif self.addressing_mode == AddressingMode.Extended_11bits: return self.rxid elif self.addressing_mode == AddressingMode.Extended_29bits: return self.rxid elif self.addressing_mode == AddressingMode.Mixed_11bits: return self.rxid - elif self.addressing_mode == AddressingMode.Mixed_29bits: - bits23_16 = 0xCE0000 if address_type==TargetAddressType.Physical else 0xCD0000 - return 0x18000000 | bits23_16 | (self.source_address << 8) | self.target_address + elif self.addressing_mode in [AddressingMode.Mixed_29bits, AddressingMode.NormalFixed_29bits]: + bits28_16 = self.physical_id if address_type==TargetAddressType.Physical else self.functional_id + return bits28_16 | (self.source_address << 8) | self.target_address def _is_for_me_normal(self, msg): if self.is_29bits == msg.is_extended_id: @@ -230,7 +232,7 @@ def _is_for_me_extended(self, msg): def _is_for_me_normalfixed(self, msg): if self.is_29bits == msg.is_extended_id: - return ((msg.arbitration_id >> 16) & 0xFF) in [218,219] and (msg.arbitration_id & 0xFF00) >> 8 == self.source_address and msg.arbitration_id & 0xFF == self.target_address + return (msg.arbitration_id & 0x1FFF0000 in [self.physical_id, self.functional_id]) and (msg.arbitration_id & 0xFF00) >> 8 == self.source_address and msg.arbitration_id & 0xFF == self.target_address return False def _is_for_me_mixed_11bits(self, msg): @@ -242,7 +244,7 @@ def _is_for_me_mixed_11bits(self, msg): def _is_for_me_mixed_29bits(self, msg): if self.is_29bits == msg.is_extended_id: if msg.data is not None and len(msg.data) > 0: - return ((msg.arbitration_id >> 16) & 0xFF) in [205,206] and (msg.arbitration_id & 0xFF00) >> 8 == self.source_address and msg.arbitration_id & 0xFF == self.target_address and int(msg.data[0]) == self.address_extension + return (msg.arbitration_id & 0x1FFF0000) in [self.physical_id, self.functional_id] and (msg.arbitration_id & 0xFF00) >> 8 == self.source_address and msg.arbitration_id & 0xFF == self.target_address and int(msg.data[0]) == self.address_extension return False def requires_extension_byte(self): diff --git a/isotp/address.pyi b/isotp/address.pyi index 002c862..9b339ab 100644 --- a/isotp/address.pyi +++ b/isotp/address.pyi @@ -31,7 +31,6 @@ class Address: rx_arbitration_id_functional: Optional[int] tx_payload_prefix: bytearray rx_prefix_size: int - rxmask: Optional[int] is_for_me: bool def __init__(self, addressing_mode: int=..., diff --git a/test/test_addressing_modes.py b/test/test_addressing_modes.py index 156ddf0..466cc60 100755 --- a/test/test_addressing_modes.py +++ b/test/test_addressing_modes.py @@ -257,6 +257,32 @@ def test_29bits_normal_fixed(self): self.assertEqual(address.get_rx_arbitraton_id(isotp.TargetAddressType.Physical), rxid_physical) self.assertEqual(address.get_rx_arbitraton_id(isotp.TargetAddressType.Functional), rxid_functional) + def test_29bits_normal_fixed_custom_id(self): + ta = 0x55 + sa = 0xAA + rxid_physical = 0x1F40AA55 + rxid_functional = 0x1F41AA55 + txid_physical = 0x1F4055AA + txid_functional = 0x1F4155AA + + p_id = 0x1F400000 + f_id = 0x1F410000 + + address = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, target_address = ta, source_address=sa, physical_id=p_id, functional_id=f_id) + + self.assertTrue(address.is_for_me(Message(rxid_physical, extended_id=True))) + self.assertTrue(address.is_for_me(Message(rxid_functional, extended_id=True))) + self.assertFalse(address.is_for_me(Message(txid_physical, extended_id=True))) + self.assertFalse(address.is_for_me(Message(txid_functional, extended_id=True))) + self.assertFalse(address.is_for_me(Message(arbitration_id=(rxid_physical) & 0x7FF, extended_id=False))) + self.assertFalse(address.is_for_me(Message(arbitration_id=rxid_physical+1, extended_id=True))) + self.assertFalse(address.is_for_me(Message(arbitration_id=(rxid_physical+1)&0x7FF, extended_id=False))) + + self.assertEqual(address.get_tx_arbitraton_id(isotp.TargetAddressType.Physical), txid_physical) + self.assertEqual(address.get_tx_arbitraton_id(isotp.TargetAddressType.Functional), txid_functional) + self.assertEqual(address.get_rx_arbitraton_id(isotp.TargetAddressType.Physical), rxid_physical) + self.assertEqual(address.get_rx_arbitraton_id(isotp.TargetAddressType.Functional), rxid_functional) + def test_29bits_normal_fixed_through_layer(self): functional = isotp.TargetAddressType.Functional physical = isotp.TargetAddressType.Physical @@ -334,6 +360,86 @@ def test_29bits_normal_fixed_through_layer(self): self.assertEqual(msg.data, bytearray([0x21, 0x0A, 0x0B])) self.assertTrue(msg.is_extended_id) + def test_29bits_normal_fixed_custom_id_through_layer(self): + functional = isotp.TargetAddressType.Functional + physical = isotp.TargetAddressType.Physical + ta = 0x55 + sa = 0xAA + rxid_physical = 0x1F40AA55 + rxid_functional = 0x1F41AA55 + txid_physical = 0x1F4055AA + txid_functional = 0x1F4155AA + + p_id = 0x1F400000 + f_id = 0x1F410000 + + address = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, target_address = ta, source_address=sa, physical_id=p_id, functional_id=f_id) + layer = isotp.TransportLayer(txfn=self.stack_txfn, rxfn=self.stack_rxfn, address=address, params={'stmin':0, 'blocksize':0}) + + # Receive Single frame - Physical + self.simulate_rx_msg(Message(arbitration_id = rxid_physical, data=bytearray([0x03, 0x01, 0x02, 0x03]), extended_id=True)) + layer.process() + frame = layer.recv() + self.assertIsNotNone(frame) + self.assertEqual(frame, b'\x01\x02\x03') + + # Receive Single frame - Functional + layer.reset() + self.simulate_rx_msg(Message(arbitration_id = rxid_functional, data=bytearray([0x03, 0x01, 0x02, 0x03]), extended_id=True)) + layer.process() + frame = layer.recv() + self.assertIsNotNone(frame) + self.assertEqual(frame, b'\x01\x02\x03') + + # Receive multiframe - Physical + layer.reset() + self.simulate_rx_msg(Message(arbitration_id = rxid_physical, data=bytearray([0x10, 0x08, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06]), extended_id=True)) + layer.process() + self.assert_sent_flow_control(stmin=0, blocksize=0) + self.simulate_rx_msg(Message(arbitration_id = rxid_physical, data=bytearray([0x21, 0x07, 0x08]), extended_id=True)) + layer.process() + frame = layer.recv() + self.assertIsNotNone(frame) + self.assertEqual(frame, b'\x01\x02\x03\x04\x05\x06\x07\x08') + + #Transmit single frame - Physical + layer.reset() + layer.send(b'\x04\x05\x06', physical) + layer.process() + msg = self.get_tx_can_msg() + self.assertIsNotNone(msg) + self.assertEqual(msg.arbitration_id, txid_physical) + self.assertEqual(msg.data, bytearray([0x03, 0x04, 0x05, 0x06])) + self.assertTrue(msg.is_extended_id) + + #Transmit single frame - Functional + layer.reset() + layer.send(b'\x04\x05\x06', functional) + layer.process() + msg = self.get_tx_can_msg() + self.assertIsNotNone(msg) + self.assertEqual(msg.arbitration_id, txid_functional) + self.assertEqual(msg.data, bytearray([0x03, 0x04, 0x05, 0x06])) + self.assertTrue(msg.is_extended_id) + + # Transmit multiframe - Physical + layer.reset() + layer.send(b'\x04\x05\x06\x07\x08\x09\x0A\x0B', physical) + layer.process() + msg = self.get_tx_can_msg() + self.assertIsNotNone(msg) + self.assertEqual(msg.arbitration_id, txid_physical) + self.assertEqual(msg.data, bytearray([0x10, 0x08, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09])) + self.assertTrue(msg.is_extended_id) + + self.simulate_rx_msg(Message(arbitration_id=rxid_physical, data=self.make_flow_control_data(flow_status=0, stmin=0, blocksize=0), extended_id=True)) + layer.process() + msg = self.get_tx_can_msg() + self.assertIsNotNone(msg) + self.assertEqual(msg.arbitration_id, txid_physical) + self.assertEqual(msg.data, bytearray([0x21, 0x0A, 0x0B])) + self.assertTrue(msg.is_extended_id) + def test_11bits_extended(self): txid = 0x123 rxid = 0x456 @@ -658,6 +764,37 @@ def test_29bits_mixed(self): self.assertEqual(address.get_rx_arbitraton_id(isotp.TargetAddressType.Physical), rxid_physical) self.assertEqual(address.get_rx_arbitraton_id(isotp.TargetAddressType.Functional), rxid_functional) + def test_29bits_mixed_custom_id(self): + ta = 0x55 + sa = 0xAA + ae = 0x99 + rxid_physical = 0x1F4EAA55 + rxid_functional = 0x1F4DAA55 + txid_physical = 0x1F4E55AA + txid_functional = 0x1F4D55AA + + p_id = 0x1F4E0000 + f_id = 0x1F4D0000 + + address = isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=sa, target_address=ta, address_extension = ae, physical_id=p_id, functional_id=f_id) + + self.assertFalse(address.is_for_me(Message(rxid_physical, extended_id=True))) # No data + self.assertFalse(address.is_for_me(Message(rxid_functional, extended_id=True))) # No data + self.assertFalse(address.is_for_me(Message(txid_physical, extended_id=True))) # No data + self.assertFalse(address.is_for_me(Message(txid_functional, extended_id=True))) # No data + + self.assertTrue(address.is_for_me(Message(rxid_physical, data = bytearray([ae]), extended_id=True))) + self.assertFalse(address.is_for_me(Message(rxid_physical, data = bytearray([ae]), extended_id=False))) + self.assertTrue(address.is_for_me(Message(rxid_functional, data = bytearray([ae]), extended_id=True))) + self.assertFalse(address.is_for_me(Message(rxid_functional, data = bytearray([ae]), extended_id=False))) + self.assertFalse(address.is_for_me(Message(txid_physical, data = bytearray([ae]), extended_id=True))) + self.assertFalse(address.is_for_me(Message(txid_functional, data = bytearray([ae]), extended_id=True))) + + self.assertEqual(address.get_tx_arbitraton_id(isotp.TargetAddressType.Physical), txid_physical) + self.assertEqual(address.get_tx_arbitraton_id(isotp.TargetAddressType.Functional), txid_functional) + self.assertEqual(address.get_rx_arbitraton_id(isotp.TargetAddressType.Physical), rxid_physical) + self.assertEqual(address.get_rx_arbitraton_id(isotp.TargetAddressType.Functional), rxid_functional) + def test_29bits_mixed_through_layer(self): functional = isotp.TargetAddressType.Functional physical = isotp.TargetAddressType.Physical @@ -735,3 +872,84 @@ def test_29bits_mixed_through_layer(self): self.assertEqual(msg.arbitration_id, txid_physical) self.assertEqual(msg.data, bytearray([ae, 0x21, 0x09, 0x0A, 0x0B])) self.assertTrue(msg.is_extended_id) + + def test_29bits_mixed_custom_id_through_layer(self): + functional = isotp.TargetAddressType.Functional + physical = isotp.TargetAddressType.Physical + ta = 0x55 + sa = 0xAA + ae = 0x99 + rxid_physical = 0x1F4EAA55 + rxid_functional = 0x1F4DAA55 + txid_physical = 0x1F4E55AA + txid_functional = 0x1F4D55AA + + p_id = 0x1F4E0000 + f_id = 0x1F4D0000 + + address = isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=sa, target_address=ta, address_extension = ae, physical_id=p_id, functional_id=f_id) + layer = isotp.TransportLayer(txfn=self.stack_txfn, rxfn=self.stack_rxfn, address=address, params={'stmin':0, 'blocksize':0}) + + # Receive Single frame - Physical + self.simulate_rx_msg(Message(arbitration_id = rxid_physical, data=bytearray([ae, 0x03, 0x01, 0x02, 0x03]), extended_id=True)) + layer.process() + frame = layer.recv() + self.assertIsNotNone(frame) + self.assertEqual(frame, b'\x01\x02\x03') + + # Receive Single frame - Functional + layer.reset() + self.simulate_rx_msg(Message(arbitration_id = rxid_functional, data=bytearray([ae, 0x03, 0x01, 0x02, 0x03]), extended_id=True)) + layer.process() + frame = layer.recv() + self.assertIsNotNone(frame) + self.assertEqual(frame, b'\x01\x02\x03') + + # Receive multiframe - Physical + layer.reset() + self.simulate_rx_msg(Message(arbitration_id = rxid_physical, data=bytearray([ae, 0x10, 0x08, 0x01, 0x02, 0x03, 0x04, 0x05]), extended_id=True)) + layer.process() + self.assert_sent_flow_control(prefix=[ae], stmin=0, blocksize=0) + self.simulate_rx_msg(Message(arbitration_id = rxid_physical, data=bytearray([ae, 0x21, 0x06, 0x07, 0x08]), extended_id=True)) + layer.process() + frame = layer.recv() + self.assertIsNotNone(frame) + self.assertEqual(frame, b'\x01\x02\x03\x04\x05\x06\x07\x08') + + #Transmit single frame - Physical + layer.reset() + layer.send(b'\x04\x05\x06', physical) + layer.process() + msg = self.get_tx_can_msg() + self.assertIsNotNone(msg) + self.assertEqual(msg.arbitration_id, txid_physical) + self.assertEqual(msg.data, bytearray([ae, 0x03, 0x04, 0x05, 0x06])) + self.assertTrue(msg.is_extended_id) + + #Transmit single frame - Functional + layer.reset() + layer.send(b'\x04\x05\x06', functional) + layer.process() + msg = self.get_tx_can_msg() + self.assertIsNotNone(msg) + self.assertEqual(msg.arbitration_id, txid_functional) + self.assertEqual(msg.data, bytearray([ae, 0x03, 0x04, 0x05, 0x06])) + self.assertTrue(msg.is_extended_id) + + # Transmit multiframe - Physical + layer.reset() + layer.send(b'\x04\x05\x06\x07\x08\x09\x0A\x0B', physical) + layer.process() + msg = self.get_tx_can_msg() + self.assertIsNotNone(msg) + self.assertEqual(msg.arbitration_id, txid_physical) + self.assertEqual(msg.data, bytearray([ae, 0x10, 0x08, 0x04, 0x05, 0x06, 0x07, 0x08])) + self.assertTrue(msg.is_extended_id) + + self.simulate_rx_msg(Message(arbitration_id=rxid_physical, data=self.make_flow_control_data(flow_status=0, stmin=0, blocksize=0, prefix=[ae]), extended_id=True)) + layer.process() + msg = self.get_tx_can_msg() + self.assertIsNotNone(msg) + self.assertEqual(msg.arbitration_id, txid_physical) + self.assertEqual(msg.data, bytearray([ae, 0x21, 0x09, 0x0A, 0x0B])) + self.assertTrue(msg.is_extended_id)