Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

State determination #19

Merged
merged 29 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
ef8a257
Use context managers for clean exit on Ctrl+C
harshil21 Sep 19, 2024
094fc0f
bye context manager
harshil21 Sep 20, 2024
a8f73b7
ruff fixes
harshil21 Sep 20, 2024
516689a
Merge main and fix conflicts
harshil21 Sep 20, 2024
bffe5d5
Fix import
harshil21 Sep 20, 2024
0a18cf0
Merge branch 'fix-logging' into state-determination
harshil21 Sep 21, 2024
a3901c5
Initial code for state determination and switching
harshil21 Sep 22, 2024
87a91c9
Add tests for state determination
harshil21 Sep 22, 2024
7d8d4d2
Merge main and fix conflicts
harshil21 Sep 22, 2024
c595f2e
Review: airbrakes extension, doc fixes
harshil21 Sep 22, 2024
b4f22ce
Fix tests
harshil21 Sep 22, 2024
b7fb80c
Merge fix-ctrl-c branch and fix conflicts
harshil21 Sep 22, 2024
2c34a36
Merge main and fix conflicts
harshil21 Sep 24, 2024
092aedd
Deploy airbrakes after Motorburn
harshil21 Sep 24, 2024
8391b43
Ruff fixes
harshil21 Sep 24, 2024
201df5d
changes
MrPezser Sep 25, 2024
8a9da79
Merge branch 'state-determination' of https://github.com/NCSU-High-Po…
MrPezser Sep 25, 2024
675fab0
Merge branch 'main' into state-determination
harshil21 Sep 25, 2024
821b0a7
Merge branch 'state-determination' of https://github.com/NCSU-High-Po…
MrPezser Sep 25, 2024
c2c6c9f
Added deadband
MrPezser Sep 25, 2024
0180e30
Loosely finished state changes
MrPezser Sep 25, 2024
d280301
Fix cyclic imports, fix ruff and fix tests
harshil21 Sep 25, 2024
744cf53
Add LandedState and tests for it
harshil21 Sep 25, 2024
efb6ae2
clear test log file directory before each running a logger test
harshil21 Sep 25, 2024
19a8219
Removed run_local_main. Use main.py mock
harshil21 Sep 25, 2024
bfe9446
Remove log_9.csv
harshil21 Sep 25, 2024
1cd141e
Clean up
JacksonElia Sep 25, 2024
cf33f08
reverted part of last commit
JacksonElia Sep 25, 2024
e3252fc
Fixed test
JacksonElia Sep 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions airbrakes/airbrakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,16 @@ def update(self) -> None:
return

# Update the processed data with the new data packets. We only care about EstimatedDataPackets
# TODO: Check how many data packets we are processing on average
self.data_processor.update_data(
[data_packet for data_packet in data_packets.copy() if isinstance(data_packet, EstimatedDataPacket)]
)
# Logs the current state, extension, and IMU data
# TODO: Compute state(s) for given IMU data
self.logger.log(self.state.get_name(), self.current_extension, data_packets)

# Update the state machine based on the latest processed data
self.state.update()

# Logs the current state, extension, and IMU data
self.logger.log(self.state.name, self.current_extension, data_packets)

def set_airbrake_extension(self, extension: float) -> None:
"""
Sets the airbrake extension via the servo. It will be called by the states.
Expand Down
191 changes: 158 additions & 33 deletions airbrakes/data_handling/data_processor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
"""Module for processing IMU data on a higher level."""

import statistics as stats
from collections.abc import Sequence

import numpy as np

from airbrakes.data_handling.imu_data_packet import EstimatedDataPacket
from airbrakes.utils import deadband
from constants import ACCLERATION_NOISE_THRESHOLD


class IMUDataProcessor:
Expand All @@ -15,14 +18,30 @@ class IMUDataProcessor:
:param data_points: A sequence of EstimatedDataPacket objects to process.
"""

__slots__ = ("_avg_accel", "_avg_accel_mag", "_data_points", "_max_altitude", "upside_down")
__slots__ = (
"_avg_accel",
"_avg_accel_mag",
"_current_altitude",
"_data_points",
"_initial_altitude",
"_max_altitude",
"_max_speed",
"_previous_velocity",
"_speed",
"upside_down",
)

