From 270cef93b949359e16bed19168c7de36fbab4f5c Mon Sep 17 00:00:00 2001 From: Gwendal Raoul Date: Fri, 10 Nov 2023 11:27:02 +0100 Subject: [PATCH] Fix filters lock Filters were recently protected to prevent exception when iterating the list. But it was wrongly done, preventing the usage of multiple threads to handle request. --- .../wirepas_network_interface.py | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/wirepas_mqtt_library/wirepas_network_interface.py b/wirepas_mqtt_library/wirepas_network_interface.py index 7b0d0b2..847b7cd 100644 --- a/wirepas_mqtt_library/wirepas_network_interface.py +++ b/wirepas_mqtt_library/wirepas_network_interface.py @@ -346,8 +346,10 @@ def _on_data_received(self, client, userdata, message): def _dispatch_uplink_data(self, data): with self._data_uplink_filters_lock: - for f in self._data_uplink_filters.values(): - f.filter_and_dispatch(data) + filters_copy = list(self._data_uplink_filters.values()) + + for f in filters_copy: + f.filter_and_dispatch(data) def _publish(self, topic, payload, qos=1, retain=False): try: @@ -364,7 +366,7 @@ def _call_cb(self, response, *args): # Add caller param after response error code and add any additional param at the end cb(response.res, param, *args) # Cb called, remove key - + del self._ongoing_requests[response.req_id] except KeyError: # No cb set, just pass (could be timeout that expired) @@ -382,14 +384,16 @@ def _on_downlink_data_received(self, client, userdata, message): logging.debug("Received message with id: %s", data.req_id) self._wait_for_response(self._dispatch_downlink_data, data.req_id, param=data) - + except wmm.GatewayAPIParsingException as e: logging.error(str(e)) def _dispatch_downlink_data(self, response, data): with self._data_downlink_filters_lock: - for f in self._data_downlink_filters.values(): - f.filter_and_dispatch(data, response) + filters_copy = list(self._data_downlink_filters.values()) + + for f in filters_copy: + f.filter_and_dispatch(data, response) def _on_response_received(self, client, userdata, message): # Topic are as followed: gw-response/cmd/... @@ -520,16 +524,16 @@ def wrapper(*args, **kwargs): def close(self): """Explicitly close this network interface as well as the worker threads - This closes disconnects the MQTT client from the broker which automatically closes the network thread. + This closes disconnects the MQTT client from the broker which automatically closes the network thread. In addition, it creates empty tasks that force the worker daemon threads to exit cleanly. Given the connection to broker is closed first, callbacks are not executed anymore and no additional tasks are therefore added in the queue. Thus by design, no valid task can be added to the queue whilst the worker threads are being closed. - If you do not require to close the connection in a deterministic way by invoking this method, the resources on the - host and broker are anyway cleaned up by way of garbage collection once the main program utilising this class is + If you do not require to close the connection in a deterministic way by invoking this method, the resources on the + host and broker are anyway cleaned up by way of garbage collection once the main program utilising this class is stopped and the corresponding python process is exited. - + .. warning:: the instance of WNI must not be used anymore after this call. Any other method call requiring the broker connection after this one will end up in TimeoutError exception """ @@ -644,7 +648,7 @@ def clear_gateway_status(self, gw_id): self._publish(topic, None, qos=1, retain=True) def _add_to_ongoing_request(self, req_id, cb, param=None): - try: + try: self._ongoing_requests[req_id].append((cb, param)) except KeyError: self._ongoing_requests[req_id]= [(cb, param)] @@ -1128,10 +1132,10 @@ def add_task(self, task, *args, **kwargs): def worker(self): while True: - try: + try: task, args, kwargs = self.get() task(*args, **kwargs) - except TypeError as e: + except TypeError as e: # When a task is None in the queue and the task is invoked # a type error is raised. This condition is used to terminate the Thread break