Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean exit on Ctrl+C #14

Merged
merged 9 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airbrakes/data_handling/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import collections
import csv
import multiprocessing
import signal
from pathlib import Path

from airbrakes.data_handling.imu_data_packet import IMUDataPacket
Expand Down Expand Up @@ -85,6 +86,8 @@ def _logging_loop(self) -> None:
"""
The loop that saves data to the logs. It runs in parallel with the main loop.
"""
# Ignore the SIGINT (Ctrl+C) signal, because we only want the main process to handle it
signal.signal(signal.SIGINT, signal.SIG_IGN) # Ignores the interrupt signal
# Set up the csv logging in the new process
with self.log_path.open(mode="a", newline="") as file_writer:
writer = csv.DictWriter(file_writer, fieldnames=CSV_HEADERS)
Expand Down
7 changes: 5 additions & 2 deletions airbrakes/hardware/imu.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

import collections
import multiprocessing
import signal
import warnings

try:
import mscl
except ImportError:
warnings.warn(
"Could not import MSCL, IMU will not work. Please see installation instructions"
"Could not import MSCL, IMU will not work. Please see installation instructions "
"here: https://github.com/LORD-MicroStrain/MSCL/tree/master",
stacklevel=2,
)
Expand Down Expand Up @@ -43,7 +44,7 @@ def __init__(self, port: str, frequency: int, upside_down: bool):

# Starts the process that fetches data from the IMU
self._data_fetch_process = multiprocessing.Process(
target=self._fetch_data_loop, args=(port, frequency, upside_down)
target=self._fetch_data_loop, args=(port, frequency, upside_down), name="IMU Data Fetch Process"
)

@property
Expand All @@ -64,6 +65,8 @@ def _fetch_data_loop(self, port: str, frequency: int, _: bool):
"""
This is the loop that fetches data from the IMU. It runs in parallel with the main loop.
"""
# Ignore the SIGINT (Ctrl+C) signal, because we only want the main process to handle it
signal.signal(signal.SIGINT, signal.SIG_IGN)
# Connect to the IMU
connection = mscl.Connection.Serial(port)
node = mscl.InertialNode(connection)
Expand Down
19 changes: 9 additions & 10 deletions main.py
JacksonElia marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@ def main():

# The context that will manage the airbrakes state machine
airbrakes = AirbrakesContext(logger, servo, imu)

# Start the IMU and logger processes:
airbrakes.start()

# This is the main loop that will run until the stop method on the airbrakes is called
while not airbrakes.shutdown_requested:
airbrakes.update()

# Shutdown the IMU and logger processes:
airbrakes.stop()
try:
airbrakes.start() # Start the IMU and logger processes
# This is the main loop that will run until we press Ctrl+C
while not airbrakes.shutdown_requested:
airbrakes.update()
except KeyboardInterrupt:
harshil21 marked this conversation as resolved.
Show resolved Hide resolved
pass
finally:
airbrakes.stop() # Stop the IMU and logger processes


