Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WWH OBD DTC message handling #264

Merged
merged 1 commit into from
Mar 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/source/udsoncan/client.rst
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ Methods by services
.. automethod:: udsoncan.client.Client.get_emission_dtc_by_status_mask
.. automethod:: udsoncan.client.Client.get_mirrormemory_dtc_by_status_mask
.. automethod:: udsoncan.client.Client.get_dtc_by_status_severity_mask
.. automethod:: udsoncan.client.Client.get_wwh_obd_dtc_by_status_mask
.. automethod:: udsoncan.client.Client.get_number_of_dtc_by_status_mask
.. automethod:: udsoncan.client.Client.get_mirrormemory_number_of_dtc_by_status_mask
.. automethod:: udsoncan.client.Client.get_number_of_emission_dtc_by_status_mask
Expand Down
13 changes: 13 additions & 0 deletions doc/source/udsoncan/helper_classes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,19 @@ DTC.Format

-----

.. _DTC_FunctionalGroupIdentifiers:

DTC.FunctionalGroupIdentifiers
------------------------------

.. autoclass:: udsoncan::Dtc.FunctionalGroupIdentifiers
:members:
:undoc-members:
:member-order: bysource
:exclude-members: get_name

-----

.. _IOValues:

IOValues
Expand Down
97 changes: 97 additions & 0 deletions test/client/test_read_dtc_information.py
Original file line number Diff line number Diff line change
Expand Up @@ -2582,6 +2582,103 @@ def __init__(self, *args, **kwargs):
GenericTestNoParamRequest_DtcAndStatusMaskResponse.__init__(self, subfunction=0x15, client_function = 'get_dtc_with_permanent_status')


class TestreportDTCWWHOBDDTCByMaskRecord(ClientServerTest): # Subfn = 0x16
sb = struct.pack('B', 0x42)
badsb = struct.pack('B', 0x42+1)
functional_group_id = 0x1
status_mask = 0x2
severity_mask = 0x20
dtc_class = 0x4
expected_request_bytes = b'\x19' + sb + bytes([functional_group_id, status_mask, severity_mask | dtc_class])

def assert_no_data_response(self, response):
self.assertEqual(len(response.service_data.dtcs), 0)
self.assertEqual(response.service_data.dtc_count, 0)

def test_no_data(self):
request = self.conn.touserqueue.get(timeout=0.2)
self.assertEqual(request, self.expected_request_bytes)
self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0x1, 0x2, 0x3, 0x4]))

def _test_no_data(self):
response = self.udsclient.get_wwh_obd_dtc_by_status_mask(self.functional_group_id, self.status_mask, self.severity_mask, self.dtc_class)
self.assert_no_data_response(response)

def test_functional_group_verification(self):
pass

def _test_functional_group_verification(self):
with self.assertRaises(ValueError):
response = self.udsclient.get_wwh_obd_dtc_by_status_mask(None, self.status_mask, self.severity_mask, self.dtc_class)

with self.assertRaises(ValueError):
response = self.udsclient.get_wwh_obd_dtc_by_status_mask(0xff, self.status_mask, self.severity_mask, self.dtc_class)

def assert_no_data_response_with_severity_class(self, response):
self.assertEqual(len(response.service_data.dtcs), 0)
self.assertEqual(response.service_data.dtc_count, 0)

def test_no_data_with_severity_class(self):
request = self.conn.touserqueue.get(timeout=0.2)
self.assertEqual(request, self.expected_request_bytes)
self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0x1, 0x2, 0x3, 0x4]))

def _test_no_data_with_severity_class(self):
severity_class = Dtc.Severity()
severity_class.set_byte(self.severity_mask)
response = self.udsclient.get_wwh_obd_dtc_by_status_mask(self.functional_group_id, self.status_mask, severity_class, self.dtc_class)
self.assert_no_data_response(response)

def assert_single_data_response(self, response):
self.assertEqual(len(response.service_data.dtcs), 1)
self.assertEqual(response.service_data.dtc_count, 1)

dtc = response.service_data.dtcs[0]

self.assertEqual(dtc.id, 0x123456)
self.assertEqual(dtc.status.get_byte_as_int(), 0x20)
self.assertEqual(dtc.severity.get_byte_as_int(), 32)

self.assertEqual(len(dtc.extended_data), 0)

def test_single_data(self):
request = self.conn.touserqueue.get(timeout=0.2)
self.assertEqual(request, self.expected_request_bytes)
self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0x1, 0x2, 0x3, 0x4, 0x33, 0x12, 0x34, 0x56, 0x20]))

def _test_single_data(self):
response = self.udsclient.get_wwh_obd_dtc_by_status_mask(self.functional_group_id, self.status_mask, self.severity_mask, self.dtc_class)
self.assert_single_data_response(response)

def assert_multiple_data_response(self, response):
self.assertEqual(len(response.service_data.dtcs), 2)
self.assertEqual(response.service_data.dtc_count, 2)

dtc = response.service_data.dtcs[0]

