Skip to content

Commit

Permalink
More encapsulation for IMU
Browse files Browse the repository at this point in the history
  • Loading branch information
harshil21 committed Sep 14, 2024
1 parent e67aba3 commit cc8797a
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions airbrakes/imu/imu.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,34 @@ class IMU:
RAW_DESCRIPTOR_SET = 128

__slots__ = (
"data_fetch_process",
"data_queue",
"running",
"_data_fetch_process",
"_data_queue",
"_running",
)

def __init__(self, port: str, frequency: int, upside_down: bool):
# Shared Queue which contains the latest data from the IMU
self.data_queue: multiprocessing.Queue[IMUDataPacket] = multiprocessing.Queue()
self.running = multiprocessing.Value("b", False) # Makes a boolean value that is shared between processes
self._data_queue: multiprocessing.Queue[IMUDataPacket] = multiprocessing.Queue()
self._running = multiprocessing.Value("b", False) # Makes a boolean value that is shared between processes

# Starts the process that fetches data from the IMU
self.data_fetch_process = multiprocessing.Process(
self._data_fetch_process = multiprocessing.Process(
target=self._fetch_data_loop, args=(port, frequency, upside_down)
)

@property
def is_running(self) -> bool:
"""
Returns whether the process fetching data from the IMU is running.
"""
return self._running.value

def start(self):
"""
Starts the process fetching data from the IMU.
"""
self.running.value = True
self.data_fetch_process.start()
self._running.value = True
self._data_fetch_process.start()

def _fetch_data_loop(self, port: str, frequency: int, _: bool):
"""
Expand All @@ -54,7 +61,7 @@ def _fetch_data_loop(self, port: str, frequency: int, _: bool):
node = mscl.InertialNode(connection)
timeout = int(1000 / frequency)

while self.running.value:
while self._running.value:
# Get the latest data packets from the IMU, with the help of `getDataPackets`.
# `getDataPackets` accepts a timeout in milliseconds.
# During IMU configuration (outside of this code), we set the sampling rate of the IMU
Expand Down Expand Up @@ -103,7 +110,7 @@ def _fetch_data_loop(self, port: str, frequency: int, _: bool):
setattr(imu_data_packet, channel, data_point.as_float())

# Put the latest data into the shared queue
self.data_queue.put(imu_data_packet)
self._data_queue.put(imu_data_packet)

def get_imu_data_packet(self) -> IMUDataPacket | EstimatedDataPacket:
"""
Expand All @@ -115,7 +122,7 @@ def get_imu_data_packet(self) -> IMUDataPacket | EstimatedDataPacket:
:return: an IMUDataPacket object containing the latest data from the IMU. If a value is
not available, it will be None.
"""
return self.data_queue.get()
return self._data_queue.get()

def get_imu_data_packets(self) -> collections.deque[IMUDataPacket | EstimatedDataPacket]:
"""Returns all available data packets from the IMU.
Expand All @@ -125,7 +132,7 @@ def get_imu_data_packets(self) -> collections.deque[IMUDataPacket | EstimatedDat

data_packets = collections.deque()

while not self.data_queue.empty():
while not self._data_queue.empty():
data_packet = self.get_imu_data_packet()
data_packets.append(data_packet)

Expand All @@ -135,6 +142,6 @@ def stop(self):
"""
Stops the process fetching data from the IMU. This should be called when the program is shutting down.
"""
self.running.value = False
self._running.value = False
# Waits for the process to finish before stopping it
self.data_fetch_process.join()
self._data_fetch_process.join()

0 comments on commit cc8797a

Please sign in to comment.