From 9ffb930e86e69570443fb331f7e703b3381b1f65 Mon Sep 17 00:00:00 2001 From: Harshil <37377066+harshil21@users.noreply.github.com> Date: Sat, 14 Sep 2024 05:06:02 -0400 Subject: [PATCH] Feat: Add IMUDataProcessor and use csv logger Multiple fixes and tiny improvements as well --- Scripts/test_imu.py | 10 +---- Scripts/test_logger.py | 8 ++-- Scripts/test_servo.py | 8 +--- airbrakes/airbrakes.py | 37 ++++++++-------- airbrakes/constants.py | 27 ++++++++++++ airbrakes/imu/data_processor.py | 75 ++++++++++++++++++++++++++++++++ airbrakes/imu/imu.py | 36 ++++----------- airbrakes/imu/imu_data_packet.py | 31 ++++++++----- airbrakes/logger.py | 47 ++++++++++---------- airbrakes/state.py | 19 +++++++- main.py | 24 ++-------- requirements.txt | 2 +- 12 files changed, 201 insertions(+), 123 deletions(-) create mode 100644 airbrakes/constants.py create mode 100644 airbrakes/imu/data_processor.py diff --git a/Scripts/test_imu.py b/Scripts/test_imu.py index d1c12079..dd689fcc 100644 --- a/Scripts/test_imu.py +++ b/Scripts/test_imu.py @@ -4,14 +4,8 @@ For the pi, you will have to use python3 """ -from airbrakes.imu import IMU - -# Should be checked before launch -UPSIDE_DOWN = True -# The port that the IMU is connected to -PORT = "/dev/ttyACM0" -# The frequency in which the IMU polls data in Hz -FREQUENCY = 100 +from airbrakes.constants import FREQUENCY, PORT, UPSIDE_DOWN +from airbrakes.imu.imu import IMU imu = IMU(PORT, FREQUENCY, UPSIDE_DOWN) diff --git a/Scripts/test_logger.py b/Scripts/test_logger.py index de2093e0..2fa8aa27 100644 --- a/Scripts/test_logger.py +++ b/Scripts/test_logger.py @@ -1,15 +1,13 @@ """Module to test the logger module.""" -from airbrakes.imu import IMUDataPacket +from airbrakes.imu.imu_data_packet import IMUDataPacket from airbrakes.logger import Logger -CSV_HEADERS = ["state", "extension", *list(IMUDataPacket(0.0).__slots__)] - def main(): - logger = Logger(CSV_HEADERS) + logger = Logger() - while True: + while True: # TODO: will not work logger.log("state", 0.0, IMUDataPacket(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)) diff --git a/Scripts/test_servo.py b/Scripts/test_servo.py index c51bb6ad..d9b1adab 100644 --- a/Scripts/test_servo.py +++ b/Scripts/test_servo.py @@ -4,15 +4,9 @@ For the pi, you will have to use python3 """ +from airbrakes.constants import MAX_EXTENSION, MIN_EXTENSION, SERVO_PIN from airbrakes.servo import Servo -# The pin that the servo's data wire is plugged into, in this case the GPIO 12 pin which is used for PWM -SERVO_PIN = 12 - -# The minimum and maximum position of the servo, its range is -1 to 1 -MIN_EXTENSION = -1 -MAX_EXTENSION = 1 - servo = Servo(SERVO_PIN, MIN_EXTENSION, MAX_EXTENSION) print("Type (1) to deploy and (0) to retract the airbrakes.") diff --git a/airbrakes/airbrakes.py b/airbrakes/airbrakes.py index a5c047e8..043b989b 100644 --- a/airbrakes/airbrakes.py +++ b/airbrakes/airbrakes.py @@ -2,7 +2,9 @@ from typing import TYPE_CHECKING -from airbrakes.imu import IMU, IMUDataPacket, RollingAverages +from airbrakes.imu.data_processor import ProcessedIMUData +from airbrakes.imu.imu import IMU, IMUDataPacket +from airbrakes.imu.imu_data_packet import EstimatedDataPacket from airbrakes.logger import Logger from airbrakes.servo import Servo from airbrakes.state import StandByState, State @@ -21,27 +23,28 @@ class AirbrakesContext: __slots__ = ( "current_extension", - "current_imu_data", "imu", "logger", + "processed_data", "servo", "shutdown_requested", "state", ) def __init__(self, logger: Logger, servo: Servo, imu: IMU): - self.logger = logger - self.servo = servo - self.imu = imu + self.logger: Logger = logger + self.servo: Servo = servo + self.imu: IMU = imu self.state: State = StandByState(self) self.shutdown_requested = False - # Placeholder for the current airbrake extension and IMU data until they are set - self.current_extension = 0.0 - self.current_imu_data = IMUDataPacket(0.0) + self.processed_data: ProcessedIMUData = ProcessedIMUData([]) - def update(self): + # Placeholder for the current airbrake extension until they are set + self.current_extension: float = 0.0 + + def update(self) -> None: """ Called every loop iteration from the main process. Depending on the current state, it will do different things. It is what controls the airbrakes and chooses when to move to the next @@ -50,30 +53,30 @@ def update(self): # Gets the current extension and IMU data, the states will use these values self.current_extension = self.servo.current_extension - # Let's get 50 data packets to ensure we have enough data to work with. - # 50 is an arbitrary number for now - if the time resolution between each data packet is - # 2ms, then we have 2*50 = 100ms of data to work with at once. # get_imu_data_packets() gets from the "first" item in the queue, i.e, the set of data - # *may* not be the most recent data. But we want continous data for proper state and - # apogee calculation, so we don't need to worry about that, as long as we're not too + # *may* not be the most recent data. But we want continous data for state, apogee, + # and logging purposes, so we don't need to worry about that, as long as we're not too # behind on processing data_packets: collections.deque[IMUDataPacket] = self.imu.get_imu_data_packets() + # Update the processed data with the new data packets. We only care about EstimatedDataPackets + self.processed_data.update( + data_packet for data_packet in data_packets if isinstance(data_packet, EstimatedDataPacket) + ) # Logs the current state, extension, and IMU data - RollingAverages(data_packets.copy()) # TODO: Compute state(s) for given IMU data self.logger.log(self.state.get_name(), self.current_extension, data_packets.copy()) self.state.update() - def set_airbrake_extension(self, extension: float): + def set_airbrake_extension(self, extension: float) -> None: """ Sets the airbrake extension via the servo. It will be called by the states. :param extension: the extension of the airbrakes, between 0 and 1 """ self.servo.set_extension(extension) - def shutdown(self): + def shutdown(self) -> None: """ Handles shutting down the airbrakes. This will cause the main loop to break. """ diff --git a/airbrakes/constants.py b/airbrakes/constants.py new file mode 100644 index 00000000..149901ac --- /dev/null +++ b/airbrakes/constants.py @@ -0,0 +1,27 @@ +"""Contains the constants used in the airbrakes module""" + +from airbrakes.imu.imu_data_packet import EstimatedDataPacket, IMUDataPacket, RawDataPacket + +# The pin that the servo's data wire is plugged into, in this case the GPIO 12 pin which is used for PWM +SERVO_PIN = 12 + +# The minimum and maximum position of the servo, its range is -1 to 1 +MIN_EXTENSION = -1 +MAX_EXTENSION = 1 + +# Should be checked before launch +UPSIDE_DOWN = True # TODO: Currently not factored in the implementation +# The port that the IMU is connected to +PORT = "/dev/ttyACM0" + +# The frequency in Hz that the IMU will be polled at +FREQUENCY = 100 # TODO: Remove this since we don't/can't control the frequency from the code. + +# The headers for the CSV file +CSV_HEADERS = [ + "State", + "Extension", + *list(IMUDataPacket.__slots__), + *list(RawDataPacket.__slots__), + *list(EstimatedDataPacket.__slots__), +] diff --git a/airbrakes/imu/data_processor.py b/airbrakes/imu/data_processor.py new file mode 100644 index 00000000..86e2db35 --- /dev/null +++ b/airbrakes/imu/data_processor.py @@ -0,0 +1,75 @@ +"""Module for processing IMU data on a higher level.""" + +import statistics as stats +from collections.abc import Sequence + +from airbrakes.imu.imu_data_packet import EstimatedDataPacket + + +class ProcessedIMUData: + """Performs high level calculations on the data packets received from the IMU. Includes + calculation the rolling averages of acceleration, maximum altitude so far, etc, from the set of + data points. + + Args: + data_points (Sequence[EstimatedDataPacket]): A list of EstimatedDataPacket objects + to process. + """ + + __slots__ = ("_avg_accel", "_avg_accel_mag", "_max_altitude", "data_points") + + def __init__(self, data_points: Sequence[EstimatedDataPacket]): + self.data_points: Sequence[EstimatedDataPacket] = data_points + self._avg_accel: tuple[float, float, float] = (0.0, 0.0, 0.0) + self._avg_accel_mag: float = 0.0 + self._max_altitude: float = 0.0 + + def update_data(self, data_points: Sequence[EstimatedDataPacket]) -> None: + """Updates the data points to process. This will recalculate the averages and maximum + altitude.""" + self.data_points = data_points + a_x, a_y, a_z = self.compute_averages() + self._avg_accel = (a_x, a_y, a_z) + self._avg_accel_mag = (self._avg_accel**2 + self._avg_accel**2 + self._avg_accel**2) ** 0.5 + self._max_altitude = max(*(data_point.altitude for data_point in self.data_points), self._max_altitude) + + def compute_averages(self) -> tuple[float, float, float]: + """Calculates the average acceleration and acceleration magnitude of the data points.""" + # calculate the average acceleration in the x, y, and z directions + # TODO: Test what these accel values actually look like + x_accel = stats.fmean(data_point.estCompensatedAccelX for data_point in self.data_points) + y_accel = stats.fmean(data_point.estCompensatedAccelY for data_point in self.data_points) + z_accel = stats.fmean(data_point.estCompensatedAccelZ for data_point in self.data_points) + # TODO: Calculate avg velocity if that's also available + return x_accel, y_accel, z_accel + + @property + def avg_acceleration_z(self) -> float: + """Returns the average acceleration in the z direction of the data points, in m/s^2.""" + return self._avg_accel[-1] + + @property + def avg_acceleration(self) -> tuple[float, float, float]: + """Returns the averaged acceleration as a vector of the data points, in m/s^2.""" + return self._avg_accel + + @property + def avg_acceleration_mag(self) -> float: + """Returns the magnitude of the acceleration vector of the data points, in m/s^2.""" + return self._avg_accel_mag + + @property + def max_altitude(self) -> float: + """Returns the highest altitude attained by the rocket for the entire flight so far, + in meters. + """ + return self._max_altitude + + def __str__(self) -> str: + """Returns a string representation of the ProcessedIMUData object.""" + return ( + f"{self.__class__.__name__}(" + f"avg_acceleration={self.avg_acceleration}, " + f"avg_acceleration_mag={self.avg_acceleration_mag}, " + f"max_altitude={self.max_altitude})" + ) diff --git a/airbrakes/imu/imu.py b/airbrakes/imu/imu.py index 723b2446..e9af6abd 100644 --- a/airbrakes/imu/imu.py +++ b/airbrakes/imu/imu.py @@ -2,33 +2,12 @@ import collections import multiprocessing -from typing import Literal import mscl from airbrakes.imu.imu_data_packet import EstimatedDataPacket, IMUDataPacket, RawDataPacket -class RollingAverages: - """Calculates the rolling averages of acceleration, (and?) from the set of data points""" - - def __init__(self, data_points: list[IMUDataPacket]): - self.data_points = data_points - - def add_estimated_data_packet(self): - pass - - def calculate_average(self, field: Literal["acceleration"]) -> None: - if field == "acceleration": - self.averaged_acceleration = sum(data_point.acceleration for data_point in self.data_points) / len( - self.data_points - ) - - @property - def averaged_acceleration(self): - return self.averaged_acceleration - - class IMU: """ Represents the IMU on the rocket. It's used to get the current acceleration of the rocket. This is used to interact @@ -44,7 +23,6 @@ class IMU: RAW_DESCRIPTOR_SET = 128 __slots__ = ( - "connection", "data_fetch_process", "data_queue", "running", @@ -73,8 +51,9 @@ def _fetch_data_loop(self, port: str, frequency: int, _: bool): while self.running.value: # Get the latest data packets from the IMU, with the help of `getDataPackets`. # `getDataPackets` accepts a timeout in milliseconds. - # Testing has shown that the maximum rate at which we can fetch data is roughly every - # 2ms on average, so we use a timeout of 1000 / frequency = 10ms which should be more + # During IMU configuration (outside of this code), we set the sampling rate of the IMU + # as 1ms for RawDataPackets, and 2ms for EstimatedDataPackets. + # So we use a timeout of 1000 / frequency = 10ms which should be more # than enough. If the timeout is hit, the function will return an empty list. packets: mscl.MipDataPackets = node.getDataPackets(timeout) @@ -108,7 +87,8 @@ def _fetch_data_loop(self, port: str, frequency: int, _: bool): case "estAttitudeUncertQuaternion" | "estOrientQuaternion": matrix = data_point.as_Matrix() # Converts the [4x1] matrix to a tuple - # TODO: maybe we should just make these be stored in the data packet with individual attributes + # TODO: maybe we should just make these be stored in the data + # packet with individual attributes quaternion_tuple = tuple(matrix[i, 0] for i in range(matrix.rows())) setattr(imu_data_packet, channel, quaternion_tuple) case _: @@ -120,7 +100,7 @@ def _fetch_data_loop(self, port: str, frequency: int, _: bool): self.data_queue.put(imu_data_packet) # TODO: this is where we should calculate the rolling averages - def get_imu_data_packet(self) -> IMUDataPacket: + def get_imu_data_packet(self) -> IMUDataPacket | EstimatedDataPacket: """ Gets the last available data packet from the IMU. @@ -132,8 +112,8 @@ def get_imu_data_packet(self) -> IMUDataPacket: """ return self.data_queue.get() - def get_imu_data_packets(self) -> collections.deque[IMUDataPacket]: - """Returns a specified amount of data packets from the IMU. + def get_imu_data_packets(self) -> collections.deque[IMUDataPacket | EstimatedDataPacket]: + """Returns all available data packets from the IMU. :return: A deque containing the specified number of data packets """ diff --git a/airbrakes/imu/imu_data_packet.py b/airbrakes/imu/imu_data_packet.py index d26d2642..481e7b6c 100644 --- a/airbrakes/imu/imu_data_packet.py +++ b/airbrakes/imu/imu_data_packet.py @@ -1,20 +1,30 @@ """Module for describing the datapackets from the IMU""" +def _mro_slots(obj) -> list[str]: + """Helper function to get all __slots__ from the MRO (Method Resolution Order) of an object""" + return [ + attr + for cls in obj.__class__.__mro__[:-1] # :-1 doesn't include the object class + for attr in cls.__slots__ + ] + + class IMUDataPacket: """ - Represents a collection of data packets from the IMU. It contains the acceleration, velocity, altitude, yaw, pitch, - roll of the rocket and the timestamp of the data. The attributes should be named the same as they are when sent from - the IMU--this just means they're going to be in camelCase. + Base class representing a collection of data packets from the IMU. + The attributes should be named the same as they are when sent from the IMU -- this just means + they're going to be in camelCase. """ __slots__ = ("timestamp",) def __init__(self, timestamp: float): + # TODO: This likely doesn't work as there is no "timestamp" field in the data packets self.timestamp = timestamp def __str__(self): - attributes = ", ".join([f"{attr}={getattr(self, attr)}" for attr in self.__slots__]) + attributes = ", ".join(f"{attr}={getattr(self, attr)}" for attr in _mro_slots(self)) return f"{self.__class__.__name__}({attributes})" @@ -26,8 +36,8 @@ class RawDataPacket(IMUDataPacket): __slots__ = ( "gpsCorrelTimestampFlags", - "gpsCorrelTimestampTow", - "gpsCorrelTimestampWeekNum", + "gpsCorrelTimestampTow", # Time of week + "gpsCorrelTimestampWeekNum", # Week number "scaledAccelX", "scaledAccelY", "scaledAccelZ", @@ -64,8 +74,9 @@ def __init__( class EstimatedDataPacket(IMUDataPacket): """ - Represents an estimated data packet from the IMU. These values are the processed values of the raw data - that are supposed to be more accurate/smoothed. It contains a timestamp and the estimated values of the relevant data points. + Represents an estimated data packet from the IMU. These values are the processed values of the + raw data that are supposed to be more accurate/smoothed. It contains a timestamp and the + estimated values of the relevant data points. """ __slots__ = ( @@ -77,8 +88,8 @@ class EstimatedDataPacket(IMUDataPacket): "estCompensatedAccelY", "estCompensatedAccelZ", "estFilterDynamicsMode", - "estFilterGpsTimeTow", - "estFilterGpsTimeWeekNum", + "estFilterGpsTimeTow", # Time of week + "estFilterGpsTimeWeekNum", # Week number "estFilterState", "estFilterStatusFlags", "estOrientQuaternion", diff --git a/airbrakes/logger.py b/airbrakes/logger.py index e69598c3..78559732 100644 --- a/airbrakes/logger.py +++ b/airbrakes/logger.py @@ -1,11 +1,12 @@ """Module for logging data to a CSV file in real time.""" import collections -import logging +import csv import multiprocessing from pathlib import Path -from airbrakes.imu import IMUDataPacket +from airbrakes.constants import CSV_HEADERS +from airbrakes.imu.imu_data_packet import IMUDataPacket class Logger: @@ -18,11 +19,9 @@ class Logger: real time. """ - __slots__ = ("csv_headers", "log_path", "log_process", "log_queue", "running") - - def __init__(self, csv_headers: list[str]): - self.csv_headers = csv_headers + __slots__ = ("log_path", "log_process", "log_queue", "running") + def __init__(self): log_dir = Path("logs") log_dir.mkdir(parents=True, exist_ok=True) @@ -33,7 +32,8 @@ def __init__(self, csv_headers: list[str]): # Create a new log file with the next number in sequence self.log_path = log_dir / f"log_{max_suffix + 1}.csv" with self.log_path.open(mode="w", newline="") as file_writer: - file_writer.write(",".join(csv_headers) + "\n") + writer = csv.DictWriter(file_writer, fieldnames=CSV_HEADERS) + writer.writeheader() # Makes a queue to store log messages, basically it's a process-safe list that you add to the back and pop from # front, meaning that things will be logged in the order they were added @@ -48,20 +48,17 @@ def _logging_loop(self): """ The loop that saves data to the logs. It runs in parallel with the main loop. """ - # Set up the logger in the new process - logger = logging.getLogger("logger") - logger.setLevel(logging.INFO) - handler = logging.FileHandler(self.log_path, mode="a") # Append to the file - handler.setLevel(logging.INFO) - logger.addHandler(handler) - - while self.running.value: - # Get a message from the queue (this will block until a message is available) - # Because there's no timeout, it will wait indefinitely until it gets a message -- this is fine in practice, - # as >100 messages a second should be added to the queue, but if for some reason the queue is empty, it will - # block forever and stop() won't work - message = self.log_queue.get() - logger.info(message) + # Set up the csv logging in the new process + with self.log_path.open(mode="a", newline="") as file_writer: + writer = csv.DictWriter(file_writer, fieldnames=CSV_HEADERS) + while self.running.value: + # Get a message from the queue (this will block until a message is available) + # Because there's no timeout, it will wait indefinitely until it gets a message + # -- this is fine in practice, as >100 messages a second should be added to the + # queue, but if for some reason the queue is empty, it will block forever and + # stop() won't work + message_fields = self.log_queue.get() + writer.writerow(message_fields) def log(self, state: str, extension: float, imu_data_list: collections.deque[IMUDataPacket]): """ @@ -70,14 +67,14 @@ def log(self, state: str, extension: float, imu_data_list: collections.deque[IMU :param extension: the current extension of the airbrakes :param imu_data_list: the current list of IMU data packets to log """ - imu_slots = IMUDataPacket.__slots__ # used to iterate through all available attributes - # Loop through all the IMU data packets for imu_data in imu_data_list: # Formats the log message as a CSV line - message = f"{state},{extension},{','.join(str(getattr(imu_data, value)) for value in imu_slots)}" + message_dict = {"State": state, "Extension": extension}.update( + {key: getattr(imu_data, key) for key in imu_data.__slots__} + ) # Put the message in the queue - self.log_queue.put(message) + self.log_queue.put(message_dict) def stop(self): """ diff --git a/airbrakes/state.py b/airbrakes/state.py index fd0ec1bc..d88df5f7 100644 --- a/airbrakes/state.py +++ b/airbrakes/state.py @@ -1,10 +1,17 @@ """Module for the finite state machine that represents which state of flight we are in.""" +from abc import ABC, abstractmethod +from typing import override + from airbrakes.airbrakes import AirbrakesContext -class State: +class State(ABC): """ + Abstract Base class for the states of the airbrakes system. Each state will have an update + method that will be called every loop iteration and a next_state method that will be called + when the state is over. + For Airbrakes, we will have 4 states: 1. Stand By - when the rocket is on the rail on the ground 2. Motor Burn - when the motor is burning and the rocket is accelerating @@ -23,12 +30,14 @@ def __init__(self, context: AirbrakesContext): # At the very beginning of each state, we retract the airbrakes self.context.set_airbrake_extension(0.0) + @abstractmethod def update(self): """ Called every loop iteration. Uses the context to interact with the hardware and decides when to move to the next state. """ + @abstractmethod def next_state(self): """ We never expect/want to go back a state e.g. We're never going to go @@ -49,9 +58,11 @@ class StandByState(State): __slots__ = () + @override def update(self): pass + @override def next_state(self): self.context.state = MotorBurnState(self.context) @@ -63,9 +74,11 @@ class MotorBurnState(State): __slots__ = () + @override def update(self): pass + @override def next_state(self): self.context.state = FlightState(self.context) @@ -77,9 +90,11 @@ class FlightState(State): __slots__ = () + @override def update(self): pass + @override def next_state(self): self.context.state = FreeFallState(self.context) @@ -91,9 +106,11 @@ class FreeFallState(State): __slots__ = () + @override def update(self): pass + @override def next_state(self): # Explicitly do nothing, there is no next state pass diff --git a/main.py b/main.py index c03042c4..f5f95acc 100644 --- a/main.py +++ b/main.py @@ -2,32 +2,14 @@ loop.""" from airbrakes.airbrakes import AirbrakesContext -from airbrakes.imu import IMU, IMUDataPacket +from airbrakes.constants import FREQUENCY, MAX_EXTENSION, MIN_EXTENSION, PORT, SERVO_PIN, UPSIDE_DOWN +from airbrakes.imu.imu import IMU from airbrakes.logger import Logger from airbrakes.servo import Servo -# The pin that the servo's data wire is plugged into, in this case the GPIO 12 pin which is used for PWM -SERVO_PIN = 12 - -# The minimum and maximum position of the servo, its range is -1 to 1 -MIN_EXTENSION = -1 -MAX_EXTENSION = 1 - -# Should be checked before launch -UPSIDE_DOWN = True -# The port that the IMU is connected to -PORT = "/dev/ttyACM0" -# The frequency in Hz that the IMU will be polled at -# TODO: need to do testing with imu to see how the different data packets affect logging frequency -# TODO: Potential idea is making a separate method to get raw vs est data, and then have both held in the context -FREQUENCY = 100 - -# The headers for the CSV file -CSV_HEADERS = ["State", "Extension", *list(IMUDataPacket(0.0).__slots__)] - def main(): - logger = Logger(CSV_HEADERS) + logger = Logger() servo = Servo(SERVO_PIN, MIN_EXTENSION, MAX_EXTENSION) imu = IMU(PORT, FREQUENCY, UPSIDE_DOWN) diff --git a/requirements.txt b/requirements.txt index a694f503..deace276 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -logging +gpiozero \ No newline at end of file