if __name__ == "__main__":
Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@ select = ["E", "F", "I", "PL", "UP", "RUF", "PTH", "C4", "B", "PIE", "SIM", "RET

[tool.ruff.lint.per-file-ignores]
"tests/*.py" = ["T20", "S101", "D100", "ARG001"]


[tool.pytest.ini_options]
filterwarnings = "ignore:This process:DeprecationWarning" # ignore warning about fork()
70 changes: 10 additions & 60 deletions scripts/run_imu.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,68 +7,18 @@
from constants import FREQUENCY, PORT, UPSIDE_DOWN, TEST_LOGS_PATH
from airbrakes.hardware.imu import IMU
from airbrakes.data_handling.logger import Logger
from pathlib import Path

# import matplotlib.pyplot as plt
# import matplotlib.animation as animation
# from collections import deque
# import numpy as np

# # Initialize a deque for each axis to store the last N acceleration values
# N = 100 # Number of points to display
# x_vals = deque(maxlen=N)
# y_vals = deque(maxlen=N)
# z_vals = deque(maxlen=N)
# time_vals = deque(maxlen=N)

# # Initialize the plot
# fig, ax = plt.subplots()
# ax.set_xlim(0, N)
# ax.set_ylim(-2, 2) # Assuming accelerations range from -2 to 2 g

# x_line, = ax.plot([], [], label='X-axis')
# y_line, = ax.plot([], [], label='Y-axis')
# z_line, = ax.plot([], [], label='Z-axis')

# ax.legend()

# def init():
# x_line.set_data([], [])
# y_line.set_data([], [])
# z_line.set_data([], [])
# return x_line, y_line, z_line

# def update(frame):
# # Simulate reading IMU data in real-time
# a = imu.get_imu_data_packet()
# if not isinstance(a, EstimatedDataPacket):
# return x_line, y_line, z_line

# # Append new data to the deque
# time_vals.append(len(time_vals))
# x_vals.append(a.estCompensatedAccelX)
# y_vals.append(a.estCompensatedAccelY)
# z_vals.append(a.estCompensatedAccelZ)

# # Update the plot data
# x_line.set_data(time_vals, x_vals)
# y_line.set_data(time_vals, y_vals)
# z_line.set_data(time_vals, z_vals)

# return x_line, y_line, z_line

# ani = animation.FuncAnimation(fig, update, init_func=init, blit=True, interval=50)

# plt.xlabel('Time')
# plt.ylabel('Acceleration (g)')
# plt.title('Real-time Acceleration')
# plt.show()

imu = IMU(PORT, FREQUENCY, UPSIDE_DOWN)
imu.start()

logger = Logger(TEST_LOGS_PATH)
logger.start()

while True:
print(imu.get_imu_data_packet())
try:
imu.start()
logger.start()
while True:
print(imu.get_imu_data_packet())
except KeyboardInterrupt: # Stop running IMU and logger if the user presses Ctrl+C
pass
finally:
imu.stop()
logger.stop()
19 changes: 11 additions & 8 deletions scripts/run_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@ def main():
# Initialize the logger
logger = Logger(TEST_LOGS_PATH)

# Log for 5 seconds
# Log for 5 seconds, and automatically stops logging
start_time = time.time()
while time.time() - start_time < 5:
# Create fake IMU data
imu_data_list = deque([RawDataPacket(int(time.time()), 1, 2, 3, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6)])
logger.log("TEST_STATE", 0.5, imu_data_list)

# Stop the logger after 5 seconds
logger.stop()
try:
logger.start()
while time.time() - start_time < 5:
# Create fake IMU data
imu_data_list = deque([RawDataPacket(int(time.time()), 1, 2, 3, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6)])
logger.log("TEST_STATE", 0.5, imu_data_list)
except KeyboardInterrupt: # Stop logging if the user presses Ctrl+C
pass
finally:
logger.stop()


if __name__ == "__main__":
Expand Down
32 changes: 32 additions & 0 deletions scripts/run_main_local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""The mocked main file which can be run locally. To run this, make sure you're not inside scripts,
and run the following command: `python -m scripts.test_imu`"""
harshil21 marked this conversation as resolved.
Show resolved Hide resolved

from airbrakes.airbrakes import AirbrakesContext
from airbrakes.constants import FREQUENCY, LOGS_PATH, MAX_EXTENSION, MIN_EXTENSION, PORT, SERVO_PIN, UPSIDE_DOWN
from airbrakes.imu.imu import IMU
from airbrakes.logger import Logger
from airbrakes.servo import Servo

from gpiozero.pins.mock import MockFactory, MockPWMPin


def main():
logger = Logger(LOGS_PATH)
servo = Servo(SERVO_PIN, MIN_EXTENSION, MAX_EXTENSION, pin_factory=MockFactory(pin_class=MockPWMPin))
imu = IMU(PORT, FREQUENCY, UPSIDE_DOWN)

# The context that will manage the airbrakes state machine
airbrakes = AirbrakesContext(logger, servo, imu)
try:
airbrakes.start() # Start the IMU and logger processes
# This is the main loop that will run until we press Ctrl+C
while not airbrakes.shutdown_requested:
airbrakes.update()
except KeyboardInterrupt: # Stop running IMU and logger if the user presses Ctrl+C
pass
finally:
airbrakes.stop() # Stop the IMU and logger processes


if __name__ == "__main__":
main()
80 changes: 73 additions & 7 deletions tests/test_airbrakes.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,88 @@
import pytest

from airbrakes.airbrakes import AirbrakesContext
from airbrakes.data_handling.data_processor import IMUDataProcessor
from airbrakes.state import StandByState


@pytest.fixture
def airbrakes_context(imu, logger, servo):
def airbrakes(imu, logger, servo):
return AirbrakesContext(logger, servo, imu)


@pytest.mark.filterwarnings("ignore:To reduce servo jitter") # ignore warning about servo jitter
class TestAirbrakesContext:
"""Tests the AirbrakesContext class"""

def test_slots(self, airbrakes_context):
inst = airbrakes_context
def test_slots(self, airbrakes):
inst = airbrakes
for attr in inst.__slots__:
assert getattr(inst, attr, "err") != "err", f"got extra slot '{attr}'"

def test_init(self, airbrakes_context, logger, imu, servo):
assert airbrakes_context.logger == logger
assert airbrakes_context.servo == servo
assert airbrakes_context.imu == imu
def test_init(self, airbrakes, logger, imu, servo):
assert airbrakes.logger == logger
assert airbrakes.servo == servo
assert airbrakes.imu == imu
assert airbrakes.current_extension == 0.0
assert isinstance(airbrakes.data_processor, IMUDataProcessor)
assert isinstance(airbrakes.state, StandByState)
assert not airbrakes.shutdown_requested

def test_set_extension(self, airbrakes):
# Hardcoded calculated values, based on MIN_EXTENSION and MAX_EXTENSION in constants.py
airbrakes.set_airbrake_extension(0.5)
# TODO: airbrakes.current_extension must be set to 0.5 !!
harshil21 marked this conversation as resolved.
Show resolved Hide resolved
assert airbrakes.servo.current_extension == 0.0803
airbrakes.set_airbrake_extension(0.0)
assert airbrakes.servo.current_extension == -0.0999
airbrakes.set_airbrake_extension(1.0)
assert airbrakes.servo.current_extension == 0.2605

def test_start(self, airbrakes):
airbrakes.start()
assert airbrakes.imu.is_running
assert airbrakes.logger.is_running
airbrakes.stop()

def test_stop(self, airbrakes):
airbrakes.start()
airbrakes.stop()
assert not airbrakes.imu.is_running
assert not airbrakes.logger.is_running
assert not airbrakes.imu._running.value
assert not airbrakes.imu._data_fetch_process.is_alive()
assert not airbrakes.logger._log_process.is_alive()
assert airbrakes.servo.current_extension == -0.0999 # set to "0"
assert airbrakes.shutdown_requested

def test_airbrakes_ctrl_c_clean_exit(self, airbrakes):
"""Tests whether the AirbrakesContext handles ctrl+c events correctly."""
airbrakes.start()

try:
raise KeyboardInterrupt # send a KeyboardInterrupt to test __exit__
except KeyboardInterrupt:
airbrakes.stop()

assert not airbrakes.imu.is_running
assert not airbrakes.logger.is_running
assert airbrakes.shutdown_requested

def test_airbrakes_ctrl_c_exception(self, airbrakes):
"""Tests whether the AirbrakesContext handles unknown exceptions."""

airbrakes.start()
try:
raise ValueError("some error in main loop")
except (KeyboardInterrupt, ValueError):
pass
finally:
airbrakes.stop()

assert not airbrakes.imu.is_running
assert not airbrakes.logger.is_running
assert airbrakes.shutdown_requested

def test_airbrakes_update(self, monkeypatch):
"""Tests whether the Airbrakes update method works correctly."""
# TODO: Implement this test after we get the state and apogee detection working
55 changes: 55 additions & 0 deletions tests/test_imu.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import multiprocessing
import multiprocessing.sharedctypes
import signal
import time
from collections import deque

import pytest

from airbrakes.data_handling.imu_data_packet import EstimatedDataPacket, IMUDataPacket, RawDataPacket
from airbrakes.hardware.imu import IMU
from constants import FREQUENCY, PORT, UPSIDE_DOWN
Expand Down Expand Up @@ -57,6 +60,58 @@ def _fetch_data_loop(self, port: str, frequency: int, upside_down: bool):
assert not imu.is_running
assert not imu._data_fetch_process.is_alive()

def test_imu_ctrl_c_handling(self, monkeypatch):
"""Tests whether the IMU's stop() handles Ctrl+C fine."""
values = multiprocessing.Queue(100000)

def _fetch_data_loop(self, port: str, frequency: int, upside_down: bool):
"""Monkeypatched method for testing."""
signal.signal(signal.SIGINT, signal.SIG_IGN)
while self._running.value:
continue
values.put((port, frequency, upside_down))

monkeypatch.setattr(IMU, "_fetch_data_loop", _fetch_data_loop)
imu = IMU(port=PORT, frequency=FREQUENCY, upside_down=UPSIDE_DOWN)
imu.start()
assert imu._running.value
assert imu.is_running
assert imu._data_fetch_process.is_alive()
time.sleep(0.001) # Give the process time to start and simulate the actual loop
# send a KeyboardInterrupt to test if the process stops cleanly
try:
raise KeyboardInterrupt
except KeyboardInterrupt:
imu.stop()

assert not imu._running.value
assert not imu.is_running
assert not imu._data_fetch_process.is_alive()
assert values.qsize() == 1
assert values.get() == (PORT, FREQUENCY, UPSIDE_DOWN)

def test_imu_fetch_loop_exception(self, monkeypatch):
"""Tests whether the IMU's _fetch_loop propogates unknown exceptions."""
values = multiprocessing.Queue()

def _fetch_data_loop(self, port: str, frequency: int, upside_down: bool):
"""Monkeypatched method for testing."""
values.put((port, frequency, upside_down))
raise ValueError("some error")

monkeypatch.setattr(IMU, "_fetch_data_loop", _fetch_data_loop)
imu = IMU(port=PORT, frequency=FREQUENCY, upside_down=UPSIDE_DOWN)
imu.start()
with pytest.raises(ValueError, match="some error") as excinfo:
imu._fetch_data_loop(PORT, FREQUENCY, UPSIDE_DOWN)
imu.stop()
assert not imu._running.value
assert not imu.is_running
assert not imu._data_fetch_process.is_alive()
assert values.qsize() == 2
assert values.get() == (PORT, FREQUENCY, UPSIDE_DOWN)
assert "some error" in str(excinfo.value)

def test_data_packets_fetch(self, monkeypatch):
"""Tests whether the data fetching loop actually adds data to the queue."""

Expand Down
Loading