def __init__(self, data_points: Sequence[EstimatedDataPacket], upside_down: bool = False):
Copy link
Member

@JacksonElia JacksonElia Sep 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove data_points from init, just set it equal to an empty queue (or whatever it is)

self.upside_down = upside_down

self._avg_accel: tuple[float, float, float] = (0.0, 0.0, 0.0)
self._avg_accel_mag: float = 0.0
self._max_altitude: float = 0.0
self._speed: float = 0.0
self._max_speed: float = 0.0
self._previous_velocity: tuple[float, float, float] = (0.0, 0.0, 0.0)
self._initial_altitude: float | None = None
self._current_altitude: float = 0.0
self.upside_down = upside_down

self._data_points: Sequence[EstimatedDataPacket]

if data_points: # actually update the data on init
Expand All @@ -35,36 +54,12 @@ def __str__(self) -> str:
f"{self.__class__.__name__}("
f"avg_acceleration={self.avg_acceleration}, "
f"avg_acceleration_mag={self.avg_acceleration_mag}, "
f"max_altitude={self.max_altitude})"
f"max_altitude={self.max_altitude}, "
f"current_altitude={self.current_altitude}, "
f"speed={self.speed}, "
f"max_speed={self.max_speed})"
)

def update_data(self, data_points: Sequence[EstimatedDataPacket]) -> None:
"""
Updates the data points to process. This will recalculate the averages and maximum
altitude.
:param data_points: A sequence of EstimatedDataPacket objects to process.
"""
if not data_points: # Data packets may not be EstimatedDataPacket in the beginning
return
self._data_points = data_points
a_x, a_y, a_z = self._compute_averages()
self._avg_accel = (a_x, a_y, a_z)
self._avg_accel_mag = (self._avg_accel[0] ** 2 + self._avg_accel[1] ** 2 + self._avg_accel[2] ** 2) ** 0.5
self._max_altitude = max(*(data_point.estPressureAlt for data_point in self._data_points), self._max_altitude)

def _compute_averages(self) -> tuple[float, float, float]:
"""
Calculates the average acceleration and acceleration magnitude of the data points.
:return: A tuple of the average acceleration in the x, y, and z directions.
"""
# calculate the average acceleration in the x, y, and z directions
# TODO: Test what these accel values actually look like
x_accel = stats.fmean(data_point.estCompensatedAccelX for data_point in self._data_points)
y_accel = stats.fmean(data_point.estCompensatedAccelY for data_point in self._data_points)
z_accel = stats.fmean(data_point.estCompensatedAccelZ for data_point in self._data_points)
# TODO: Calculate avg velocity if that's also available
return x_accel, y_accel, z_accel

