From e67aba3cfd6d50cb4b8c5f122aeca7f921f695c0 Mon Sep 17 00:00:00 2001 From: Harshil <37377066+harshil21@users.noreply.github.com> Date: Sat, 14 Sep 2024 19:25:12 -0400 Subject: [PATCH] Fix logging bug and better encapsulation for Logger class --- airbrakes/airbrakes.py | 11 ++++++-- airbrakes/imu/imu.py | 8 +++++- airbrakes/logger.py | 61 +++++++++++++++++++++++++++--------------- main.py | 12 +++++++-- 4 files changed, 66 insertions(+), 26 deletions(-) diff --git a/airbrakes/airbrakes.py b/airbrakes/airbrakes.py index 043b989b..e5ca6477 100644 --- a/airbrakes/airbrakes.py +++ b/airbrakes/airbrakes.py @@ -44,6 +44,13 @@ def __init__(self, logger: Logger, servo: Servo, imu: IMU): # Placeholder for the current airbrake extension until they are set self.current_extension: float = 0.0 + def start(self) -> None: + """ + Starts the IMU and logger processes. This is called before the main while loop starts. + """ + self.imu.start() + self.logger.start() + def update(self) -> None: """ Called every loop iteration from the main process. Depending on the current state, it will @@ -65,7 +72,7 @@ def update(self) -> None: ) # Logs the current state, extension, and IMU data # TODO: Compute state(s) for given IMU data - self.logger.log(self.state.get_name(), self.current_extension, data_packets.copy()) + self.logger.log(self.state.get_name(), self.current_extension, data_packets) self.state.update() @@ -76,7 +83,7 @@ def set_airbrake_extension(self, extension: float) -> None: """ self.servo.set_extension(extension) - def shutdown(self) -> None: + def stop(self) -> None: """ Handles shutting down the airbrakes. This will cause the main loop to break. """ diff --git a/airbrakes/imu/imu.py b/airbrakes/imu/imu.py index e13d1ed3..9337b03a 100644 --- a/airbrakes/imu/imu.py +++ b/airbrakes/imu/imu.py @@ -31,12 +31,18 @@ class IMU: def __init__(self, port: str, frequency: int, upside_down: bool): # Shared Queue which contains the latest data from the IMU self.data_queue: multiprocessing.Queue[IMUDataPacket] = multiprocessing.Queue() - self.running = multiprocessing.Value("b", True) # Makes a boolean value that is shared between processes + self.running = multiprocessing.Value("b", False) # Makes a boolean value that is shared between processes # Starts the process that fetches data from the IMU self.data_fetch_process = multiprocessing.Process( target=self._fetch_data_loop, args=(port, frequency, upside_down) ) + + def start(self): + """ + Starts the process fetching data from the IMU. + """ + self.running.value = True self.data_fetch_process.start() def _fetch_data_loop(self, port: str, frequency: int, _: bool): diff --git a/airbrakes/logger.py b/airbrakes/logger.py index 80a6e98c..67fe7eec 100644 --- a/airbrakes/logger.py +++ b/airbrakes/logger.py @@ -17,12 +17,14 @@ class Logger: It uses the Python logging module to append the airbrake's current state, extension, and IMU data to our logs in real time. + + Args: + log_dir (:class:`pathlib.Path`): The directory where the log files will be. """ - __slots__ = ("log_path", "log_process", "log_queue", "running") + __slots__ = ("_log_process", "_log_queue", "_running", "_stop_signal", "log_path") - def __init__(self): - log_dir = Path("logs") + def __init__(self, log_dir: Path): log_dir.mkdir(parents=True, exist_ok=True) # Get all existing log files and find the highest suffix number @@ -35,14 +37,32 @@ def __init__(self): writer = csv.DictWriter(file_writer, fieldnames=CSV_HEADERS) writer.writeheader() - # Makes a queue to store log messages, basically it's a process-safe list that you add to the back and pop from - # front, meaning that things will be logged in the order they were added - self.log_queue = multiprocessing.Queue() - self.running = multiprocessing.Value("b", True) # Makes a boolean value that is shared between processes + # Makes a queue to store log messages, basically it's a process-safe list that you add to + # the back and pop from front, meaning that things will be logged in the order they were + # added + self._log_queue: multiprocessing.Queue[IMUDataPacket] = multiprocessing.Queue() + self._running = multiprocessing.Value("b", False) # Makes a boolean value that is shared between processes # Start the logging process - self.log_process = multiprocessing.Process(target=self._logging_loop) - self.log_process.start() + self._log_process = multiprocessing.Process(target=self._logging_loop) + + # The signal to stop the logging process, this will be put in the queue to stop the process + # see stop() and _logging_loop() for more details. + self._stop_signal = "STOP" + + def start(self): + """ + Starts the logging process. This is called before the main while loop starts. + """ + self._running.value = True + self._log_process.start() + + @property + def is_running(self) -> bool: + """ + Returns whether the logging process is running. + """ + return self._running.value def _logging_loop(self): """ @@ -51,13 +71,12 @@ def _logging_loop(self): # Set up the csv logging in the new process with self.log_path.open(mode="a", newline="") as file_writer: writer = csv.DictWriter(file_writer, fieldnames=CSV_HEADERS) - while self.running.value: + while self._running.value: # Get a message from the queue (this will block until a message is available) - # Because there's no timeout, it will wait indefinitely until it gets a message - # -- this is fine in practice, as >100 messages a second should be added to the - # queue, but if for some reason the queue is empty, it will block forever and - # stop() won't work - message_fields = self.log_queue.get() + # Because there's no timeout, it will wait indefinitely until it gets a message. + message_fields = self._log_queue.get() + if message_fields == self._stop_signal: + break writer.writerow(message_fields) def log(self, state: str, extension: float, imu_data_list: collections.deque[IMUDataPacket]): @@ -70,16 +89,16 @@ def log(self, state: str, extension: float, imu_data_list: collections.deque[IMU # Loop through all the IMU data packets for imu_data in imu_data_list: # Formats the log message as a CSV line - message_dict = {"state": state, "extension": extension, "timestamp": imu_data.timestamp}.update( - {key: getattr(imu_data, key) for key in imu_data.__slots__} - ) + message_dict = {"state": state, "extension": extension, "timestamp": imu_data.timestamp} + message_dict.update({key: getattr(imu_data, key) for key in imu_data.__slots__}) # Put the message in the queue - self.log_queue.put(message_dict) + self._log_queue.put(message_dict) def stop(self): """ Stops the logging process. It will finish logging the current message and then stop. """ - self.running.value = False + self._running.value = False # Waits for the process to finish before stopping it - self.log_process.join() + self._log_queue.put(self._stop_signal) # Put the stop signal in the queue + self._log_process.join() diff --git a/main.py b/main.py index f5f95acc..587bc999 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,8 @@ """The main file which will be run on the Raspberry Pi. It will create the AirbrakesContext object and run the main loop.""" +from pathlib import Path + from airbrakes.airbrakes import AirbrakesContext from airbrakes.constants import FREQUENCY, MAX_EXTENSION, MIN_EXTENSION, PORT, SERVO_PIN, UPSIDE_DOWN from airbrakes.imu.imu import IMU @@ -9,17 +11,23 @@ def main(): - logger = Logger() + logger = Logger(Path("logs")) servo = Servo(SERVO_PIN, MIN_EXTENSION, MAX_EXTENSION) imu = IMU(PORT, FREQUENCY, UPSIDE_DOWN) # The context that will manage the airbrakes state machine airbrakes = AirbrakesContext(logger, servo, imu) - # This is the main loop that will run until the shutdown method on the airbrakes is called + # Start the IMU and logger processes: + airbrakes.start() + + # This is the main loop that will run until the stop method on the airbrakes is called while not airbrakes.shutdown_requested: airbrakes.update() + # Shutdown the IMU and logger processes: + airbrakes.stop() + if __name__ == "__main__": main()