self.assertEqual(dtc.id, 0x123456)
self.assertEqual(dtc.status.get_byte_as_int(), 0x20)
self.assertEqual(dtc.severity.get_byte_as_int(), 32)
self.assertEqual(len(dtc.extended_data), 0)

dtc = response.service_data.dtcs[1]

self.assertEqual(dtc.id, 0x123457)
self.assertEqual(dtc.status.get_byte_as_int(), 0x20)
self.assertEqual(dtc.severity.get_byte_as_int(), 32)
self.assertEqual(len(dtc.extended_data), 0)


def test_multiple_data(self):
request = self.conn.touserqueue.get(timeout=0.2)
self.assertEqual(request, self.expected_request_bytes)
self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0x1, 0x2, 0x3, 0x4])
+ bytes([0x33, 0x12, 0x34, 0x56, 0x20])
+ bytes([0x33, 0x12, 0x34, 0x57, 0x20]))

def _test_multiple_data(self):
response = self.udsclient.get_wwh_obd_dtc_by_status_mask(self.functional_group_id, self.status_mask, self.severity_mask, self.dtc_class)
self.assert_multiple_data_response(response)


class TestreportDTCExtDataRecordByRecordNumber(ClientServerTest): # Subfn = 0x16
Expand Down
39 changes: 36 additions & 3 deletions udsoncan/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,38 @@ def get_dtc_by_status_severity_mask(self, status_mask: int, severity_mask: int)
"""
return self.read_dtc_information(services.ReadDTCInformation.Subfunction.reportDTCBySeverityMaskRecord, status_mask=status_mask, severity_mask=severity_mask)

def get_wwh_obd_dtc_by_status_mask(self, functional_group_id: int, status_mask: int, severity_mask: Union[int,Dtc.Severity], dtc_class: int) -> Optional[services.ReadDTCInformation.InterpretedResponse]:
"""
Performs a ``ReadDTCInformation`` service request with subfunction ``reportWWHOBDDTCByMaskRecord``

Reads all the WWH OBD Diagnostic Trouble Codes that have a functional_group, class, status and a severity matching the given masks.
The server will check all of its DTCs and if ( (Dtc.status & status_mask) != 0 && (Dtc.severity & severity) !=0), then the DTCs match the filter and are sent back to the client.
Note: severity_mask and dtc_class are combined into a single byte to populate DTCSeverityMask- see Table D.11.

:Effective configuration: ``exception_on_<type>_response`` ``tolerate_zero_padding`` ``ignore_all_zero_dtc``

:param functional_group_id: Functional Group ID to search for (FGID) (0x00 to 0xFF) :ref:`Dtc.FunctionalGroupIdentifiers<DTC_FunctionalGroupIdentifiers>`
:type functional_group_id: int

:param status_mask: The status mask against which the DTCs are tested.
:type status_mask: int or :ref:`Dtc.Status<DTC_Status>`

:param severity_mask: The severity mask against which the DTCs are tested. (Optionas 0x20, 0x40, or 0x80)
:type severity_mask: int or :ref:`Dtc.Severity<DTC_Severity>`

:param dtc_class: The GTR DTC class mask against which the DTCs are tested. (Options 0x01, 0x02, 0x04, 0x08)
:type dtc_class: int

:return: The server response parsed by :meth:`ReadDTCInformation.interpret_response<udsoncan.services.ReadDTCInformation.interpret_response>`
:rtype: :ref:`Response<Response>`
"""
dtc_severity_mask = dtc_class & 0x1f
if isinstance(severity_mask, Dtc.Severity):
dtc_severity_mask |= (severity_mask.get_byte_as_int() & 0xe0)
else:
dtc_severity_mask |= (severity_mask & 0xe0)
return self.read_dtc_information(services.ReadDTCInformation.Subfunction.reportWWHOBDDTCByMaskRecord, status_mask=status_mask, severity_mask=dtc_severity_mask, functional_group_id=functional_group_id)

def get_number_of_dtc_by_status_mask(self, status_mask: int) -> Optional[services.ReadDTCInformation.InterpretedResponse]:
"""
Performs a ``ReadDTCInformation`` service request with subfunction ``reportNumberOfDTCByStatusMask``
Expand Down Expand Up @@ -1733,8 +1765,8 @@ def read_dtc_information(self,
snapshot_record_number: Optional[int] = None,
extended_data_record_number: Optional[int] = None,
extended_data_size: Optional[int] = None,
memory_selection: Optional[int] = None
):
memory_selection: Optional[int] = None,
functional_group_id: Optional[int] = None):
if dtc is not None and isinstance(dtc, Dtc):
dtc = dtc.id

Expand All @@ -1745,7 +1777,8 @@ def read_dtc_information(self,
snapshot_record_number=snapshot_record_number,
extended_data_record_number=extended_data_record_number,
memory_selection=memory_selection,
standard_version=self.config['standard_version'])
standard_version=self.config['standard_version'],
functional_group_id=functional_group_id)

