Skip to content

Commit

Permalink
Fix logging bug and better encapsulation for Logger class
Browse files Browse the repository at this point in the history
  • Loading branch information
harshil21 committed Sep 14, 2024
1 parent 22254aa commit e67aba3
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 26 deletions.
11 changes: 9 additions & 2 deletions airbrakes/airbrakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ def __init__(self, logger: Logger, servo: Servo, imu: IMU):
# Placeholder for the current airbrake extension until they are set
self.current_extension: float = 0.0

def start(self) -> None:
"""
Starts the IMU and logger processes. This is called before the main while loop starts.
"""
self.imu.start()
self.logger.start()

def update(self) -> None:
"""
Called every loop iteration from the main process. Depending on the current state, it will
Expand All @@ -65,7 +72,7 @@ def update(self) -> None:
)
# Logs the current state, extension, and IMU data
# TODO: Compute state(s) for given IMU data
self.logger.log(self.state.get_name(), self.current_extension, data_packets.copy())
self.logger.log(self.state.get_name(), self.current_extension, data_packets)

self.state.update()

Expand All @@ -76,7 +83,7 @@ def set_airbrake_extension(self, extension: float) -> None:
"""
self.servo.set_extension(extension)

def shutdown(self) -> None:
def stop(self) -> None:
"""
Handles shutting down the airbrakes. This will cause the main loop to break.
"""
Expand Down
8 changes: 7 additions & 1 deletion airbrakes/imu/imu.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,18 @@ class IMU:
def __init__(self, port: str, frequency: int, upside_down: bool):
# Shared Queue which contains the latest data from the IMU
self.data_queue: multiprocessing.Queue[IMUDataPacket] = multiprocessing.Queue()
self.running = multiprocessing.Value("b", True) # Makes a boolean value that is shared between processes
self.running = multiprocessing.Value("b", False) # Makes a boolean value that is shared between processes

# Starts the process that fetches data from the IMU
self.data_fetch_process = multiprocessing.Process(
target=self._fetch_data_loop, args=(port, frequency, upside_down)
)

def start(self):
"""
Starts the process fetching data from the IMU.
"""
self.running.value = True
self.data_fetch_process.start()

def _fetch_data_loop(self, port: str, frequency: int, _: bool):
Expand Down
61 changes: 40 additions & 21 deletions airbrakes/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ class Logger:
It uses the Python logging module to append the airbrake's current state, extension, and IMU data to our logs in
real time.
Args:
log_dir (:class:`pathlib.Path`): The directory where the log files will be.
"""

__slots__ = ("log_path", "log_process", "log_queue", "running")
__slots__ = ("_log_process", "_log_queue", "_running", "_stop_signal", "log_path")

def __init__(self):
log_dir = Path("logs")
def __init__(self, log_dir: Path):
log_dir.mkdir(parents=True, exist_ok=True)

# Get all existing log files and find the highest suffix number
Expand All @@ -35,14 +37,32 @@ def __init__(self):
writer = csv.DictWriter(file_writer, fieldnames=CSV_HEADERS)
writer.writeheader()

# Makes a queue to store log messages, basically it's a process-safe list that you add to the back and pop from
# front, meaning that things will be logged in the order they were added
self.log_queue = multiprocessing.Queue()
self.running = multiprocessing.Value("b", True) # Makes a boolean value that is shared between processes
# Makes a queue to store log messages, basically it's a process-safe list that you add to
# the back and pop from front, meaning that things will be logged in the order they were
# added
self._log_queue: multiprocessing.Queue[IMUDataPacket] = multiprocessing.Queue()
self._running = multiprocessing.Value("b", False) # Makes a boolean value that is shared between processes

# Start the logging process
self.log_process = multiprocessing.Process(target=self._logging_loop)
self.log_process.start()
self._log_process = multiprocessing.Process(target=self._logging_loop)

# The signal to stop the logging process, this will be put in the queue to stop the process
# see stop() and _logging_loop() for more details.
self._stop_signal = "STOP"

def start(self):
"""
Starts the logging process. This is called before the main while loop starts.
"""
self._running.value = True
self._log_process.start()

@property
def is_running(self) -> bool:
"""
Returns whether the logging process is running.
"""
return self._running.value

def _logging_loop(self):
"""
Expand All @@ -51,13 +71,12 @@ def _logging_loop(self):
# Set up the csv logging in the new process
with self.log_path.open(mode="a", newline="") as file_writer:
writer = csv.DictWriter(file_writer, fieldnames=CSV_HEADERS)
while self.running.value:
while self._running.value:
# Get a message from the queue (this will block until a message is available)
# Because there's no timeout, it will wait indefinitely until it gets a message
# -- this is fine in practice, as >100 messages a second should be added to the
# queue, but if for some reason the queue is empty, it will block forever and
# stop() won't work
message_fields = self.log_queue.get()
# Because there's no timeout, it will wait indefinitely until it gets a message.
message_fields = self._log_queue.get()
if message_fields == self._stop_signal:
break
writer.writerow(message_fields)

def log(self, state: str, extension: float, imu_data_list: collections.deque[IMUDataPacket]):
Expand All @@ -70,16 +89,16 @@ def log(self, state: str, extension: float, imu_data_list: collections.deque[IMU
# Loop through all the IMU data packets
for imu_data in imu_data_list:
# Formats the log message as a CSV line
message_dict = {"state": state, "extension": extension, "timestamp": imu_data.timestamp}.update(
{key: getattr(imu_data, key) for key in imu_data.__slots__}
)
message_dict = {"state": state, "extension": extension, "timestamp": imu_data.timestamp}
message_dict.update({key: getattr(imu_data, key) for key in imu_data.__slots__})
# Put the message in the queue
self.log_queue.put(message_dict)
self._log_queue.put(message_dict)

def stop(self):
"""
Stops the logging process. It will finish logging the current message and then stop.
"""
self.running.value = False
self._running.value = False
# Waits for the process to finish before stopping it
self.log_process.join()
self._log_queue.put(self._stop_signal) # Put the stop signal in the queue
self._log_process.join()
12 changes: 10 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""The main file which will be run on the Raspberry Pi. It will create the AirbrakesContext object and run the main
loop."""

from pathlib import Path

from airbrakes.airbrakes import AirbrakesContext
from airbrakes.constants import FREQUENCY, MAX_EXTENSION, MIN_EXTENSION, PORT, SERVO_PIN, UPSIDE_DOWN
from airbrakes.imu.imu import IMU
Expand All @@ -9,17 +11,23 @@


def main():
logger = Logger()
logger = Logger(Path("logs"))
servo = Servo(SERVO_PIN, MIN_EXTENSION, MAX_EXTENSION)
imu = IMU(PORT, FREQUENCY, UPSIDE_DOWN)

# The context that will manage the airbrakes state machine
airbrakes = AirbrakesContext(logger, servo, imu)

# This is the main loop that will run until the shutdown method on the airbrakes is called
# Start the IMU and logger processes:
airbrakes.start()

# This is the main loop that will run until the stop method on the airbrakes is called
while not airbrakes.shutdown_requested:
airbrakes.update()

# Shutdown the IMU and logger processes:
airbrakes.stop()


if __name__ == "__main__":
main()

0 comments on commit e67aba3

Please sign in to comment.