Skip to content

Commit

Permalink
Merge pull request #19 from NCSU-High-Powered-Rocketry-Club/state-det…
Browse files Browse the repository at this point in the history
…ermination

State determination
  • Loading branch information
JacksonElia authored Sep 25, 2024
2 parents 3934e07 + e3252fc commit 4809728
Show file tree
Hide file tree
Showing 19 changed files with 46,299 additions and 116 deletions.
9 changes: 5 additions & 4 deletions airbrakes/airbrakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,16 @@ def update(self) -> None:
return

# Update the processed data with the new data packets. We only care about EstimatedDataPackets
# TODO: Check how many data packets we are processing on average
self.data_processor.update_data(
[data_packet for data_packet in data_packets.copy() if isinstance(data_packet, EstimatedDataPacket)]
)
# Logs the current state, extension, and IMU data
# TODO: Compute state(s) for given IMU data
self.logger.log(self.state.get_name(), self.current_extension, data_packets)

# Update the state machine based on the latest processed data
self.state.update()

# Logs the current state, extension, and IMU data
self.logger.log(self.state.name, self.current_extension, data_packets)

def set_airbrake_extension(self, extension: float) -> None:
"""
Sets the airbrake extension via the servo. It will be called by the states.
Expand Down
191 changes: 158 additions & 33 deletions airbrakes/data_handling/data_processor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
"""Module for processing IMU data on a higher level."""

import statistics as stats
from collections.abc import Sequence

import numpy as np

from airbrakes.data_handling.imu_data_packet import EstimatedDataPacket
from airbrakes.utils import deadband
from constants import ACCLERATION_NOISE_THRESHOLD


class IMUDataProcessor:
Expand All @@ -15,14 +18,30 @@ class IMUDataProcessor:
:param data_points: A sequence of EstimatedDataPacket objects to process.
"""

__slots__ = ("_avg_accel", "_avg_accel_mag", "_data_points", "_max_altitude", "upside_down")
__slots__ = (
"_avg_accel",
"_avg_accel_mag",
"_current_altitude",
"_data_points",
"_initial_altitude",
"_max_altitude",
"_max_speed",
"_previous_velocity",
"_speed",
"upside_down",
)

def __init__(self, data_points: Sequence[EstimatedDataPacket], upside_down: bool = False):
self.upside_down = upside_down

self._avg_accel: tuple[float, float, float] = (0.0, 0.0, 0.0)
self._avg_accel_mag: float = 0.0
self._max_altitude: float = 0.0
self._speed: float = 0.0
self._max_speed: float = 0.0
self._previous_velocity: tuple[float, float, float] = (0.0, 0.0, 0.0)
self._initial_altitude: float | None = None
self._current_altitude: float = 0.0
self.upside_down = upside_down

self._data_points: Sequence[EstimatedDataPacket]

if data_points: # actually update the data on init
Expand All @@ -35,36 +54,12 @@ def __str__(self) -> str:
f"{self.__class__.__name__}("
f"avg_acceleration={self.avg_acceleration}, "
f"avg_acceleration_mag={self.avg_acceleration_mag}, "
f"max_altitude={self.max_altitude})"
f"max_altitude={self.max_altitude}, "
f"current_altitude={self.current_altitude}, "
f"speed={self.speed}, "
f"max_speed={self.max_speed})"
)

def update_data(self, data_points: Sequence[EstimatedDataPacket]) -> None:
"""
Updates the data points to process. This will recalculate the averages and maximum
altitude.
:param data_points: A sequence of EstimatedDataPacket objects to process.
"""
if not data_points: # Data packets may not be EstimatedDataPacket in the beginning
return
self._data_points = data_points
a_x, a_y, a_z = self._compute_averages()
self._avg_accel = (a_x, a_y, a_z)
self._avg_accel_mag = (self._avg_accel[0] ** 2 + self._avg_accel[1] ** 2 + self._avg_accel[2] ** 2) ** 0.5
self._max_altitude = max(*(data_point.estPressureAlt for data_point in self._data_points), self._max_altitude)

def _compute_averages(self) -> tuple[float, float, float]:
"""
Calculates the average acceleration and acceleration magnitude of the data points.
:return: A tuple of the average acceleration in the x, y, and z directions.
"""
# calculate the average acceleration in the x, y, and z directions
# TODO: Test what these accel values actually look like
x_accel = stats.fmean(data_point.estCompensatedAccelX for data_point in self._data_points)
y_accel = stats.fmean(data_point.estCompensatedAccelY for data_point in self._data_points)
z_accel = stats.fmean(data_point.estCompensatedAccelZ for data_point in self._data_points)
# TODO: Calculate avg velocity if that's also available
return x_accel, y_accel, z_accel

