Skip to content

Commit

Permalink
apply ament_mypy
Browse files Browse the repository at this point in the history
  • Loading branch information
kei1107 committed Feb 22, 2025
1 parent d6f5370 commit 530a72b
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 16 deletions.
2 changes: 1 addition & 1 deletion launch/system_monitor.launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import os

from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.launch_description import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.conditions import IfCondition
from launch.substitutions import LaunchConfiguration
Expand Down
23 changes: 15 additions & 8 deletions ros2_system_monitor/cpu_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,10 @@ def check_core_temps(self):
Use 'sensors' to find cores
Read from every core
"""
diag_vals: list[KeyValue] = []
diag_msgs: list[str] = []
# list[KeyValue]
diag_vals = []
# list[str]
diag_msgs = []
diag_level = DiagnosticStatus.OK

p = subprocess.Popen(
Expand Down Expand Up @@ -194,8 +196,9 @@ def check_core_temps(self):

def check_clock_speed(self):
"""Check clock speed from reading from CPU info."""
vals: list[KeyValue] = []
msgs: list[str] = []
# list[KeyValue]
vals = []
msgs = []
lvl = DiagnosticStatus.OK

try:
Expand Down Expand Up @@ -242,7 +245,8 @@ def check_clock_speed(self):
def check_uptime(self):
"""Use 'uptime' to see load average."""
level = DiagnosticStatus.OK
vals: list[KeyValue] = []
# list[KeyValue]
vals = []

try:
p = subprocess.Popen(
Expand Down Expand Up @@ -290,7 +294,8 @@ def check_uptime(self):

def check_mpstat(self):
"""Use mpstat to find CPU usage."""
vals: list[KeyValue] = []
# list[KeyValue]
vals = []
mp_level = DiagnosticStatus.OK

try:
Expand Down Expand Up @@ -407,7 +412,8 @@ def check_temps(self):
KeyValue(key="Update Status", value="OK"),
KeyValue(key="Time Since Last Update", value=str(Time(seconds=0.0))),
]
diag_msgs: list[str] = []
# list[str]
diag_msgs = []
diag_level = DiagnosticStatus.OK

if self._check_core_temps:
Expand Down Expand Up @@ -444,7 +450,8 @@ def check_usage(self):
KeyValue(key="Update Status", value="OK"),
KeyValue(key="Time Since Last Update", value=str(Time(seconds=0.0))),
]
diag_msgs: list[str] = []
# list[str]
diag_msgs = []
diag_level = DiagnosticStatus.OK

# Check clock speed
Expand Down
5 changes: 3 additions & 2 deletions ros2_system_monitor/hdd_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ def check_temps(self):
KeyValue(key="Update Status", value="OK"),
KeyValue(key="Time Since Last Update", value=str(Time(seconds=0.0))),
]
diag_msgs: list[str] = []
# list[str]
diag_msgs = []
diag_level = DiagnosticStatus.OK

sensors_ok = True
Expand All @@ -185,7 +186,7 @@ def check_temps(self):
diag_msgs.append("Unable to Check HW Temp")
sensors_ok = False
else:
sensors_data: dict = json.loads(stdout)
sensors_data = json.loads(stdout)
hw_paths, temps = self.get_temp_input(sensors_data)
assert len(hw_paths) == len(temps)
for index in range(0, len(hw_paths)):
Expand Down
6 changes: 4 additions & 2 deletions ros2_system_monitor/mem_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ def cancel_timers(self):
self._usage_timer.cancel()

def check_memory(self):
values: list[KeyValue] = []
# list[KeyValue]
values = []
level = DiagnosticStatus.OK
msg = ""

Expand Down Expand Up @@ -165,7 +166,8 @@ def check_usage(self):
KeyValue(key="Update Status", value="OK"),
KeyValue(key="Time Since Last Update", value=str(Time(seconds=0.0))),
]
diag_msgs: list[str] = []
# list[str]
diag_msgs = []

# Check memory
mem_level, mem_msg, mem_vals = self.check_memory()
Expand Down
6 changes: 4 additions & 2 deletions ros2_system_monitor/net_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ def cancel_timers(self):
self._usage_timer.cancel()

def check_network(self):
values: list[KeyValue] = []
# list[KeyValue]
values = []

try:
p = subprocess.Popen(
Expand Down Expand Up @@ -209,7 +210,8 @@ def check_usage(self):
KeyValue(key="Update Status", value="OK"),
KeyValue(key="Time Since Last Update", value=str(Time(seconds=0.0))),
]
diag_msgs: list[str] = []
# list[str]
diag_msgs = []

# Check network
net_level, net_msg, net_vals = self.check_network()
Expand Down
3 changes: 2 additions & 1 deletion ros2_system_monitor/ntp_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ def __init__(self, hostname, diag_hostname):
self._ntp_stat = DiagnosticStatus()
self._ntp_stat.level = DiagnosticStatus.OK
self._ntp_stat.message = "OK"
self._ntp_stat.values: list[KeyValue] = []
# list[KeyValue]
self._ntp_stat.values = []
self.updater.add(
f"NTP offset from {diag_hostname} to {self._ntp_hostname}",
self.update_ntp_stat,
Expand Down

0 comments on commit 530a72b

Please sign in to comment.