Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean exit on Ctrl+C #14

Merged
merged 9 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions airbrakes/airbrakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ def update(self) -> None:
do different things. It is what controls the airbrakes and chooses when to move to the next
state.
"""
# Gets the current extension and IMU data, the states will use these values
self.current_extension = self.servo.current_extension

# get_imu_data_packets() gets from the "first" item in the queue, i.e, the set of data
# *may* not be the most recent data. But we want continous data for state, apogee,
# and logging purposes, so we don't need to worry about that, as long as we're not too
Expand All @@ -85,6 +82,7 @@ def set_airbrake_extension(self, extension: float) -> None:
:param extension: the extension of the airbrakes, between 0 and 1
"""
self.servo.set_extension(extension)
self.current_extension = extension

def stop(self) -> None:
"""
Expand Down
3 changes: 3 additions & 0 deletions airbrakes/data_handling/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import collections
import csv
import multiprocessing
import signal
from pathlib import Path

from airbrakes.data_handling.imu_data_packet import IMUDataPacket
Expand Down Expand Up @@ -85,6 +86,8 @@ def _logging_loop(self) -> None:
"""
The loop that saves data to the logs. It runs in parallel with the main loop.
"""
# Ignore the SIGINT (Ctrl+C) signal, because we only want the main process to handle it
signal.signal(signal.SIGINT, signal.SIG_IGN) # Ignores the interrupt signal
# Set up the csv logging in the new process
with self.log_path.open(mode="a", newline="") as file_writer:
writer = csv.DictWriter(file_writer, fieldnames=CSV_HEADERS)
Expand Down
9 changes: 7 additions & 2 deletions airbrakes/hardware/imu.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import collections
import multiprocessing
import signal
import warnings

# Try to import the MSCL library, if it fails, warn the user, this is necessary because installing mscl is annoying
Expand All @@ -10,7 +11,7 @@
import mscl
except ImportError:
warnings.warn(
"Could not import MSCL, IMU will not work. Please see installation instructions"
"Could not import MSCL, IMU will not work. Please see installation instructions "
"here: https://github.com/LORD-MicroStrain/MSCL/tree/master",
stacklevel=2,
)
Expand Down Expand Up @@ -50,7 +51,9 @@ def __init__(self, port: str, frequency: int):
self._running = multiprocessing.Value("b", False) # Makes a boolean value that is shared between processes

# Starts the process that fetches data from the IMU
self._data_fetch_process = multiprocessing.Process(target=self._fetch_data_loop, args=(port, frequency))
self._data_fetch_process = multiprocessing.Process(
target=self._fetch_data_loop, args=(port, frequency), name="IMU Data Fetch Process"
)

@property
def is_running(self) -> bool:
Expand Down Expand Up @@ -105,6 +108,8 @@ def _fetch_data_loop(self, port: str, frequency: int) -> None:
:param port: the port that the IMU is connected to
:param frequency: the frequency that the IMU is set to poll at
"""
# Ignore the SIGINT (Ctrl+C) signal, because we only want the main process to handle it
signal.signal(signal.SIGINT, signal.SIG_IGN)
# Connect to the IMU
connection = mscl.Connection.Serial(port)
node = mscl.InertialNode(connection)
Expand Down
18 changes: 9 additions & 9 deletions main.py
JacksonElia marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ def main(is_simulation: bool) -> None:
# The context that will manage the airbrakes state machine
airbrakes = AirbrakesContext(servo, imu, logger, data_processor)

# Start the IMU and logger processes:
airbrakes.start()

# This is the main loop that will run until the stop method on the airbrakes is called
while not airbrakes.shutdown_requested:
airbrakes.update()

# Shutdown the IMU and logger processes:
airbrakes.stop()
try:
airbrakes.start() # Start the IMU and logger processes
# This is the main loop that will run until we press Ctrl+C
while not airbrakes.shutdown_requested:
airbrakes.update()
except KeyboardInterrupt:
harshil21 marked this conversation as resolved.
Show resolved Hide resolved
pass
finally:
airbrakes.stop() # Stop the IMU and logger processes