@property
def avg_acceleration_z(self) -> float:
"""
Expand All @@ -89,6 +84,136 @@ def avg_acceleration_mag(self) -> float:
@property
def max_altitude(self) -> float:
"""
Returns the highest altitude attained by the rocket for the entire flight so far, in meters.
Returns the highest altitude (zeroed out) attained by the rocket for the entire flight
so far, in meters.
"""
return self._max_altitude

@property
def current_altitude(self) -> float:
"""Returns the altitude of the rocket (zeroed out) from the data points, in meters."""
return self._current_altitude

@property
def speed(self) -> float:
"""The current speed of the rocket in m/s. Calculated by integrating the linear acceleration."""
return self._speed

@property
def max_speed(self) -> float:
"""The maximum speed the rocket has attained during the flight, in m/s."""
return self._max_speed

def update_data(self, data_points: Sequence[EstimatedDataPacket]) -> None:
"""
Updates the data points to process. This will recompute all the averages and other
information such as altitude, speed, etc.
:param data_points: A sequence of EstimatedDataPacket objects to process.
"""

if not data_points: # Data packets may not be EstimatedDataPacket in the beginning
return

# First assign the data points
self._data_points = data_points

# We use linearAcceleration because we don't want gravity to affect our calculations for
# speed.
# Next, assign variables for linear acceleration, since we don't want to recalculate them
# in the helper functions below:
# List of the acceleration in the x, y, and z directions, useful for calculations below
# If the absolute value of acceleration is less than 0.1, set it to 0
x_accel = [
deadband(data_point.estLinearAccelX, ACCLERATION_NOISE_THRESHOLD) for data_point in self._data_points
]
y_accel = [
deadband(data_point.estLinearAccelY, ACCLERATION_NOISE_THRESHOLD) for data_point in self._data_points
]
z_accel = [
deadband(data_point.estLinearAccelZ, ACCLERATION_NOISE_THRESHOLD) for data_point in self._data_points
]

pressure_altitude = [data_point.estPressureAlt for data_point in self._data_points]

a_x, a_y, a_z = self._compute_averages(x_accel, y_accel, z_accel)
self._avg_accel = (a_x, a_y, a_z)
self._avg_accel_mag = (self._avg_accel[0] ** 2 + self._avg_accel[1] ** 2 + self._avg_accel[2] ** 2) ** 0.5

self._speed = self._calculate_speed(x_accel, y_accel, z_accel)
self._max_speed = max(self._speed, self._max_speed)

# Zero the altitude only once, during the first update:
if self._initial_altitude is None:
self._initial_altitude = np.mean(pressure_altitude)

self._current_altitude = self._calculate_current_altitude(pressure_altitude)
self._max_altitude = self._calculate_max_altitude(pressure_altitude)

def _calculate_max_altitude(self, pressure_alt: list[float]) -> float:
"""
Calculates the maximum altitude (zeroed out) of the rocket based on the pressure
altitude during the flight.
:return: The maximum altitude of the rocket in meters.
"""
zeroed_alts = np.array(pressure_alt) - self._initial_altitude
return max(*zeroed_alts, self._max_altitude)

def _calculate_current_altitude(self, alt_list: list[float]) -> float:
"""
Calculates the current altitude, by zeroing out the latest altitude data point.
:param alt_list: A list of the altitude data points.
"""
# There is a decent chance that the zeroed out altitude is negative, e.g. if the rocket
# landed at a height below from where it launched from, but that doesn't concern us.
return alt_list[-1] - self._initial_altitude

def _compute_averages(self, a_x: list[float], a_y: list[float], a_z: list[float]) -> tuple[float, float, float]:
"""
Calculates the average acceleration and acceleration magnitude of the data points.
:param a_x: A list of the accelerations in the x direction.
:param a_y: A list of the accelerations in the y direction.
:param a_z: A list of the accelerations in the z direction.
:return: A tuple of the average acceleration in the x, y, and z directions.
"""
# calculate the average acceleration in the x, y, and z directions
avg_x_accel = np.mean(a_x)
avg_y_accel = np.mean(a_y)
avg_z_accel = np.mean(a_z)
return avg_x_accel, avg_y_accel, avg_z_accel

def _calculate_speed(self, a_x: list[float], a_y: list[float], a_z: list[float]) -> float:
"""
Calculates the speed of the rocket based on the linear acceleration.
Integrates the linear acceleration to get the speed.
"""
# We need at least two data points to calculate the speed:
if len(self._data_points) < 2:
return 0.0
Comment on lines +194 to +195
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd still like us to reconsider fetching a set amount of data packets in get_imu_data_packets(), so that this condition is never hit and things are a little more consistent overall.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would we implement that in a way that ensured we didn't have a build up of data?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can tell it, "give me at least 10 packets of data, but if you have more, give it all"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should return the speed from the last loop, not 0

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in #22


# Side note: We can't use the pressure altitude to calculate the speed, since the pressure
# does not update fast enough to give us a good estimate of the speed.

# calculate the time differences between each data point
time_diff = np.diff([data_point.timestamp for data_point in self._data_points])

# We store the previous calculated velocity vectors, so that our speed
# doesn't show a jump, e.g. after motor burn out.
previous_vel_x, previous_vel_y, previous_vel_z = self._previous_velocity

# We integrate each of the components of the acceleration to get the velocity
# The [:-1] is used to remove the last element of the list, since we have one less time
# difference than we have acceleration values.
velocity_x: np.array = previous_vel_x + np.cumsum(a_x[:-1] * time_diff)
velocity_y: np.array = previous_vel_y + np.cumsum(a_y[:-1] * time_diff)
velocity_z: np.array = previous_vel_z + np.cumsum(a_z[:-1] * time_diff)

# Store the last calculated velocity vectors
self._previous_velocity = (velocity_x[-1], velocity_y[-1], velocity_z[-1])

# The speed is the magnitude of the velocity vector
# [-1] to get the last element of the array (latest speed)
return np.sqrt(velocity_x**2 + velocity_y**2 + velocity_z**2)[-1]
87 changes: 80 additions & 7 deletions airbrakes/state.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
"""Module for the finite state machine that represents which state of flight we are in."""

import time
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

from constants import (
DISTANCE_FROM_APOGEE,
GROUND_ALTITIUDE,
MOTOR_BURN_TIME,
TAKEOFF_HEIGHT,
TAKEOFF_SPEED,
)

if TYPE_CHECKING:
from airbrakes.airbrakes import AirbrakesContext

Expand Down Expand Up @@ -31,7 +40,8 @@ def __init__(self, context: "AirbrakesContext"):
# At the very beginning of each state, we retract the airbrakes
self.context.set_airbrake_extension(0.0)

def get_name(self):
@property
def name(self):
"""
:return: The name of the state
"""
Expand Down Expand Up @@ -60,7 +70,21 @@ class StandByState(State):
__slots__ = ()

def update(self):
pass
"""
Checks if the rocket has launched, based on our speed and altitude.
"""
# We need to check if the rocket has launched, if it has, we move to the next state.
# For that we can check:
# 1) Speed - If the speed of the rocket is above a threshold, the rocket has
# launched.
# 2) Altitude - If the altitude is above a threshold, the rocket has launched.
# Ideally we would directly communicate with the motor, but we don't have that capability.

data = self.context.data_processor

if data.speed > TAKEOFF_SPEED or data.current_altitude > TAKEOFF_HEIGHT:
self.next_state()
return

def next_state(self):
self.context.state = MotorBurnState(self.context)
Expand All @@ -71,26 +95,54 @@ class MotorBurnState(State):
When the motor is burning and the rocket is accelerating.
"""

