Skip to content

Commit

Permalink
WIP: Introduce rolling averages and begin separating IMU data packets…
Browse files Browse the repository at this point in the history
… (raw/est)

Co-authored-by: Jack <[email protected]>
  • Loading branch information
harshil21 and JacksonElia committed Sep 13, 2024
1 parent d6a5d7b commit 47f2029
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 46 deletions.
5 changes: 3 additions & 2 deletions Airbrakes/airbrakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from typing import TYPE_CHECKING

from Airbrakes.imu import IMU, IMUDataPacket
from Airbrakes.imu import IMU, IMUDataPacket, RollingAverages
from Airbrakes.logger import Logger
from Airbrakes.servo import Servo
from Airbrakes.state import StandByState, State
Expand Down Expand Up @@ -57,9 +57,10 @@ def update(self):
# *may* not be the most recent data. But we want continous data for proper state and
# apogee calculation, so we don't need to worry about that, as long as we're not too
# behind on processing
data_packets: collections.deque[IMUDataPacket] = self.imu.get_imu_data_packets(50)
data_packets: collections.deque[IMUDataPacket] = self.imu.get_imu_data_packets()

# Logs the current state, extension, and IMU data
rolling_average = RollingAverages(data_packets.copy())

Check failure on line 63 in Airbrakes/airbrakes.py

View workflow job for this annotation

GitHub Actions / build

Ruff (F841)

Airbrakes/airbrakes.py:63:9: F841 Local variable `rolling_average` is assigned to but never used
# TODO: Compute state(s) for given IMU data
self.logger.log(self.state.get_name(), self.current_extension, data_packets.copy())

Expand Down
114 changes: 71 additions & 43 deletions Airbrakes/imu.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import collections
import multiprocessing
from typing import Literal

import mscl

Expand All @@ -12,40 +13,75 @@ class IMUDataPacket:
roll of the rocket and the timestamp of the data.
"""

__slots__ = ("timestamp",)

def __init__(self, timestamp: float):
self.timestamp = timestamp


class RawDataPacket(IMUDataPacket):

Check failure on line 22 in Airbrakes/imu.py

View workflow job for this annotation

GitHub Actions / build

Ruff (D101)

Airbrakes/imu.py:22:7: D101 Missing docstring in public class
__slots__ = ("",)

def __init__(self, altitude: float, yaw: float, pitch: float, roll: float):
super().__init__(0.0)
self.altitude = altitude
self.yaw = yaw
self.pitch = pitch
self.roll = roll


class EstimatedDataPacket(IMUDataPacket):

Check failure on line 33 in Airbrakes/imu.py

View workflow job for this annotation

GitHub Actions / build

Ruff (D101)

Airbrakes/imu.py:33:7: D101 Missing docstring in public class
__slots__ = (
"acceleration",
"altitude",
"filter_state",
"pitch",
"pitch_uncert",
"roll",
"timestamp",
"velocity",
"roll_uncert",
"yaw",
"yaw_uncert",
)

def __init__(
self,
timestamp: float,
acceleration: float | None = None,
velocity: float | None = None,
altitude: float | None = None,
yaw: float | None = None,
pitch: float | None = None,
roll: float | None = None,
altitude: float,
yaw: float,
pitch: float,
roll: float,
filter_state: float,
roll_uncert: float,
pitch_uncert: float,
yaw_uncert: float,
):
self.timestamp = timestamp
self.acceleration = acceleration
self.velocity = velocity
super().__init__(0.0)
self.altitude = altitude
self.yaw = yaw
self.pitch = pitch
self.roll = roll
self.filter_state = filter_state
self.roll_uncert = roll_uncert
self.pitch_uncert = pitch_uncert
self.yaw_uncert = yaw_uncert

def __str__(self):
return (
f"IMUDataPacket(timestamp={self.timestamp}, acceleration={self.acceleration}, "
f"velocity={self.velocity}, altitude={self.altitude}, yaw={self.yaw}, pitch={self.pitch}, "
f"roll={self.roll})"
)

class RollingAverages:
"""Calculates the rolling averages of acceleration, (and?) from the set of data points"""

def __init__(self, data_points: list[IMUDataPacket]) -> None:
self.data_points = data_points

def add_estimated_data_packet(self):
pass

def calculate_average(self, field: Literal["acceleration"]) -> None:
if field == "acceleration":
self.averaged_acceleration = sum(data_point.acceleration for data_point in self.data_points) / len(
self.data_points
)

@property
def averaged_acceleration(self):
return self.averaged_acceleration


class IMU:
Expand Down Expand Up @@ -124,53 +160,45 @@ def _fetch_data_loop(self, port: str, frequency: int, _: bool):
imu_data_packet.pitch = data_point.as_float()
case "estRoll":
imu_data_packet.roll = data_point.as_float()
case "estFilterState":
imu_data_packet.filter_state = data_point.as_float()
case "estRollUncert":
imu_data_packet.roll_uncert = data_point.as_float()
case "estPitchUncert":
imu_data_packet.pitch_uncert = data_point.as_float()
case "estYawUncert":
imu_data_packet.yaw_uncert = data_point.as_float()

elif packet.descriptorSet() == self.RAW_DESCRIPTOR_SET:
# depending on the descriptor set, its a different type of packet
pass

# Put the latest data into the shared queue
self.data_queue.put(imu_data_packet)

def get_imu_data_packet(self, block: bool = True) -> IMUDataPacket:
def get_imu_data_packet(self) -> IMUDataPacket:
"""
Gets the last available data packet from the IMU.
Note: If `get_imu_data_packet` is called slower than the frequency set, the data will not
be the latest, but the first in the queue.
:param block: If True, the function will block until data is available. If False, it may
raise a `multiprocessing.queue.Empty` exception if no data is available.
:return: an IMUDataPacket object containing the latest data from the IMU. If a value is
not available, it will be None.
:raises multiprocessing.queue.Empty: If block is False and there is no data available.
"""
return self.data_queue.get() if block else self.data_queue.get_nowait()
return self.data_queue.get()

def get_imu_data_packets(self, num_packets: int, block: bool = True) -> collections.deque[IMUDataPacket]:
def get_imu_data_packets(self) -> collections.deque[IMUDataPacket]:
"""Returns a specified amount of data packets from the IMU.
:param num_packets: The number of packets to return.
:param block: If True, the function will block until the specified number of packets are
available. Otherwise, it will return the available packets, which may be less than
`num_packets`.
:return: A deque containing the specified number of data packets, or less, depending on
`block`.
:return: A deque containing the specified number of data packets
"""

data_packets = collections.deque()

if block:
while len(data_packets) < num_packets:
data_packets.append(self.get_imu_data_packet())
else:
try:
for _ in range(num_packets):
data_packets.append(self.get_imu_data_packet(block=False))
except multiprocessing.queues.Empty:
pass
while not self.data_queue.empty():
data_packet = self.get_imu_data_packet()
data_packets.append(data_packet)

return data_packets

Expand Down
2 changes: 1 addition & 1 deletion Airbrakes/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def log(self, state: str, extension: float, imu_data_list: collections.deque[IMU
:param extension: the current extension of the airbrakes
:param imu_data_list: the current list of IMU data packets to log
"""
imu_slots = IMUDataPacket.__slots__ # used to iterate through all availale attributes
imu_slots = IMUDataPacket.__slots__ # used to iterate through all available attributes

# Loop through all the IMU data packets
for imu_data in imu_data_list:
Expand Down

0 comments on commit 47f2029

Please sign in to comment.