self.logger.info('%s - Sending request with subfunction "%s" (0x%02X).' % (self.service_log_prefix(services.ReadDTCInformation),
services.ReadDTCInformation.Subfunction.get_name(subfunction), subfunction))
Expand Down
8 changes: 8 additions & 0 deletions udsoncan/common/dtc.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ def get_name(cls, given_id: Optional[int]) -> Optional[str]:

return None

class FunctionalGroupIdentifiers:
"""
Provides a list of FunctionalGroupIdentifiers (Table D.15) which are used by the :ref:`ReadDTCInformation<ReadDtcInformation>` when requesting a number of DTCs.
"""
EMISSIONS_SYSTEM_GROUP = 0x33
SAFETY_SYSTEM_GROUP = 0xD0
VOBD_SYSTEM = 0xFE

# DTC Status byte
# This byte is an 8-bit flag indicating how much we are sure that a DTC is active.
class Status:
Expand Down
43 changes: 41 additions & 2 deletions udsoncan/services/ReadDTCInformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ class Subfunction(BaseSubfunction):
reportUserDefMemoryDTCByStatusMask = 0x17
reportUserDefMemoryDTCSnapshotRecordByDTCNumber = 0x18
reportUserDefMemoryDTCExtDataRecordByDTCNumber = 0x19
reportWWHOBDDTCByMaskRecord = 0x42

# todo
reportSupportedDTCExtDataRecord = 0x1A
reportWWHOBDDTCByMaskRecord = 0x42
reportWWHOBDDTCWithPermanentStatus = 0x55
reportDTCInformationByDTCReadinessGroupIdentifier = 0x56

Expand Down Expand Up @@ -161,6 +161,12 @@ def assert_extended_data_size_int_or_dict(cls,
for dtcid in extended_data_size:
tools.validate_int(extended_data_size[dtcid], min=0, max=0xFFF, name='Extended data size for DTC=0x%06x' % dtcid)

@classmethod
def assert_functional_group_id(cls, functional_group_id: Optional[int], subfunction, maxval: int = 0xFF) -> None:
if functional_group_id is None:
raise ValueError('functional_group_id must be provided for subfunction 0x%02x' % subfunction)
tools.validate_int(functional_group_id, min=0, max=maxval, name='Functional Group ID')

@classmethod
def pack_dtc(cls, dtcid: int) -> bytes:
return struct.pack('BBB', (dtcid >> 16) & 0xFF, (dtcid >> 8) & 0xFF, (dtcid >> 0) & 0xFF)
Expand Down Expand Up @@ -202,7 +208,9 @@ def make_request(cls,
snapshot_record_number: Optional[int] = None,
extended_data_record_number: Optional[int] = None,
memory_selection: Optional[int] = None,
standard_version: int = latest_standard) -> Request:
standard_version: int = latest_standard,
functional_group_id:Optional[int] = None,
) -> Request:
"""
Generates a request for ReadDTCInformation.
Each subfunction uses a subset of parameters.
Expand Down Expand Up @@ -361,6 +369,12 @@ def make_request(cls,
cls.assert_extended_data_record_number(extended_data_record_number, subfunction, maxval=0xEF) # Maximum specified by ISO-14229:2020
req.data = struct.pack('B', extended_data_record_number)

elif subfunction == ReadDTCInformation.Subfunction.reportWWHOBDDTCByMaskRecord:
cls.assert_status_mask(status_mask, subfunction)
cls.assert_severity_mask(severity_mask, subfunction)
cls.assert_functional_group_id(functional_group_id, subfunction, maxval=0xFE) # Maximum specified by ISO-14229:2020
req.data = struct.pack('BBB', functional_group_id, status_mask, severity_mask)

return req

@classmethod
Expand Down Expand Up @@ -872,6 +886,31 @@ def interpret_response(cls,

response.service_data.dtc_count = len(response.service_data.dtcs)

elif subfunction == ReadDTCInformation.Subfunction.reportWWHOBDDTCByMaskRecord:
if len(response.data) < 4:
raise InvalidResponseException(response, 'Incomplete response from server.')

_functional_group_id = response.data[1]
_dtc_status_availability_mask = response.data[2]
_dtc_severity_availability_mask = response.data[3]
_dtc_format_identifier = response.data[4]

remaining_bytes = response.data[5:]

if len(remaining_bytes) % 5 != 0:
raise InvalidResponseException(response, 'Incomplete response from server. Remaining bytes must be a multiple of 5')

while remaining_bytes:
severity = remaining_bytes[0]
dtc = Dtc(struct.unpack('>L', b'\x00' + remaining_bytes[1:4])[0])
status_of_dtc = Dtc.Status.from_byte(remaining_bytes[4])
dtc.severity.set_byte(severity)
dtc.status = status_of_dtc
remaining_bytes = remaining_bytes[5:]
response.service_data.dtcs.append(dtc)

response.service_data.dtc_count = len(response.service_data.dtcs)

return cast(ReadDTCInformation.InterpretedResponse, response)

@classmethod
Expand Down