Skip to content

Commit

Permalink
Improve speed of TRCReader
Browse files Browse the repository at this point in the history
Rewrote some lines in the TRCReader to optimize for speed, mainly for TRC files v2.x. According to cProfile it's double as fast now.
  • Loading branch information
lebuni authored and Adrian Immer committed Nov 12, 2024
1 parent 9a766ce commit 6ed6578
Showing 1 changed file with 30 additions and 45 deletions.
75 changes: 30 additions & 45 deletions can/io/trc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,12 @@
import os
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Callable, Dict, Generator, List, Optional, TextIO, Union
from typing import Any, Callable, Dict, Generator, Optional, TextIO, Tuple, Union

from ..message import Message
from ..typechecking import StringPathLike
from ..util import channel2int, dlc2len, len2dlc
from .generic import (
TextIOMessageReader,
TextIOMessageWriter,
)
from ..util import channel2int, len2dlc
from .generic import TextIOMessageReader, TextIOMessageWriter

logger = logging.getLogger("can.io.trc")

Expand Down Expand Up @@ -58,13 +55,16 @@ def __init__(
"""
super().__init__(file, mode="r")
self.file_version = TRCFileVersion.UNKNOWN
self.start_time: Optional[datetime] = None
self.start_time: float = 0
self.columns: Dict[str, int] = {}
self._num_columns = -1

if not self.file:
raise ValueError("The given file cannot be None")

self._parse_cols: Callable[[List[str]], Optional[Message]] = lambda x: None
self._parse_cols: Callable[[Tuple[str, ...]], Optional[Message]] = (
lambda x: None
)

def _extract_header(self):
line = ""
Expand All @@ -89,16 +89,18 @@ def _extract_header(self):
elif line.startswith(";$STARTTIME"):
logger.debug("TRCReader: Found start time '%s'", line)
try:
self.start_time = datetime(
1899, 12, 30, tzinfo=timezone.utc
) + timedelta(days=float(line.split("=")[1]))
self.start_time = (
datetime(1899, 12, 30, tzinfo=timezone.utc)
+ timedelta(days=float(line.split("=")[1]))
).timestamp()
except IndexError:
logger.debug("TRCReader: Failed to parse start time")
elif line.startswith(";$COLUMNS"):
logger.debug("TRCReader: Found columns '%s'", line)
try:
columns = line.split("=")[1].split(",")
self.columns = {column: columns.index(column) for column in columns}
self._num_columns = len(columns) - 1
except IndexError:
logger.debug("TRCReader: Failed to parse columns")
elif line.startswith(";"):
Expand Down Expand Up @@ -132,7 +134,7 @@ def _extract_header(self):

return line

def _parse_msg_v1_0(self, cols: List[str]) -> Optional[Message]:
def _parse_msg_v1_0(self, cols: Tuple[str, ...]) -> Optional[Message]:
arbit_id = cols[2]
if arbit_id == "FFFFFFFF":
logger.info("TRCReader: Dropping bus info line")
Expand All @@ -147,16 +149,11 @@ def _parse_msg_v1_0(self, cols: List[str]) -> Optional[Message]:
msg.data = bytearray([int(cols[i + 4], 16) for i in range(msg.dlc)])
return msg

def _parse_msg_v1_1(self, cols: List[str]) -> Optional[Message]:
def _parse_msg_v1_1(self, cols: Tuple[str, ...]) -> Optional[Message]:
arbit_id = cols[3]

msg = Message()
if isinstance(self.start_time, datetime):
msg.timestamp = (
self.start_time + timedelta(milliseconds=float(cols[1]))
).timestamp()
else:
msg.timestamp = float(cols[1]) / 1000
msg.timestamp = float(cols[1]) / 1000 + self.start_time
msg.arbitration_id = int(arbit_id, 16)
msg.is_extended_id = len(arbit_id) > 4
msg.channel = 1
Expand All @@ -165,16 +162,11 @@ def _parse_msg_v1_1(self, cols: List[str]) -> Optional[Message]:
msg.is_rx = cols[2] == "Rx"
return msg

def _parse_msg_v1_3(self, cols: List[str]) -> Optional[Message]:
def _parse_msg_v1_3(self, cols: Tuple[str, ...]) -> Optional[Message]:
arbit_id = cols[4]

msg = Message()
if isinstance(self.start_time, datetime):
msg.timestamp = (
self.start_time + timedelta(milliseconds=float(cols[1]))
).timestamp()
else:
msg.timestamp = float(cols[1]) / 1000
msg.timestamp = float(cols[1]) / 1000 + self.start_time
msg.arbitration_id = int(arbit_id, 16)
msg.is_extended_id = len(arbit_id) > 4
msg.channel = int(cols[2])
Expand All @@ -183,7 +175,7 @@ def _parse_msg_v1_3(self, cols: List[str]) -> Optional[Message]:
msg.is_rx = cols[3] == "Rx"
return msg

def _parse_msg_v2_x(self, cols: List[str]) -> Optional[Message]:
def _parse_msg_v2_x(self, cols: Tuple[str, ...]) -> Optional[Message]:
type_ = cols[self.columns["T"]]
bus = self.columns.get("B", None)

Expand All @@ -192,50 +184,43 @@ def _parse_msg_v2_x(self, cols: List[str]) -> Optional[Message]:
dlc = len2dlc(length)
elif "L" in self.columns:
dlc = int(cols[self.columns["L"]])
length = dlc2len(dlc)
else:
raise ValueError("No length/dlc columns present.")

msg = Message()
if isinstance(self.start_time, datetime):
msg.timestamp = (
self.start_time + timedelta(milliseconds=float(cols[self.columns["O"]]))
).timestamp()
else:
msg.timestamp = float(cols[1]) / 1000
msg.timestamp = float(cols[self.columns["O"]]) / 1000 + self.start_time
msg.arbitration_id = int(cols[self.columns["I"]], 16)
msg.is_extended_id = len(cols[self.columns["I"]]) > 4
msg.channel = int(cols[bus]) if bus is not None else 1
msg.dlc = dlc
msg.data = bytearray(
[int(cols[i + self.columns["D"]], 16) for i in range(length)]
)
if dlc:
msg.data = bytearray.fromhex(cols[self.columns["D"]])
msg.is_rx = cols[self.columns["d"]] == "Rx"
msg.is_fd = type_ in ["FD", "FB", "FE", "BI"]
msg.bitrate_switch = type_ in ["FB", " FE"]
msg.error_state_indicator = type_ in ["FE", "BI"]
msg.is_fd = type_ in {"FD", "FB", "FE", "BI"}
msg.bitrate_switch = type_ in {"FB", "FE"}
msg.error_state_indicator = type_ in {"FE", "BI"}

return msg

def _parse_cols_v1_1(self, cols: List[str]) -> Optional[Message]:
def _parse_cols_v1_1(self, cols: Tuple[str, ...]) -> Optional[Message]:
dtype = cols[2]
if dtype in ("Tx", "Rx"):
return self._parse_msg_v1_1(cols)
else:
logger.info("TRCReader: Unsupported type '%s'", dtype)
return None

def _parse_cols_v1_3(self, cols: List[str]) -> Optional[Message]:
def _parse_cols_v1_3(self, cols: Tuple[str, ...]) -> Optional[Message]:
dtype = cols[3]
if dtype in ("Tx", "Rx"):
return self._parse_msg_v1_3(cols)
else:
logger.info("TRCReader: Unsupported type '%s'", dtype)
return None

def _parse_cols_v2_x(self, cols: List[str]) -> Optional[Message]:
def _parse_cols_v2_x(self, cols: Tuple[str, ...]) -> Optional[Message]:
dtype = cols[self.columns["T"]]
if dtype in ["DT", "FD", "FB"]:
if dtype in {"DT", "FD", "FB", "FE", "BI"}:
return self._parse_msg_v2_x(cols)
else:
logger.info("TRCReader: Unsupported type '%s'", dtype)
Expand All @@ -244,7 +229,7 @@ def _parse_cols_v2_x(self, cols: List[str]) -> Optional[Message]:
def _parse_line(self, line: str) -> Optional[Message]:
logger.debug("TRCReader: Parse '%s'", line)
try:
cols = line.split()
cols = tuple(line.split(maxsplit=self._num_columns))
return self._parse_cols(cols)
except IndexError:
logger.warning("TRCReader: Failed to parse message '%s'", line)
Expand Down

0 comments on commit 6ed6578

Please sign in to comment.