-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
12 changed files
with
268 additions
and
93 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,74 +1,24 @@ | ||
""" | ||
Make sure you are in the root directory of the project, not inside scripts, and run the following command: | ||
`python -m scripts.test_imu` | ||
`python -m scripts.run_imu` | ||
For the pi, you will have to use python3 | ||
""" | ||
|
||
from constants import FREQUENCY, PORT, TEST_LOGS_PATH | ||
from airbrakes.hardware.imu import IMU | ||
from airbrakes.data_handling.logger import Logger | ||
from pathlib import Path | ||
|
||
# import matplotlib.pyplot as plt | ||
# import matplotlib.animation as animation | ||
# from collections import deque | ||
# import numpy as np | ||
|
||
# # Initialize a deque for each axis to store the last N acceleration values | ||
# N = 100 # Number of points to display | ||
# x_vals = deque(maxlen=N) | ||
# y_vals = deque(maxlen=N) | ||
# z_vals = deque(maxlen=N) | ||
# time_vals = deque(maxlen=N) | ||
|
||
# # Initialize the plot | ||
# fig, ax = plt.subplots() | ||
# ax.set_xlim(0, N) | ||
# ax.set_ylim(-2, 2) # Assuming accelerations range from -2 to 2 g | ||
|
||
# x_line, = ax.plot([], [], label='X-axis') | ||
# y_line, = ax.plot([], [], label='Y-axis') | ||
# z_line, = ax.plot([], [], label='Z-axis') | ||
|
||
# ax.legend() | ||
|
||
# def init(): | ||
# x_line.set_data([], []) | ||
# y_line.set_data([], []) | ||
# z_line.set_data([], []) | ||
# return x_line, y_line, z_line | ||
|
||
# def update(frame): | ||
# # Simulate reading IMU data in real-time | ||
# a = imu.get_imu_data_packet() | ||
# if not isinstance(a, EstimatedDataPacket): | ||
# return x_line, y_line, z_line | ||
|
||
# # Append new data to the deque | ||
# time_vals.append(len(time_vals)) | ||
# x_vals.append(a.estCompensatedAccelX) | ||
# y_vals.append(a.estCompensatedAccelY) | ||
# z_vals.append(a.estCompensatedAccelZ) | ||
|
||
# # Update the plot data | ||
# x_line.set_data(time_vals, x_vals) | ||
# y_line.set_data(time_vals, y_vals) | ||
# z_line.set_data(time_vals, z_vals) | ||
|
||
# return x_line, y_line, z_line | ||
|
||
# ani = animation.FuncAnimation(fig, update, init_func=init, blit=True, interval=50) | ||
|
||
# plt.xlabel('Time') | ||
# plt.ylabel('Acceleration (g)') | ||
# plt.title('Real-time Acceleration') | ||
# plt.show() | ||
|
||
imu = IMU(PORT, FREQUENCY) | ||
imu.start() | ||
|
||
logger = Logger(TEST_LOGS_PATH) | ||
logger.start() | ||
|
||
while True: | ||
print(imu.get_imu_data_packet()) | ||
try: | ||
imu.start() | ||
logger.start() | ||
while True: | ||
print(imu.get_imu_data_packet()) | ||
except KeyboardInterrupt: # Stop running IMU and logger if the user presses Ctrl+C | ||
pass | ||
finally: | ||
imu.stop() | ||
logger.stop() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
"""The mocked main file which can be run locally. To run this, make sure you're not inside scripts, | ||
and run the following command: `python -m scripts.run_main_local`""" | ||
|
||
from airbrakes.airbrakes import AirbrakesContext | ||
from constants import FREQUENCY, LOGS_PATH, MAX_EXTENSION, MIN_EXTENSION, PORT, SERVO_PIN, UPSIDE_DOWN | ||
from airbrakes.data_handling.data_processor import IMUDataProcessor | ||
from airbrakes.hardware.imu import IMU | ||
from airbrakes.data_handling.logger import Logger | ||
from airbrakes.hardware.servo import Servo | ||
|
||
from gpiozero.pins.mock import MockFactory, MockPWMPin | ||
|
||
|
||
def main(): | ||
logger = Logger(LOGS_PATH) | ||
servo = Servo(SERVO_PIN, MIN_EXTENSION, MAX_EXTENSION, pin_factory=MockFactory(pin_class=MockPWMPin)) | ||
imu = IMU(PORT, FREQUENCY) | ||
data_processor = IMUDataProcessor([], UPSIDE_DOWN) | ||
|
||
|
||
# The context that will manage the airbrakes state machine | ||
airbrakes = AirbrakesContext(servo, imu, logger, data_processor) | ||
try: | ||
airbrakes.start() # Start the IMU and logger processes | ||
# This is the main loop that will run until we press Ctrl+C | ||
while not airbrakes.shutdown_requested: | ||
airbrakes.update() | ||
except KeyboardInterrupt: # Stop running IMU and logger if the user presses Ctrl+C | ||
pass | ||
finally: | ||
airbrakes.stop() # Stop the IMU and logger processes | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,23 +1,91 @@ | ||
import pytest | ||
|
||
from airbrakes.airbrakes import AirbrakesContext | ||
from airbrakes.data_handling.data_processor import IMUDataProcessor | ||
from airbrakes.state import StandByState | ||
|
||
|
||
@pytest.fixture | ||
def airbrakes_context(servo, imu, logger, data_processor): | ||
def airbrakes(imu, logger, servo, data_processor): | ||
return AirbrakesContext(servo, imu, logger, data_processor) | ||
|
||
|
||
@pytest.mark.filterwarnings("ignore:To reduce servo jitter") # ignore warning about servo jitter | ||
class TestAirbrakesContext: | ||
"""Tests the AirbrakesContext class""" | ||
|
||
def test_slots(self, airbrakes_context): | ||
inst = airbrakes_context | ||
def test_slots(self, airbrakes): | ||
inst = airbrakes | ||
for attr in inst.__slots__: | ||
assert getattr(inst, attr, "err") != "err", f"got extra slot '{attr}'" | ||
|
||
def test_init(self, airbrakes_context, logger, imu, servo, data_processor): | ||
assert airbrakes_context.logger == logger | ||
assert airbrakes_context.servo == servo | ||
assert airbrakes_context.imu == imu | ||
assert airbrakes_context.data_processor == data_processor | ||
def test_init(self, airbrakes, logger, imu, servo, data_processor): | ||
assert airbrakes.logger == logger | ||
assert airbrakes.servo == servo | ||
assert airbrakes.imu == imu | ||
assert airbrakes.current_extension == 0.0 | ||
assert airbrakes.data_processor == data_processor | ||
assert isinstance(airbrakes.data_processor, IMUDataProcessor) | ||
assert isinstance(airbrakes.state, StandByState) | ||
assert not airbrakes.shutdown_requested | ||
|
||
def test_set_extension(self, airbrakes): | ||
# Hardcoded calculated values, based on MIN_EXTENSION and MAX_EXTENSION in constants.py | ||
airbrakes.set_airbrake_extension(0.5) | ||
assert airbrakes.current_extension == 0.5 | ||
assert airbrakes.servo.current_extension == 0.0803 | ||
airbrakes.set_airbrake_extension(0.0) | ||
assert airbrakes.current_extension == 0.0 | ||
assert airbrakes.servo.current_extension == -0.0999 | ||
airbrakes.set_airbrake_extension(1.0) | ||
assert airbrakes.current_extension == 1.0 | ||
assert airbrakes.servo.current_extension == 0.2605 | ||
|
||
def test_start(self, airbrakes): | ||
airbrakes.start() | ||
assert airbrakes.imu.is_running | ||
assert airbrakes.logger.is_running | ||
airbrakes.stop() | ||
|
||
def test_stop(self, airbrakes): | ||
airbrakes.start() | ||
airbrakes.stop() | ||
assert not airbrakes.imu.is_running | ||
assert not airbrakes.logger.is_running | ||
assert not airbrakes.imu._running.value | ||
assert not airbrakes.imu._data_fetch_process.is_alive() | ||
assert not airbrakes.logger._log_process.is_alive() | ||
assert airbrakes.servo.current_extension == -0.0999 # set to "0" | ||
assert airbrakes.shutdown_requested | ||
|
||
def test_airbrakes_ctrl_c_clean_exit(self, airbrakes): | ||
"""Tests whether the AirbrakesContext handles ctrl+c events correctly.""" | ||
airbrakes.start() | ||
|
||
try: | ||
raise KeyboardInterrupt # send a KeyboardInterrupt to test __exit__ | ||
except KeyboardInterrupt: | ||
airbrakes.stop() | ||
|
||
assert not airbrakes.imu.is_running | ||
assert not airbrakes.logger.is_running | ||
assert airbrakes.shutdown_requested | ||
|
||
def test_airbrakes_ctrl_c_exception(self, airbrakes): | ||
"""Tests whether the AirbrakesContext handles unknown exceptions.""" | ||
|
||
airbrakes.start() | ||
try: | ||
raise ValueError("some error in main loop") | ||
except (KeyboardInterrupt, ValueError): | ||
pass | ||
finally: | ||
airbrakes.stop() | ||
|
||
assert not airbrakes.imu.is_running | ||
assert not airbrakes.logger.is_running | ||
assert airbrakes.shutdown_requested | ||
|
||
def test_airbrakes_update(self, monkeypatch): | ||
"""Tests whether the Airbrakes update method works correctly.""" | ||
# TODO: Implement this test after we get the state and apogee detection working |
Oops, something went wrong.