Skip to content

Commit

Permalink
Fix filters lock
Browse files Browse the repository at this point in the history
Filters were recently protected to prevent exception when iterating the list.

But it was wrongly done, preventing the usage of multiple threads to handle request.
  • Loading branch information
GwendalRaoul committed Nov 10, 2023
1 parent 26d2fbc commit 270cef9
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions wirepas_mqtt_library/wirepas_network_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,10 @@ def _on_data_received(self, client, userdata, message):

def _dispatch_uplink_data(self, data):
with self._data_uplink_filters_lock:
for f in self._data_uplink_filters.values():
f.filter_and_dispatch(data)
filters_copy = list(self._data_uplink_filters.values())

for f in filters_copy:
f.filter_and_dispatch(data)

def _publish(self, topic, payload, qos=1, retain=False):
try:
Expand All @@ -364,7 +366,7 @@ def _call_cb(self, response, *args):
# Add caller param after response error code and add any additional param at the end
cb(response.res, param, *args)
# Cb called, remove key

del self._ongoing_requests[response.req_id]
except KeyError:
# No cb set, just pass (could be timeout that expired)
Expand All @@ -382,14 +384,16 @@ def _on_downlink_data_received(self, client, userdata, message):

logging.debug("Received message with id: %s", data.req_id)
self._wait_for_response(self._dispatch_downlink_data, data.req_id, param=data)

except wmm.GatewayAPIParsingException as e:
logging.error(str(e))

def _dispatch_downlink_data(self, response, data):
with self._data_downlink_filters_lock:
for f in self._data_downlink_filters.values():
f.filter_and_dispatch(data, response)
filters_copy = list(self._data_downlink_filters.values())

for f in filters_copy:
f.filter_and_dispatch(data, response)

def _on_response_received(self, client, userdata, message):
# Topic are as followed: gw-response/cmd/...
Expand Down Expand Up @@ -520,16 +524,16 @@ def wrapper(*args, **kwargs):
def close(self):
"""Explicitly close this network interface as well as the worker threads
This closes disconnects the MQTT client from the broker which automatically closes the network thread.
This closes disconnects the MQTT client from the broker which automatically closes the network thread.
In addition, it creates empty tasks that force the worker daemon threads to exit cleanly.
Given the connection to broker is closed first, callbacks are not executed anymore and no additional tasks
are therefore added in the queue. Thus by design, no valid task can be added to the queue whilst the worker threads
are being closed.
If you do not require to close the connection in a deterministic way by invoking this method, the resources on the
host and broker are anyway cleaned up by way of garbage collection once the main program utilising this class is
If you do not require to close the connection in a deterministic way by invoking this method, the resources on the
host and broker are anyway cleaned up by way of garbage collection once the main program utilising this class is
stopped and the corresponding python process is exited.
.. warning:: the instance of WNI must not be used anymore after this call. Any other method call requiring the broker
connection after this one will end up in TimeoutError exception
"""
Expand Down Expand Up @@ -644,7 +648,7 @@ def clear_gateway_status(self, gw_id):
self._publish(topic, None, qos=1, retain=True)

def _add_to_ongoing_request(self, req_id, cb, param=None):
try:
try:
self._ongoing_requests[req_id].append((cb, param))
except KeyError:
self._ongoing_requests[req_id]= [(cb, param)]
Expand Down Expand Up @@ -1128,10 +1132,10 @@ def add_task(self, task, *args, **kwargs):

def worker(self):
while True:
try:
try:
task, args, kwargs = self.get()
task(*args, **kwargs)
except TypeError as e:
except TypeError as e:
# When a task is None in the queue and the task is invoked
# a type error is raised. This condition is used to terminate the Thread
break
Expand Down

0 comments on commit 270cef9

Please sign in to comment.