Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use faster-fifo for queues & more performance improvements #99

Merged
merged 31 commits into from
Jan 8, 2025
Merged
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
8dd53c0
initial support for faster-fifo
harshil21 Nov 13, 2024
ee4edab
Merge main and fix conflicts
harshil21 Nov 21, 2024
86e39f7
Convert all m.Queues to f-f.Queue
DirtyPi09 Nov 22, 2024
d7beb97
some testing
harshil21 Nov 22, 2024
8f4d305
Fix log buffer handling, some refinements
harshil21 Nov 22, 2024
d2f288c
Uncomment display code
harshil21 Nov 22, 2024
83d8f90
More concise way to handle log buffer
harshil21 Nov 23, 2024
276cb30
More minor performance improvements
harshil21 Nov 23, 2024
45a50bb
adjust comment
harshil21 Nov 25, 2024
977eb2a
Keep support for Windows
harshil21 Nov 30, 2024
c41f3f4
No need of copy(), so another tiny perf boost
harshil21 Nov 30, 2024
06f9e3b
Add comment
harshil21 Dec 2, 2024
047ed7f
Merge main and fix conflicts
harshil21 Dec 2, 2024
9a35861
Perf improv: Don't use zip() in get_processed_data_packets
harshil21 Dec 6, 2024
873353f
Perf improv: Don't check for missing values when calculating rotations
harshil21 Dec 6, 2024
70a97b0
Perf improv: Don't use zip() and enumerate() in rotated accel calc
harshil21 Dec 6, 2024
b27e164
Perf improv: Don't specify dtype in np.array
harshil21 Dec 6, 2024
38b2475
Perf improv: Make get_processed_data_packets return a list instead of…
harshil21 Dec 6, 2024
5e95ae7
Fix tests. Also add a utils file for tests
harshil21 Dec 7, 2024
aa8f5be
Avoid recomputing variable in apogee_predictor
harshil21 Dec 7, 2024
9f52f79
Review: Docstring, variable, break
harshil21 Dec 13, 2024
d865eaa
Review: rename loop var: idx -> i
harshil21 Dec 25, 2024
164fce9
Use block=True & more robust tests to detect clean shutdown
harshil21 Jan 1, 2025
df896ec
Fix Windows runtime
harshil21 Jan 2, 2025
a1f39bf
Run integration tests on Windows
harshil21 Jan 2, 2025
cdb4f18
Add note about Windows support for tests in the README
harshil21 Jan 2, 2025
c17dfda
Add another test with the display running too
harshil21 Jan 6, 2025
3f1ea27
Revert strictness check
harshil21 Jan 8, 2025
35b69e7
Merge main and fix conflicts
harshil21 Jan 8, 2025
ebb7ac7
fix a test
harshil21 Jan 8, 2025
685047a
Fix windows integration test
harshil21 Jan 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
initial support for faster-fifo
harshil21 committed Nov 13, 2024

Verified

This commit was signed with the committer’s verified signature.
mertalev Mert
commit 8dd53c00ef080d1405cb917f5813cf7a1fbb4fe9
14 changes: 9 additions & 5 deletions airbrakes/data_handling/logger.py
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@
from pathlib import Path
from typing import Any, Literal

from faster_fifo import Queue
from msgspec import to_builtins

from airbrakes.data_handling.imu_data_packet import EstimatedDataPacket, IMUDataPacket
@@ -62,9 +63,10 @@ def __init__(self, log_dir: Path) -> None:
# the back and pop from front, meaning that things will be logged in the order they were
# added.
# Signals (like stop) are sent as strings, but data is sent as dictionaries
self._log_queue: multiprocessing.Queue[LoggedDataPacket | Literal["STOP"]] = (
multiprocessing.Queue()
)
# self._log_queue: multiprocessing.Queue[LoggedDataPacket | Literal["STOP"]] = (
# multiprocessing.Queue()
# )
self._log_queue: Queue[LoggedDataPacket | Literal["STOP"]] = Queue()