@property
def avg_acceleration_z(self) -> float:
"""
Expand All @@ -89,6 +84,136 @@ def avg_acceleration_mag(self) -> float:
@property
def max_altitude(self) -> float:
"""
Returns the highest altitude attained by the rocket for the entire flight so far, in meters.
Returns the highest altitude (zeroed out) attained by the rocket for the entire flight
so far, in meters.
"""
return self._max_altitude

@property
def current_altitude(self) -> float:
"""Returns the altitude of the rocket (zeroed out) from the data points, in meters."""
return self._current_altitude

@property
def speed(self) -> float:
"""The current speed of the rocket in m/s. Calculated by integrating the linear acceleration."""
return self._speed

@property
def max_speed(self) -> float:
"""The maximum speed the rocket has attained during the flight, in m/s."""
return self._max_speed

def update_data(self, data_points: Sequence[EstimatedDataPacket]) -> None:
"""
Updates the data points to process. This will recompute all the averages and other
information such as altitude, speed, etc.
:param data_points: A sequence of EstimatedDataPacket objects to process.
"""

if not data_points: # Data packets may not be EstimatedDataPacket in the beginning
return

# First assign the data points
self._data_points = data_points

# We use linearAcceleration because we don't want gravity to affect our calculations for
# speed.
# Next, assign variables for linear acceleration, since we don't want to recalculate them
# in the helper functions below:
# List of the acceleration in the x, y, and z directions, useful for calculations below
# If the absolute value of acceleration is less than 0.1, set it to 0
x_accel = [
deadband(data_point.estLinearAccelX, ACCLERATION_NOISE_THRESHOLD) for data_point in self._data_points
]
y_accel = [
deadband(data_point.estLinearAccelY, ACCLERATION_NOISE_THRESHOLD) for data_point in self._data_points
]
z_accel = [
deadband(data_point.estLinearAccelZ, ACCLERATION_NOISE_THRESHOLD) for data_point in self._data_points
]

pressure_altitude = [data_point.estPressureAlt for data_point in self._data_points]

a_x, a_y, a_z = self._compute_averages(x_accel, y_accel, z_accel)
self._avg_accel = (a_x, a_y, a_z)
self._avg_accel_mag = (self._avg_accel[0] ** 2 + self._avg_accel[1] ** 2 + self._avg_accel[2] ** 2) ** 0.5

self._speed = self._calculate_speed(x_accel, y_accel, z_accel)
self._max_speed = max(self._speed, self._max_speed)

# Zero the altitude only once, during the first update:
if self._initial_altitude is None:
self._initial_altitude = np.mean(pressure_altitude)

self._current_altitude = self._calculate_current_altitude(pressure_altitude)
self._max_altitude = self._calculate_max_altitude(pressure_altitude)

def _calculate_max_altitude(self, pressure_alt: list[float]) -> float:
"""
Calculates the maximum altitude (zeroed out) of the rocket based on the pressure
altitude during the flight.
:return: The maximum altitude of the rocket in meters.
"""
zeroed_alts = np.array(pressure_alt) - self._initial_altitude
return max(*zeroed_alts, self._max_altitude)

def _calculate_current_altitude(self, alt_list: list[float]) -> float:
"""
Calculates the current altitude, by zeroing out the latest altitude data point.
:param alt_list: A list of the altitude data points.
"""
# There is a decent chance that the zeroed out altitude is negative, e.g. if the rocket
# landed at a height below from where it launched from, but that doesn't concern us.
return alt_list[-1] - self._initial_altitude

def _compute_averages(self, a_x: list[float], a_y: list[float], a_z: list[float]) -> tuple[float, float, float]:
"""
Calculates the average acceleration and acceleration magnitude of the data points.
:param a_x: A list of the accelerations in the x direction.
:param a_y: A list of the accelerations in the y direction.
:param a_z: A list of the accelerations in the z direction.
:return: A tuple of the average acceleration in the x, y, and z directions.
"""
# calculate the average acceleration in the x, y, and z directions
avg_x_accel = np.mean(a_x)
avg_y_accel = np.mean(a_y)
avg_z_accel = np.mean(a_z)
return avg_x_accel, avg_y_accel, avg_z_accel

def _calculate_speed(self, a_x: list[float], a_y: list[float], a_z: list[float]) -> float:
"""
Calculates the speed of the rocket based on the linear acceleration.
Integrates the linear acceleration to get the speed.
"""
# We need at least two data points to calculate the speed:
if len(self._data_points) < 2:
return 0.0

# Side note: We can't use the pressure altitude to calculate the speed, since the pressure
# does not update fast enough to give us a good estimate of the speed.

# calculate the time differences between each data point
time_diff = np.diff([data_point.timestamp for data_point in self._data_points])

# We store the previous calculated velocity vectors, so that our speed
# doesn't show a jump, e.g. after motor burn out.
previous_vel_x, previous_vel_y, previous_vel_z = self._previous_velocity

# We integrate each of the components of the acceleration to get the velocity
# The [:-1] is used to remove the last element of the list, since we have one less time
# difference than we have acceleration values.
velocity_x: np.array = previous_vel_x + np.cumsum(a_x[:-1] * time_diff)
velocity_y: np.array = previous_vel_y + np.cumsum(a_y[:-1] * time_diff)
velocity_z: np.array = previous_vel_z + np.cumsum(a_z[:-1] * time_diff)

# Store the last calculated velocity vectors
self._previous_velocity = (velocity_x[-1], velocity_y[-1], velocity_z[-1])

# The speed is the magnitude of the velocity vector
# [-1] to get the last element of the array (latest speed)
return np.sqrt(velocity_x**2 + velocity_y**2 + velocity_z**2)[-1]
87 changes: 80 additions & 7 deletions airbrakes/state.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
"""Module for the finite state machine that represents which state of flight we are in."""

import time
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

from constants import (
DISTANCE_FROM_APOGEE,
GROUND_ALTITIUDE,
MOTOR_BURN_TIME,
TAKEOFF_HEIGHT,
TAKEOFF_SPEED,
)

if TYPE_CHECKING:
from airbrakes.airbrakes import AirbrakesContext

Expand Down Expand Up @@ -31,7 +40,8 @@ def __init__(self, context: "AirbrakesContext"):
# At the very beginning of each state, we retract the airbrakes
self.context.set_airbrake_extension(0.0)

def get_name(self):
@property
def name(self):
"""
:return: The name of the state
"""
Expand Down Expand Up @@ -60,7 +70,21 @@ class StandByState(State):
__slots__ = ()

def update(self):
pass
"""
Checks if the rocket has launched, based on our speed and altitude.
"""
# We need to check if the rocket has launched, if it has, we move to the next state.
# For that we can check:
# 1) Speed - If the speed of the rocket is above a threshold, the rocket has
# launched.
# 2) Altitude - If the altitude is above a threshold, the rocket has launched.
# Ideally we would directly communicate with the motor, but we don't have that capability.

data = self.context.data_processor

if data.speed > TAKEOFF_SPEED or data.current_altitude > TAKEOFF_HEIGHT:
self.next_state()
return

def next_state(self):
self.context.state = MotorBurnState(self.context)
Expand All @@ -71,26 +95,54 @@ class MotorBurnState(State):
When the motor is burning and the rocket is accelerating.
"""

