-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
State determination #19
Changes from all commits
ef8a257
094fc0f
a8f73b7
516689a
bffe5d5
0a18cf0
a3901c5
87a91c9
7d8d4d2
c595f2e
b4f22ce
b7fb80c
2c34a36
092aedd
8391b43
201df5d
8a9da79
675fab0
821b0a7
c2c6c9f
0180e30
d280301
744cf53
efb6ae2
19a8219
bfe9446
1cd141e
cf33f08
e3252fc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,12 @@ | ||
"""Module for processing IMU data on a higher level.""" | ||
|
||
import statistics as stats | ||
from collections.abc import Sequence | ||
|
||
import numpy as np | ||
|
||
from airbrakes.data_handling.imu_data_packet import EstimatedDataPacket | ||
from airbrakes.utils import deadband | ||
from constants import ACCLERATION_NOISE_THRESHOLD | ||
|
||
|
||
class IMUDataProcessor: | ||
|
@@ -15,14 +18,30 @@ class IMUDataProcessor: | |
:param data_points: A sequence of EstimatedDataPacket objects to process. | ||
""" | ||
|
||
__slots__ = ("_avg_accel", "_avg_accel_mag", "_data_points", "_max_altitude", "upside_down") | ||
__slots__ = ( | ||
"_avg_accel", | ||
"_avg_accel_mag", | ||
"_current_altitude", | ||
"_data_points", | ||
"_initial_altitude", | ||
"_max_altitude", | ||
"_max_speed", | ||
"_previous_velocity", | ||
"_speed", | ||
"upside_down", | ||
) | ||
|
||
def __init__(self, data_points: Sequence[EstimatedDataPacket], upside_down: bool = False): | ||
self.upside_down = upside_down | ||
|
||
self._avg_accel: tuple[float, float, float] = (0.0, 0.0, 0.0) | ||
self._avg_accel_mag: float = 0.0 | ||
self._max_altitude: float = 0.0 | ||
self._speed: float = 0.0 | ||
self._max_speed: float = 0.0 | ||
self._previous_velocity: tuple[float, float, float] = (0.0, 0.0, 0.0) | ||
self._initial_altitude: float | None = None | ||
self._current_altitude: float = 0.0 | ||
self.upside_down = upside_down | ||
|
||
self._data_points: Sequence[EstimatedDataPacket] | ||
|
||
if data_points: # actually update the data on init | ||
|
@@ -35,36 +54,12 @@ def __str__(self) -> str: | |
f"{self.__class__.__name__}(" | ||
f"avg_acceleration={self.avg_acceleration}, " | ||
f"avg_acceleration_mag={self.avg_acceleration_mag}, " | ||
f"max_altitude={self.max_altitude})" | ||
f"max_altitude={self.max_altitude}, " | ||
f"current_altitude={self.current_altitude}, " | ||
f"speed={self.speed}, " | ||
f"max_speed={self.max_speed})" | ||
) | ||
|
||
def update_data(self, data_points: Sequence[EstimatedDataPacket]) -> None: | ||
""" | ||
Updates the data points to process. This will recalculate the averages and maximum | ||
altitude. | ||
:param data_points: A sequence of EstimatedDataPacket objects to process. | ||
""" | ||
if not data_points: # Data packets may not be EstimatedDataPacket in the beginning | ||
return | ||
self._data_points = data_points | ||
a_x, a_y, a_z = self._compute_averages() | ||
self._avg_accel = (a_x, a_y, a_z) | ||
self._avg_accel_mag = (self._avg_accel[0] ** 2 + self._avg_accel[1] ** 2 + self._avg_accel[2] ** 2) ** 0.5 | ||
self._max_altitude = max(*(data_point.estPressureAlt for data_point in self._data_points), self._max_altitude) | ||
|
||
def _compute_averages(self) -> tuple[float, float, float]: | ||
""" | ||
Calculates the average acceleration and acceleration magnitude of the data points. | ||
:return: A tuple of the average acceleration in the x, y, and z directions. | ||
""" | ||
# calculate the average acceleration in the x, y, and z directions | ||
# TODO: Test what these accel values actually look like | ||
x_accel = stats.fmean(data_point.estCompensatedAccelX for data_point in self._data_points) | ||
y_accel = stats.fmean(data_point.estCompensatedAccelY for data_point in self._data_points) | ||
z_accel = stats.fmean(data_point.estCompensatedAccelZ for data_point in self._data_points) | ||
# TODO: Calculate avg velocity if that's also available | ||
return x_accel, y_accel, z_accel | ||
|
||
@property | ||
def avg_acceleration_z(self) -> float: | ||
""" | ||
|
@@ -89,6 +84,136 @@ def avg_acceleration_mag(self) -> float: | |
@property | ||
def max_altitude(self) -> float: | ||
""" | ||
Returns the highest altitude attained by the rocket for the entire flight so far, in meters. | ||
Returns the highest altitude (zeroed out) attained by the rocket for the entire flight | ||
so far, in meters. | ||
""" | ||
return self._max_altitude | ||
|
||
@property | ||
def current_altitude(self) -> float: | ||
"""Returns the altitude of the rocket (zeroed out) from the data points, in meters.""" | ||
return self._current_altitude | ||
|
||
@property | ||
def speed(self) -> float: | ||
"""The current speed of the rocket in m/s. Calculated by integrating the linear acceleration.""" | ||
return self._speed | ||
|
||
@property | ||
def max_speed(self) -> float: | ||
"""The maximum speed the rocket has attained during the flight, in m/s.""" | ||
return self._max_speed | ||
|
||
def update_data(self, data_points: Sequence[EstimatedDataPacket]) -> None: | ||
""" | ||
Updates the data points to process. This will recompute all the averages and other | ||
information such as altitude, speed, etc. | ||
:param data_points: A sequence of EstimatedDataPacket objects to process. | ||
""" | ||
|
||
if not data_points: # Data packets may not be EstimatedDataPacket in the beginning | ||
return | ||
|
||
# First assign the data points | ||
self._data_points = data_points | ||
|
||
# We use linearAcceleration because we don't want gravity to affect our calculations for | ||
# speed. | ||
# Next, assign variables for linear acceleration, since we don't want to recalculate them | ||
# in the helper functions below: | ||
# List of the acceleration in the x, y, and z directions, useful for calculations below | ||
# If the absolute value of acceleration is less than 0.1, set it to 0 | ||
x_accel = [ | ||
deadband(data_point.estLinearAccelX, ACCLERATION_NOISE_THRESHOLD) for data_point in self._data_points | ||
] | ||
y_accel = [ | ||
deadband(data_point.estLinearAccelY, ACCLERATION_NOISE_THRESHOLD) for data_point in self._data_points | ||
] | ||
z_accel = [ | ||
deadband(data_point.estLinearAccelZ, ACCLERATION_NOISE_THRESHOLD) for data_point in self._data_points | ||
] | ||
|
||
pressure_altitude = [data_point.estPressureAlt for data_point in self._data_points] | ||
|
||
a_x, a_y, a_z = self._compute_averages(x_accel, y_accel, z_accel) | ||
self._avg_accel = (a_x, a_y, a_z) | ||
self._avg_accel_mag = (self._avg_accel[0] ** 2 + self._avg_accel[1] ** 2 + self._avg_accel[2] ** 2) ** 0.5 | ||
|
||
self._speed = self._calculate_speed(x_accel, y_accel, z_accel) | ||
self._max_speed = max(self._speed, self._max_speed) | ||
|
||
# Zero the altitude only once, during the first update: | ||
if self._initial_altitude is None: | ||
self._initial_altitude = np.mean(pressure_altitude) | ||
|
||
self._current_altitude = self._calculate_current_altitude(pressure_altitude) | ||
self._max_altitude = self._calculate_max_altitude(pressure_altitude) | ||
|
||
def _calculate_max_altitude(self, pressure_alt: list[float]) -> float: | ||
""" | ||
Calculates the maximum altitude (zeroed out) of the rocket based on the pressure | ||
altitude during the flight. | ||
:return: The maximum altitude of the rocket in meters. | ||
""" | ||
zeroed_alts = np.array(pressure_alt) - self._initial_altitude | ||
return max(*zeroed_alts, self._max_altitude) | ||
|
||
def _calculate_current_altitude(self, alt_list: list[float]) -> float: | ||
""" | ||
Calculates the current altitude, by zeroing out the latest altitude data point. | ||
:param alt_list: A list of the altitude data points. | ||
""" | ||
# There is a decent chance that the zeroed out altitude is negative, e.g. if the rocket | ||
# landed at a height below from where it launched from, but that doesn't concern us. | ||
return alt_list[-1] - self._initial_altitude | ||
|
||
def _compute_averages(self, a_x: list[float], a_y: list[float], a_z: list[float]) -> tuple[float, float, float]: | ||
""" | ||
Calculates the average acceleration and acceleration magnitude of the data points. | ||
:param a_x: A list of the accelerations in the x direction. | ||
:param a_y: A list of the accelerations in the y direction. | ||
:param a_z: A list of the accelerations in the z direction. | ||
:return: A tuple of the average acceleration in the x, y, and z directions. | ||
""" | ||
# calculate the average acceleration in the x, y, and z directions | ||
avg_x_accel = np.mean(a_x) | ||
avg_y_accel = np.mean(a_y) | ||
avg_z_accel = np.mean(a_z) | ||
return avg_x_accel, avg_y_accel, avg_z_accel | ||
|
||
def _calculate_speed(self, a_x: list[float], a_y: list[float], a_z: list[float]) -> float: | ||
""" | ||
Calculates the speed of the rocket based on the linear acceleration. | ||
Integrates the linear acceleration to get the speed. | ||
""" | ||
# We need at least two data points to calculate the speed: | ||
if len(self._data_points) < 2: | ||
return 0.0 | ||
Comment on lines
+194
to
+195
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd still like us to reconsider fetching a set amount of data packets in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How would we implement that in a way that ensured we didn't have a build up of data? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can tell it, "give me at least 10 packets of data, but if you have more, give it all" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should return the speed from the last loop, not 0 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done in #22 |
||
|
||
# Side note: We can't use the pressure altitude to calculate the speed, since the pressure | ||
# does not update fast enough to give us a good estimate of the speed. | ||
|
||
# calculate the time differences between each data point | ||
time_diff = np.diff([data_point.timestamp for data_point in self._data_points]) | ||
|
||
# We store the previous calculated velocity vectors, so that our speed | ||
# doesn't show a jump, e.g. after motor burn out. | ||
previous_vel_x, previous_vel_y, previous_vel_z = self._previous_velocity | ||
|
||
# We integrate each of the components of the acceleration to get the velocity | ||
# The [:-1] is used to remove the last element of the list, since we have one less time | ||
# difference than we have acceleration values. | ||
velocity_x: np.array = previous_vel_x + np.cumsum(a_x[:-1] * time_diff) | ||
velocity_y: np.array = previous_vel_y + np.cumsum(a_y[:-1] * time_diff) | ||
velocity_z: np.array = previous_vel_z + np.cumsum(a_z[:-1] * time_diff) | ||
|
||
# Store the last calculated velocity vectors | ||
self._previous_velocity = (velocity_x[-1], velocity_y[-1], velocity_z[-1]) | ||
|
||
# The speed is the magnitude of the velocity vector | ||
# [-1] to get the last element of the array (latest speed) | ||
return np.sqrt(velocity_x**2 + velocity_y**2 + velocity_z**2)[-1] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove data_points from init, just set it equal to an empty queue (or whatever it is)