if __name__ == "__main__":
Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@ select = ["E", "F", "I", "PL", "UP", "RUF", "PTH", "C4", "B", "PIE", "SIM", "RET

[tool.ruff.lint.per-file-ignores]
"tests/*.py" = ["T20", "S101", "D100", "ARG001"]


[tool.pytest.ini_options]
filterwarnings = "ignore:This process:DeprecationWarning" # ignore warning about fork()
72 changes: 11 additions & 61 deletions scripts/run_imu.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,24 @@
"""
Make sure you are in the root directory of the project, not inside scripts, and run the following command:
`python -m scripts.test_imu`
`python -m scripts.run_imu`
For the pi, you will have to use python3
"""

from constants import FREQUENCY, PORT, TEST_LOGS_PATH
from airbrakes.hardware.imu import IMU
from airbrakes.data_handling.logger import Logger
from pathlib import Path

# import matplotlib.pyplot as plt
# import matplotlib.animation as animation
# from collections import deque
# import numpy as np

# # Initialize a deque for each axis to store the last N acceleration values
# N = 100 # Number of points to display
# x_vals = deque(maxlen=N)
# y_vals = deque(maxlen=N)
# z_vals = deque(maxlen=N)
# time_vals = deque(maxlen=N)

# # Initialize the plot
# fig, ax = plt.subplots()
# ax.set_xlim(0, N)
# ax.set_ylim(-2, 2) # Assuming accelerations range from -2 to 2 g

# x_line, = ax.plot([], [], label='X-axis')
# y_line, = ax.plot([], [], label='Y-axis')
# z_line, = ax.plot([], [], label='Z-axis')

# ax.legend()

# def init():
# x_line.set_data([], [])
# y_line.set_data([], [])
# z_line.set_data([], [])
# return x_line, y_line, z_line

# def update(frame):
# # Simulate reading IMU data in real-time
# a = imu.get_imu_data_packet()
# if not isinstance(a, EstimatedDataPacket):
# return x_line, y_line, z_line

# # Append new data to the deque
# time_vals.append(len(time_vals))
# x_vals.append(a.estCompensatedAccelX)
# y_vals.append(a.estCompensatedAccelY)
# z_vals.append(a.estCompensatedAccelZ)

# # Update the plot data
# x_line.set_data(time_vals, x_vals)
# y_line.set_data(time_vals, y_vals)
# z_line.set_data(time_vals, z_vals)

# return x_line, y_line, z_line

# ani = animation.FuncAnimation(fig, update, init_func=init, blit=True, interval=50)

# plt.xlabel('Time')
# plt.ylabel('Acceleration (g)')
# plt.title('Real-time Acceleration')
# plt.show()

imu = IMU(PORT, FREQUENCY)
imu.start()

logger = Logger(TEST_LOGS_PATH)
logger.start()

while True:
print(imu.get_imu_data_packet())
try:
imu.start()
logger.start()
while True:
print(imu.get_imu_data_packet())
except KeyboardInterrupt: # Stop running IMU and logger if the user presses Ctrl+C
pass
finally:
imu.stop()
logger.stop()
19 changes: 11 additions & 8 deletions scripts/run_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@ def main():
# Initialize the logger
logger = Logger(TEST_LOGS_PATH)

# Log for 5 seconds
# Log for 5 seconds, and automatically stops logging
start_time = time.time()
while time.time() - start_time < 5:
# Create fake IMU data
imu_data_list = deque([RawDataPacket(int(time.time()), 1, 2, 3, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6)])
logger.log("TEST_STATE", 0.5, imu_data_list)

# Stop the logger after 5 seconds
logger.stop()
try:
logger.start()
while time.time() - start_time < 5:
# Create fake IMU data
imu_data_list = deque([RawDataPacket(int(time.time()), 1, 2, 3, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6)])
logger.log("TEST_STATE", 0.5, imu_data_list)
except KeyboardInterrupt: # Stop logging if the user presses Ctrl+C
pass
finally:
logger.stop()


if __name__ == "__main__":
Expand Down
35 changes: 35 additions & 0 deletions scripts/run_main_local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""The mocked main file which can be run locally. To run this, make sure you're not inside scripts,
and run the following command: `python -m scripts.run_main_local`"""

from airbrakes.airbrakes import AirbrakesContext
from constants import FREQUENCY, LOGS_PATH, MAX_EXTENSION, MIN_EXTENSION, PORT, SERVO_PIN, UPSIDE_DOWN
from airbrakes.data_handling.data_processor import IMUDataProcessor
from airbrakes.hardware.imu import IMU
from airbrakes.data_handling.logger import Logger
from airbrakes.hardware.servo import Servo

from gpiozero.pins.mock import MockFactory, MockPWMPin


def main():
logger = Logger(LOGS_PATH)
servo = Servo(SERVO_PIN, MIN_EXTENSION, MAX_EXTENSION, pin_factory=MockFactory(pin_class=MockPWMPin))
imu = IMU(PORT, FREQUENCY)
data_processor = IMUDataProcessor([], UPSIDE_DOWN)


# The context that will manage the airbrakes state machine
airbrakes = AirbrakesContext(servo, imu, logger, data_processor)
try:
airbrakes.start() # Start the IMU and logger processes
# This is the main loop that will run until we press Ctrl+C
while not airbrakes.shutdown_requested:
airbrakes.update()
except KeyboardInterrupt: # Stop running IMU and logger if the user presses Ctrl+C
pass
finally:
airbrakes.stop() # Stop the IMU and logger processes


if __name__ == "__main__":
main()
2 changes: 1 addition & 1 deletion scripts/run_servo.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""
Make sure you are in the root directory of the project, not inside scripts, and run the following command:
`python -m scripts.test_servo`
`python -m scripts.run_servo`
For the pi, you will have to use python3
"""

Expand Down
84 changes: 76 additions & 8 deletions tests/test_airbrakes.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,91 @@
import pytest

from airbrakes.airbrakes import AirbrakesContext
from airbrakes.data_handling.data_processor import IMUDataProcessor
from airbrakes.state import StandByState


@pytest.fixture
def airbrakes_context(servo, imu, logger, data_processor):
def airbrakes(imu, logger, servo, data_processor):
return AirbrakesContext(servo, imu, logger, data_processor)


@pytest.mark.filterwarnings("ignore:To reduce servo jitter") # ignore warning about servo jitter
class TestAirbrakesContext:
"""Tests the AirbrakesContext class"""

def test_slots(self, airbrakes_context):
inst = airbrakes_context
def test_slots(self, airbrakes):
inst = airbrakes
for attr in inst.__slots__:
assert getattr(inst, attr, "err") != "err", f"got extra slot '{attr}'"

def test_init(self, airbrakes_context, logger, imu, servo, data_processor):
assert airbrakes_context.logger == logger
assert airbrakes_context.servo == servo
assert airbrakes_context.imu == imu
assert airbrakes_context.data_processor == data_processor
def test_init(self, airbrakes, logger, imu, servo, data_processor):
assert airbrakes.logger == logger
assert airbrakes.servo == servo
assert airbrakes.imu == imu
assert airbrakes.current_extension == 0.0
assert airbrakes.data_processor == data_processor
assert isinstance(airbrakes.data_processor, IMUDataProcessor)
assert isinstance(airbrakes.state, StandByState)
assert not airbrakes.shutdown_requested

def test_set_extension(self, airbrakes):
# Hardcoded calculated values, based on MIN_EXTENSION and MAX_EXTENSION in constants.py
airbrakes.set_airbrake_extension(0.5)
assert airbrakes.current_extension == 0.5
assert airbrakes.servo.current_extension == 0.0803
airbrakes.set_airbrake_extension(0.0)
assert airbrakes.current_extension == 0.0
assert airbrakes.servo.current_extension == -0.0999
airbrakes.set_airbrake_extension(1.0)
assert airbrakes.current_extension == 1.0
assert airbrakes.servo.current_extension == 0.2605

def test_start(self, airbrakes):
airbrakes.start()
assert airbrakes.imu.is_running
assert airbrakes.logger.is_running
airbrakes.stop()

def test_stop(self, airbrakes):
airbrakes.start()
airbrakes.stop()
assert not airbrakes.imu.is_running
assert not airbrakes.logger.is_running
assert not airbrakes.imu._running.value
assert not airbrakes.imu._data_fetch_process.is_alive()
assert not airbrakes.logger._log_process.is_alive()
assert airbrakes.servo.current_extension == -0.0999 # set to "0"
assert airbrakes.shutdown_requested

def test_airbrakes_ctrl_c_clean_exit(self, airbrakes):
"""Tests whether the AirbrakesContext handles ctrl+c events correctly."""
airbrakes.start()

try:
raise KeyboardInterrupt # send a KeyboardInterrupt to test __exit__
except KeyboardInterrupt:
airbrakes.stop()

assert not airbrakes.imu.is_running
assert not airbrakes.logger.is_running
assert airbrakes.shutdown_requested

def test_airbrakes_ctrl_c_exception(self, airbrakes):
"""Tests whether the AirbrakesContext handles unknown exceptions."""

airbrakes.start()
try:
raise ValueError("some error in main loop")
except (KeyboardInterrupt, ValueError):
pass
finally:
airbrakes.stop()

assert not airbrakes.imu.is_running
assert not airbrakes.logger.is_running
assert airbrakes.shutdown_requested

def test_airbrakes_update(self, monkeypatch):
"""Tests whether the Airbrakes update method works correctly."""
# TODO: Implement this test after we get the state and apogee detection working
Loading