__slots__ = ()
__slots__ = ("start_time",)

def __init__(self, context: "AirbrakesContext"):
super().__init__(context)
self.start_time = time.time()

def update(self):
pass
"""Checks to see if the acceleration has dropped to zero, indicating the motor has
burned out."""

data = self.context.data_processor

# If our current speed is less than our max speed, that means we have stopped accelerating
# This is the same thing as checking if our accel sign has flipped
if data.speed < data.max_speed:
self.next_state()
return

# Fallback: if our motor has burned for longer than its burn time, go to the next state
if time.time() - self.start_time > MOTOR_BURN_TIME:
self.next_state()
return

def next_state(self):
self.context.state = FlightState(self.context)
# Deploy the airbrakes as soon as we enter the Flight state
self.context.set_airbrake_extension(1.0)


class FlightState(State):
"""
When the motor has burned out and the rocket is coasting.
When the motor has burned out and the rocket is coasting to apogee.
"""

__slots__ = ()

def update(self):
pass
"""Checks to see if the rocket has reached apogee, indicating the start of free fall."""

data = self.context.data_processor

# if our altitude has started to decrease, we have reached apogee:
if data.max_altitude - data.current_altitude > DISTANCE_FROM_APOGEE:
self.next_state()
return

def next_state(self):
# This also retracts the airbrakes:
self.context.state = FreeFallState(self.context)


Expand All @@ -102,7 +154,28 @@ class FreeFallState(State):
__slots__ = ()

def update(self):
pass
"""Check if the rocket has landed, based on our altitude."""

data = self.context.data_processor

# If our altitude is 0, we have landed:
if data.current_altitude <= GROUND_ALTITIUDE:
self.next_state()

def next_state(self):
# Explicitly do nothing, there is no next state
self.context.state = LandedState(self.context)


class LandedState(State):
"""
When the rocket has landed.
"""

__slots__ = ()

def update(self):
"""Nothing to check, we just wait for the rocket to be recovered."""

def next_state(self):
# Explicitly do nothing, there is no next state
Expand Down
Loading