Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add custom CAN ID #77

Merged
merged 1 commit into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 23 additions & 21 deletions isotp/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,17 @@ class Address:
:param source_address: Source address (N_SA) used in ``NormalFixed_29bits`` and ``Mixed_29bits`` addressing mode.
:type source_address: int or None

:param physical_id: The CAN ID for physical (unicast) messages. Only bits 28-16 are used. Used for these addressing modes: ``NormalFixed_29bits``, ``Mixed_29bits``. Set to standard mandated value if None.
:type: int or None

:param functional_id: The CAN ID for functional (multicast) messages. Only bits 28-16 are used. Used for these addressing modes: ``NormalFixed_29bits``, ``Mixed_29bits``. Set to standard mandated value if None.
:type: int or None

:param address_extension: Address extension (N_AE) used in ``Mixed_11bits``, ``Mixed_29bits`` addressing mode
:type address_extension: int or None
"""

def __init__(self, addressing_mode = AddressingMode.Normal_11bits, txid=None, rxid=None, target_address=None, source_address=None, address_extension=None, **kwargs):
def __init__(self, addressing_mode = AddressingMode.Normal_11bits, txid=None, rxid=None, target_address=None, source_address=None, physical_id=None, functional_id=None, address_extension=None, **kwargs):

self.addressing_mode = addressing_mode
self.target_address = target_address
Expand All @@ -67,6 +73,14 @@ def __init__(self, addressing_mode = AddressingMode.Normal_11bits, txid=None, rx
self.rxid = rxid
self.is_29bits = True if self.addressing_mode in [ AddressingMode.Normal_29bits, AddressingMode.NormalFixed_29bits, AddressingMode.Extended_29bits, AddressingMode.Mixed_29bits] else False

if self.addressing_mode == AddressingMode.NormalFixed_29bits:
self.physical_id = 0x18DA0000 if physical_id is None else physical_id & 0x1FFF0000
self.functional_id = 0x18DB0000 if functional_id is None else functional_id & 0x1FFF0000

if self.addressing_mode == AddressingMode.Mixed_29bits:
self.physical_id = 0x18CE0000 if physical_id is None else physical_id & 0x1FFF0000
self.functional_id = 0x18CD0000 if functional_id is None else functional_id & 0x1FFF0000

self.validate()

# From here, input is good. Do some precomputing for speed optimization without bothering about types or values
Expand All @@ -86,12 +100,6 @@ def __init__(self, addressing_mode = AddressingMode.Normal_11bits, txid=None, rx
self.tx_payload_prefix.extend(bytearray([self.address_extension]))
self.rx_prefix_size = 1

self.rxmask = None
if self.addressing_mode == AddressingMode.NormalFixed_29bits:
self.rxmask = 0x18DA0000 # This should ignore variant between Physical and Functional addressing
elif self.addressing_mode == AddressingMode.Mixed_29bits:
self.rxmask = 0x18CD0000 # This should ignore variant between Physical and Functional addressing

if self.addressing_mode in [AddressingMode.Normal_11bits, AddressingMode.Normal_29bits]:
self.is_for_me = self._is_for_me_normal
elif self.addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]:
Expand Down Expand Up @@ -186,36 +194,30 @@ def _get_tx_arbitraton_id(self, address_type):
return self.txid
elif self.addressing_mode == AddressingMode.Normal_29bits:
return self.txid
elif self.addressing_mode == AddressingMode.NormalFixed_29bits:
bits23_16 = 0xDA0000 if address_type==TargetAddressType.Physical else 0xDB0000
return 0x18000000 | bits23_16 | (self.target_address << 8) | self.source_address
elif self.addressing_mode == AddressingMode.Extended_11bits:
return self.txid
elif self.addressing_mode == AddressingMode.Extended_29bits:
return self.txid
elif self.addressing_mode == AddressingMode.Mixed_11bits:
return self.txid
elif self.addressing_mode == AddressingMode.Mixed_29bits:
bits23_16 = 0xCE0000 if address_type==TargetAddressType.Physical else 0xCD0000
return 0x18000000 | bits23_16 | (self.target_address << 8) | self.source_address
elif self.addressing_mode in [AddressingMode.Mixed_29bits, AddressingMode.NormalFixed_29bits]:
bits28_16 = self.physical_id if address_type==TargetAddressType.Physical else self.functional_id
return bits28_16 | (self.target_address << 8) | self.source_address

def _get_rx_arbitration_id(self, address_type=TargetAddressType.Physical):
if self.addressing_mode == AddressingMode.Normal_11bits:
return self.rxid
elif self.addressing_mode == AddressingMode.Normal_29bits:
return self.rxid
elif self.addressing_mode == AddressingMode.NormalFixed_29bits:
bits23_16 = 0xDA0000 if address_type==TargetAddressType.Physical else 0xDB0000
return 0x18000000 | bits23_16 | (self.source_address << 8) | self.target_address
elif self.addressing_mode == AddressingMode.Extended_11bits:
return self.rxid
elif self.addressing_mode == AddressingMode.Extended_29bits:
return self.rxid
elif self.addressing_mode == AddressingMode.Mixed_11bits:
return self.rxid
elif self.addressing_mode == AddressingMode.Mixed_29bits:
bits23_16 = 0xCE0000 if address_type==TargetAddressType.Physical else 0xCD0000
return 0x18000000 | bits23_16 | (self.source_address << 8) | self.target_address
elif self.addressing_mode in [AddressingMode.Mixed_29bits, AddressingMode.NormalFixed_29bits]:
bits28_16 = self.physical_id if address_type==TargetAddressType.Physical else self.functional_id
return bits28_16 | (self.source_address << 8) | self.target_address

def _is_for_me_normal(self, msg):
if self.is_29bits == msg.is_extended_id:
Expand All @@ -230,7 +232,7 @@ def _is_for_me_extended(self, msg):

def _is_for_me_normalfixed(self, msg):
if self.is_29bits == msg.is_extended_id:
return ((msg.arbitration_id >> 16) & 0xFF) in [218,219] and (msg.arbitration_id & 0xFF00) >> 8 == self.source_address and msg.arbitration_id & 0xFF == self.target_address
return (msg.arbitration_id & 0x1FFF0000 in [self.physical_id, self.functional_id]) and (msg.arbitration_id & 0xFF00) >> 8 == self.source_address and msg.arbitration_id & 0xFF == self.target_address
return False

def _is_for_me_mixed_11bits(self, msg):
Expand All @@ -242,7 +244,7 @@ def _is_for_me_mixed_11bits(self, msg):
def _is_for_me_mixed_29bits(self, msg):
if self.is_29bits == msg.is_extended_id:
if msg.data is not None and len(msg.data) > 0:
return ((msg.arbitration_id >> 16) & 0xFF) in [205,206] and (msg.arbitration_id & 0xFF00) >> 8 == self.source_address and msg.arbitration_id & 0xFF == self.target_address and int(msg.data[0]) == self.address_extension
return (msg.arbitration_id & 0x1FFF0000) in [self.physical_id, self.functional_id] and (msg.arbitration_id & 0xFF00) >> 8 == self.source_address and msg.arbitration_id & 0xFF == self.target_address and int(msg.data[0]) == self.address_extension
return False

def requires_extension_byte(self):
Expand Down
1 change: 0 additions & 1 deletion isotp/address.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ class Address:
rx_arbitration_id_functional: Optional[int]
tx_payload_prefix: bytearray
rx_prefix_size: int
rxmask: Optional[int]
is_for_me: bool
def __init__(self,
addressing_mode: int=...,
Expand Down
218 changes: 218 additions & 0 deletions test/test_addressing_modes.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,32 @@ def test_29bits_normal_fixed(self):
self.assertEqual(address.get_rx_arbitraton_id(isotp.TargetAddressType.Physical), rxid_physical)
self.assertEqual(address.get_rx_arbitraton_id(isotp.TargetAddressType.Functional), rxid_functional)

def test_29bits_normal_fixed_custom_id(self):
ta = 0x55
sa = 0xAA
rxid_physical = 0x1F40AA55
rxid_functional = 0x1F41AA55
txid_physical = 0x1F4055AA
txid_functional = 0x1F4155AA

p_id = 0x1F400000
f_id = 0x1F410000

address = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, target_address = ta, source_address=sa, physical_id=p_id, functional_id=f_id)

self.assertTrue(address.is_for_me(Message(rxid_physical, extended_id=True)))
self.assertTrue(address.is_for_me(Message(rxid_functional, extended_id=True)))
self.assertFalse(address.is_for_me(Message(txid_physical, extended_id=True)))
self.assertFalse(address.is_for_me(Message(txid_functional, extended_id=True)))
self.assertFalse(address.is_for_me(Message(arbitration_id=(rxid_physical) & 0x7FF, extended_id=False)))
self.assertFalse(address.is_for_me(Message(arbitration_id=rxid_physical+1, extended_id=True)))
self.assertFalse(address.is_for_me(Message(arbitration_id=(rxid_physical+1)&0x7FF, extended_id=False)))

self.assertEqual(address.get_tx_arbitraton_id(isotp.TargetAddressType.Physical), txid_physical)
self.assertEqual(address.get_tx_arbitraton_id(isotp.TargetAddressType.Functional), txid_functional)
self.assertEqual(address.get_rx_arbitraton_id(isotp.TargetAddressType.Physical), rxid_physical)
self.assertEqual(address.get_rx_arbitraton_id(isotp.TargetAddressType.Functional), rxid_functional)

def test_29bits_normal_fixed_through_layer(self):
functional = isotp.TargetAddressType.Functional
physical = isotp.TargetAddressType.Physical
Expand Down Expand Up @@ -334,6 +360,86 @@ def test_29bits_normal_fixed_through_layer(self):
self.assertEqual(msg.data, bytearray([0x21, 0x0A, 0x0B]))
self.assertTrue(msg.is_extended_id)

def test_29bits_normal_fixed_custom_id_through_layer(self):
functional = isotp.TargetAddressType.Functional
physical = isotp.TargetAddressType.Physical
ta = 0x55
sa = 0xAA
rxid_physical = 0x1F40AA55
rxid_functional = 0x1F41AA55
txid_physical = 0x1F4055AA
txid_functional = 0x1F4155AA

p_id = 0x1F400000
f_id = 0x1F410000

address = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, target_address = ta, source_address=sa, physical_id=p_id, functional_id=f_id)
layer = isotp.TransportLayer(txfn=self.stack_txfn, rxfn=self.stack_rxfn, address=address, params={'stmin':0, 'blocksize':0})

# Receive Single frame - Physical
self.simulate_rx_msg(Message(arbitration_id = rxid_physical, data=bytearray([0x03, 0x01, 0x02, 0x03]), extended_id=True))
layer.process()
frame = layer.recv()
self.assertIsNotNone(frame)
self.assertEqual(frame, b'\x01\x02\x03')

# Receive Single frame - Functional
layer.reset()
self.simulate_rx_msg(Message(arbitration_id = rxid_functional, data=bytearray([0x03, 0x01, 0x02, 0x03]), extended_id=True))
layer.process()
frame = layer.recv()
self.assertIsNotNone(frame)
self.assertEqual(frame, b'\x01\x02\x03')

# Receive multiframe - Physical
layer.reset()
self.simulate_rx_msg(Message(arbitration_id = rxid_physical, data=bytearray([0x10, 0x08, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06]), extended_id=True))
layer.process()
self.assert_sent_flow_control(stmin=0, blocksize=0)
self.simulate_rx_msg(Message(arbitration_id = rxid_physical, data=bytearray([0x21, 0x07, 0x08]), extended_id=True))
layer.process()
frame = layer.recv()
self.assertIsNotNone(frame)
self.assertEqual(frame, b'\x01\x02\x03\x04\x05\x06\x07\x08')

#Transmit single frame - Physical
layer.reset()
layer.send(b'\x04\x05\x06', physical)
layer.process()
msg = self.get_tx_can_msg()
self.assertIsNotNone(msg)
self.assertEqual(msg.arbitration_id, txid_physical)
self.assertEqual(msg.data, bytearray([0x03, 0x04, 0x05, 0x06]))
self.assertTrue(msg.is_extended_id)

#Transmit single frame - Functional
layer.reset()
layer.send(b'\x04\x05\x06', functional)
layer.process()
msg = self.get_tx_can_msg()
self.assertIsNotNone(msg)
self.assertEqual(msg.arbitration_id, txid_functional)
self.assertEqual(msg.data, bytearray([0x03, 0x04, 0x05, 0x06]))
self.assertTrue(msg.is_extended_id)

# Transmit multiframe - Physical
layer.reset()
layer.send(b'\x04\x05\x06\x07\x08\x09\x0A\x0B', physical)
layer.process()
msg = self.get_tx_can_msg()
self.assertIsNotNone(msg)
self.assertEqual(msg.arbitration_id, txid_physical)
self.assertEqual(msg.data, bytearray([0x10, 0x08, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09]))
self.assertTrue(msg.is_extended_id)

self.simulate_rx_msg(Message(arbitration_id=rxid_physical, data=self.make_flow_control_data(flow_status=0, stmin=0, blocksize=0), extended_id=True))
layer.process()
msg = self.get_tx_can_msg()
self.assertIsNotNone(msg)
self.assertEqual(msg.arbitration_id, txid_physical)
self.assertEqual(msg.data, bytearray([0x21, 0x0A, 0x0B]))
self.assertTrue(msg.is_extended_id)

def test_11bits_extended(self):
txid = 0x123
rxid = 0x456
Expand Down Expand Up @@ -658,6 +764,37 @@ def test_29bits_mixed(self):
self.assertEqual(address.get_rx_arbitraton_id(isotp.TargetAddressType.Physical), rxid_physical)
self.assertEqual(address.get_rx_arbitraton_id(isotp.TargetAddressType.Functional), rxid_functional)

def test_29bits_mixed_custom_id(self):
ta = 0x55
sa = 0xAA
ae = 0x99
rxid_physical = 0x1F4EAA55
rxid_functional = 0x1F4DAA55
txid_physical = 0x1F4E55AA
txid_functional = 0x1F4D55AA

p_id = 0x1F4E0000
f_id = 0x1F4D0000

address = isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=sa, target_address=ta, address_extension = ae, physical_id=p_id, functional_id=f_id)

self.assertFalse(address.is_for_me(Message(rxid_physical, extended_id=True))) # No data
self.assertFalse(address.is_for_me(Message(rxid_functional, extended_id=True))) # No data
self.assertFalse(address.is_for_me(Message(txid_physical, extended_id=True))) # No data
self.assertFalse(address.is_for_me(Message(txid_functional, extended_id=True))) # No data

self.assertTrue(address.is_for_me(Message(rxid_physical, data = bytearray([ae]), extended_id=True)))
self.assertFalse(address.is_for_me(Message(rxid_physical, data = bytearray([ae]), extended_id=False)))
self.assertTrue(address.is_for_me(Message(rxid_functional, data = bytearray([ae]), extended_id=True)))
self.assertFalse(address.is_for_me(Message(rxid_functional, data = bytearray([ae]), extended_id=False)))
self.assertFalse(address.is_for_me(Message(txid_physical, data = bytearray([ae]), extended_id=True)))
self.assertFalse(address.is_for_me(Message(txid_functional, data = bytearray([ae]), extended_id=True)))

self.assertEqual(address.get_tx_arbitraton_id(isotp.TargetAddressType.Physical), txid_physical)
self.assertEqual(address.get_tx_arbitraton_id(isotp.TargetAddressType.Functional), txid_functional)
self.assertEqual(address.get_rx_arbitraton_id(isotp.TargetAddressType.Physical), rxid_physical)
self.assertEqual(address.get_rx_arbitraton_id(isotp.TargetAddressType.Functional), rxid_functional)

def test_29bits_mixed_through_layer(self):
functional = isotp.TargetAddressType.Functional
physical = isotp.TargetAddressType.Physical
Expand Down Expand Up @@ -735,3 +872,84 @@ def test_29bits_mixed_through_layer(self):
self.assertEqual(msg.arbitration_id, txid_physical)
self.assertEqual(msg.data, bytearray([ae, 0x21, 0x09, 0x0A, 0x0B]))
self.assertTrue(msg.is_extended_id)

def test_29bits_mixed_custom_id_through_layer(self):
functional = isotp.TargetAddressType.Functional
physical = isotp.TargetAddressType.Physical
ta = 0x55
sa = 0xAA
ae = 0x99
rxid_physical = 0x1F4EAA55
rxid_functional = 0x1F4DAA55
txid_physical = 0x1F4E55AA
txid_functional = 0x1F4D55AA

p_id = 0x1F4E0000
f_id = 0x1F4D0000

address = isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=sa, target_address=ta, address_extension = ae, physical_id=p_id, functional_id=f_id)
layer = isotp.TransportLayer(txfn=self.stack_txfn, rxfn=self.stack_rxfn, address=address, params={'stmin':0, 'blocksize':0})

# Receive Single frame - Physical
self.simulate_rx_msg(Message(arbitration_id = rxid_physical, data=bytearray([ae, 0x03, 0x01, 0x02, 0x03]), extended_id=True))
layer.process()
frame = layer.recv()
self.assertIsNotNone(frame)
self.assertEqual(frame, b'\x01\x02\x03')

# Receive Single frame - Functional
layer.reset()
self.simulate_rx_msg(Message(arbitration_id = rxid_functional, data=bytearray([ae, 0x03, 0x01, 0x02, 0x03]), extended_id=True))
layer.process()
frame = layer.recv()
self.assertIsNotNone(frame)
self.assertEqual(frame, b'\x01\x02\x03')

# Receive multiframe - Physical
layer.reset()
self.simulate_rx_msg(Message(arbitration_id = rxid_physical, data=bytearray([ae, 0x10, 0x08, 0x01, 0x02, 0x03, 0x04, 0x05]), extended_id=True))
layer.process()
self.assert_sent_flow_control(prefix=[ae], stmin=0, blocksize=0)
self.simulate_rx_msg(Message(arbitration_id = rxid_physical, data=bytearray([ae, 0x21, 0x06, 0x07, 0x08]), extended_id=True))
layer.process()
frame = layer.recv()
self.assertIsNotNone(frame)
self.assertEqual(frame, b'\x01\x02\x03\x04\x05\x06\x07\x08')

#Transmit single frame - Physical
layer.reset()
layer.send(b'\x04\x05\x06', physical)
layer.process()
msg = self.get_tx_can_msg()
self.assertIsNotNone(msg)
self.assertEqual(msg.arbitration_id, txid_physical)
self.assertEqual(msg.data, bytearray([ae, 0x03, 0x04, 0x05, 0x06]))
self.assertTrue(msg.is_extended_id)

#Transmit single frame - Functional
layer.reset()
layer.send(b'\x04\x05\x06', functional)
layer.process()
msg = self.get_tx_can_msg()
self.assertIsNotNone(msg)
self.assertEqual(msg.arbitration_id, txid_functional)
self.assertEqual(msg.data, bytearray([ae, 0x03, 0x04, 0x05, 0x06]))
self.assertTrue(msg.is_extended_id)

# Transmit multiframe - Physical
layer.reset()
layer.send(b'\x04\x05\x06\x07\x08\x09\x0A\x0B', physical)
layer.process()
msg = self.get_tx_can_msg()
self.assertIsNotNone(msg)
self.assertEqual(msg.arbitration_id, txid_physical)
self.assertEqual(msg.data, bytearray([ae, 0x10, 0x08, 0x04, 0x05, 0x06, 0x07, 0x08]))
self.assertTrue(msg.is_extended_id)

self.simulate_rx_msg(Message(arbitration_id=rxid_physical, data=self.make_flow_control_data(flow_status=0, stmin=0, blocksize=0, prefix=[ae]), extended_id=True))
layer.process()
msg = self.get_tx_can_msg()
self.assertIsNotNone(msg)
self.assertEqual(msg.arbitration_id, txid_physical)
self.assertEqual(msg.data, bytearray([ae, 0x21, 0x09, 0x0A, 0x0B]))
self.assertTrue(msg.is_extended_id)