From 19da16eaca8b4c62fe9e1d36ced73a454a15a037 Mon Sep 17 00:00:00 2001 From: John DeAngelis Date: Thu, 12 Dec 2024 10:01:33 -0500 Subject: [PATCH] feat: add some cleanup methods to the MessageBus Signed-off-by: John DeAngelis --- backend/ops_api/ops/__init__.py | 1 + backend/ops_api/ops/services/can_messages.py | 2 +- backend/ops_api/ops/services/message_bus.py | 12 ++++++++++++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/backend/ops_api/ops/__init__.py b/backend/ops_api/ops/__init__.py index d1ee0f5631..0f1746a632 100644 --- a/backend/ops_api/ops/__init__.py +++ b/backend/ops_api/ops/__init__.py @@ -95,6 +95,7 @@ def shutdown_session(exception=None): def teardown_request(exception=None): if hasattr(request, "message_bus"): request.message_bus.handle() + request.message_bus.cleanup() @event.listens_for(db_session, "before_commit") def receive_before_commit(session: Session): diff --git a/backend/ops_api/ops/services/can_messages.py b/backend/ops_api/ops/services/can_messages.py index 805983cae2..bb6e324ca2 100644 --- a/backend/ops_api/ops/services/can_messages.py +++ b/backend/ops_api/ops/services/can_messages.py @@ -8,5 +8,5 @@ def can_history_trigger( event: OpsEvent, session: Session, ): - logger.debug(f"Handling event {event}") + logger.debug(f"Handling event {event.event_type} with details: {event.event_details}") assert session is not None diff --git a/backend/ops_api/ops/services/message_bus.py b/backend/ops_api/ops/services/message_bus.py index d580a89857..edc61267aa 100644 --- a/backend/ops_api/ops/services/message_bus.py +++ b/backend/ops_api/ops/services/message_bus.py @@ -19,6 +19,7 @@ class MessageBus: """ published_events: List[OpsEvent] = [] + known_callbacks = [] def handle(self): """ @@ -28,6 +29,7 @@ def handle(self): ops_signal = signal(event.event_type.name) ops_signal.send(event, session=current_app.db_session) logger.debug(f"Handling event {event}") + self.published_events.clear() def subscribe(self, event_type: OpsEventType, callback: callable): """ @@ -51,3 +53,13 @@ def publish(self, event_type: OpsEventType, event: OpsEvent): """ logger.debug(f"Publishing event {event_type} with details {event}") self.published_events.append(event) + + def cleanup(self): + """ + Clean up all subscriptions and published events. + """ + for callback in self.known_callbacks: + ops_signal = signal(callback) + ops_signal.disconnect(callback) + self.published_events.clear() + self.known_callbacks.clear()