# Start the logging process
self._log_process = multiprocessing.Process(
@@ -160,7 +162,8 @@ def log(
self._log_counter = 0 # Reset the counter for other states

# Put the message in the queue
self._log_queue.put(logged_data_packet)
# self._log_queue.put(logged_data_packet)
self._log_queue.put_many(logged_data_packets)

def _log_the_buffer(self):
"""
@@ -188,7 +191,8 @@ def _prepare_log_dict(
data packets.
:return: A list of LoggedDataPacket objects.
"""
logged_data_packets: deque[LoggedDataPacket] = deque()
# logged_data_packets: deque[LoggedDataPacket] = deque()
logged_data_packets = []

# Convert the imu data packets to a dictionary:
for imu_data_packet in imu_data_packets:
36 changes: 32 additions & 4 deletions airbrakes/mock/mock_imu.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Module for simulating interacting with the IMU (Inertial measurement unit) on the rocket."""

import collections
import contextlib
import multiprocessing
import time
from pathlib import Path

import pandas as pd
from faster_fifo import Queue

from airbrakes.data_handling.imu_data_packet import (
EstimatedDataPacket,
@@ -14,6 +16,7 @@
)
from airbrakes.hardware.imu import IMU
from constants import LOG_BUFFER_SIZE, MAX_QUEUE_SIZE, SIMULATION_MAX_QUEUE_SIZE
from utils import convert_to_float


class MockIMU(IMU):
@@ -53,8 +56,8 @@ def __init__(
test, because we read the file much faster than update(), sometimes resulting in thousands
# of data packets in the queue, which will obviously mess up data processing calculations.
# We limit it to 15 packets, which is more realistic for a real flight.
self._data_queue: multiprocessing.Queue[IMUDataPacket] = multiprocessing.Queue(
MAX_QUEUE_SIZE if real_time_simulation else SIMULATION_MAX_QUEUE_SIZE
self._data_queue: Queue[IMUDataPacket] = Queue(
maxsize=MAX_QUEUE_SIZE if real_time_simulation else SIMULATION_MAX_QUEUE_SIZE
)

# Starts the process that fetches data from the log file
@@ -71,6 +74,27 @@ def __init__(
# Makes a boolean value that is shared between processes
self._running = multiprocessing.Value("b", False)

def get_imu_data_packets(self) -> collections.deque[IMUDataPacket]:
    """
    Returns all currently available data packets from the (mock) IMU.

    This method differs from the actual IMU in that it uses the faster-fifo library to get
    multiple data packets in a single call, which is many times faster than retrieving them
    one by one.

    :return: A deque containing every data packet that was available in the queue at call
        time; empty if the queue had nothing ready (this method never blocks).
    """
    # faster-fifo signals "nothing available" with the standard queue.Empty exception.
    from queue import Empty

    # A deque is returned because callers consume packets in arrival order by popping from
    # the left, which is O(1) on a deque.
    data_packets: collections.deque[IMUDataPacket] = collections.deque()
    try:
        # Drain up to SIMULATION_MAX_QUEUE_SIZE packets in one C-level call:
        fetched = self._data_queue.get_many(
            block=False, max_messages_to_get=SIMULATION_MAX_QUEUE_SIZE
        )
    except Empty:
        # Queue was empty — return an empty deque rather than blocking or propagating.
        # NOTE(review): previously this caught bare Exception, which would also have
        # hidden real errors (e.g. deserialization failures) inside get_many().
        fetched = []
    data_packets.extend(fetched)

    return data_packets

def _read_file(
self, log_file_path: Path, real_time_simulation: bool, start_after_log_buffer: bool = False
) -> None:
@@ -107,12 +131,16 @@ def _read_file(
)
# Read the csv, starting from the row after the log buffer, and using only the valid columns
df = pd.read_csv(
log_file_path, skiprows=range(1, start_index + 1), engine="c", usecols=valid_columns
log_file_path,
skiprows=range(1, start_index + 1),
engine="c",
usecols=valid_columns,
na_filter=False,
)
for row in df.itertuples(index=False):
start_time = time.time()
# Convert the named tuple to a dictionary and remove any NaN values:
row_dict = {k: v for k, v in row._asdict().items() if pd.notna(v)}
row_dict = {k: convert_to_float(v) for k, v in row._asdict().items() if v}

# Check if the process should stop:
if not self._running.value:
2 changes: 1 addition & 1 deletion constants.py
Original file line number Diff line number Diff line change
@@ -43,7 +43,7 @@ class ServoExtension(Enum):
# The maximum size of the data queue for the packets, so we don't run into memory issues
MAX_QUEUE_SIZE = 100000
# This is used for the mock imu to limit the queue size to a more realistic value
SIMULATION_MAX_QUEUE_SIZE = 15
SIMULATION_MAX_QUEUE_SIZE = 65

# -------------------------------------------------------
# Data Processing Configuration
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ dependencies = [
"psutil",
"scipy",
"pandas",
"faster-fifo"
# Installation instructions for the following dependencies can be found in the README:
# "mscl" https://github.com/LORD-MicroStrain/MSCL/blob/master/BuildScripts/buildReadme_Linux.md
]