__slots__ = ()
__slots__ = ("start_time",)

def __init__(self, context: "AirbrakesContext"):
super().__init__(context)
self.start_time = time.time()

def update(self):
pass
"""Checks to see if the acceleration has dropped to zero, indicating the motor has
burned out."""

data = self.context.data_processor

# If our current speed is less than our max speed, that means we have stopped accelerating
# This is the same thing as checking if our accel sign has flipped
if data.speed < data.max_speed:
self.next_state()
return

# Fallback: if our motor has burned for longer than its burn time, go to the next state
if time.time() - self.start_time > MOTOR_BURN_TIME:
self.next_state()
return

def next_state(self):
self.context.state = FlightState(self.context)
# Deploy the airbrakes as soon as we enter the Flight state
self.context.set_airbrake_extension(1.0)


class FlightState(State):
"""
When the motor has burned out and the rocket is coasting.
When the motor has burned out and the rocket is coasting to apogee.
"""

__slots__ = ()

def update(self):
pass
"""Checks to see if the rocket has reached apogee, indicating the start of free fall."""

data = self.context.data_processor

# if our altitude has started to decrease, we have reached apogee:
if data.max_altitude - data.current_altitude > DISTANCE_FROM_APOGEE:
self.next_state()
return

def next_state(self):
# This also retracts the airbrakes:
self.context.state = FreeFallState(self.context)


Expand All @@ -102,7 +154,28 @@ class FreeFallState(State):
__slots__ = ()

def update(self):
pass
"""Check if the rocket has landed, based on our altitude."""

data = self.context.data_processor

# If our altitude is 0, we have landed:
if data.current_altitude <= GROUND_ALTITIUDE:
self.next_state()

def next_state(self):
# Explicitly do nothing, there is no next state
self.context.state = LandedState(self.context)


class LandedState(State):
"""
When the rocket has landed.
"""

__slots__ = ()

def update(self):
"""Nothing to check, we just wait for the rocket to be recovered."""

def next_state(self):
# Explicitly do nothing, there is no next state
Expand Down
Loading

0 comments on commit 4809728

Please sign in to comment.