diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 95a7c43..f59a8e5 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -172,10 +172,12 @@ class MQTT: This works with all callbacks but the "on_message" and those added via add_topic_callback(); for those, to get access to the user_data use the 'user_data' member of the MQTT object passed as 1st argument. + :param bool use_imprecise_time: on boards without time.monotonic_ns() one has to set + this to True in order to operate correctly over more than 24 days or so """ - # pylint: disable=too-many-arguments,too-many-instance-attributes,too-many-statements, not-callable, invalid-name, no-member + # pylint: disable=too-many-arguments,too-many-instance-attributes,too-many-statements,not-callable,invalid-name,no-member,too-many-locals def __init__( self, *, @@ -193,6 +195,7 @@ def __init__( socket_timeout: int = 1, connect_retries: int = 5, user_data=None, + use_imprecise_time: Optional[bool] = None, ) -> None: self._socket_pool = socket_pool self._ssl_context = ssl_context @@ -200,6 +203,20 @@ def __init__( self._backwards_compatible_sock = False self._use_binary_mode = use_binary_mode + self.use_monotonic_ns = False + try: + time.monotonic_ns() + self.use_monotonic_ns = True + except AttributeError: + if use_imprecise_time: + self.use_monotonic_ns = False + else: + raise MMQTTException( # pylint: disable=raise-missing-from + "time.monotonic_ns() is not available. " + "Will use imprecise time however only if the" + "use_imprecise_time argument is set to True." + ) + if recv_timeout <= socket_timeout: raise MMQTTException( "recv_timeout must be strictly greater than socket_timeout" @@ -251,9 +268,8 @@ def __init__( self.client_id = client_id else: # assign a unique client_id - self.client_id = ( - f"cpy{randint(0, int(time.monotonic() * 100) % 1000)}{randint(0, 99)}" - ) + time_int = int(self.get_monotonic_time() * 100) % 1000 + self.client_id = f"cpy{randint(0, time_int)}{randint(0, 99)}" # generated client_id's enforce spec.'s length rules if len(self.client_id.encode("utf-8")) > 23 or not self.client_id: raise ValueError("MQTT Client ID must be between 1 and 23 bytes") @@ -276,6 +292,17 @@ def __init__( self.on_subscribe = None self.on_unsubscribe = None + def get_monotonic_time(self) -> float: + """ + Provide monotonic time in seconds. Based on underlying implementation + this might result in imprecise time, that will result in the library + not being able to operate if running contiguously for more than 24 days or so. + """ + if self.use_monotonic_ns: + return time.monotonic_ns() / 1000000000 + + return time.monotonic() + # pylint: disable=too-many-branches def _get_connect_socket(self, host: str, port: int, *, timeout: int = 1): """Obtains a new socket and connects to a broker. @@ -636,7 +663,7 @@ def _connect( self._send_str(self._username) self._send_str(self._password) self.logger.debug("Receiving CONNACK packet from broker") - stamp = time.monotonic() + stamp = self.get_monotonic_time() while True: op = self._wait_for_msg() if op == 32: @@ -652,7 +679,7 @@ def _connect( return result if op is None: - if time.monotonic() - stamp > self._recv_timeout: + if self.get_monotonic_time() - stamp > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -681,13 +708,13 @@ def ping(self) -> list[int]: self.logger.debug("Sending PINGREQ") self._sock.send(MQTT_PINGREQ) ping_timeout = self.keep_alive - stamp = time.monotonic() + stamp = self.get_monotonic_time() rc, rcs = None, [] while rc != MQTT_PINGRESP: rc = self._wait_for_msg() if rc: rcs.append(rc) - if time.monotonic() - stamp > ping_timeout: + if self.get_monotonic_time() - stamp > ping_timeout: raise MMQTTException("PINGRESP not returned from broker.") return rcs @@ -768,7 +795,7 @@ def publish( if qos == 0 and self.on_publish is not None: self.on_publish(self, self.user_data, topic, self._pid) if qos == 1: - stamp = time.monotonic() + stamp = self.get_monotonic_time() while True: op = self._wait_for_msg() if op == 0x40: @@ -782,7 +809,7 @@ def publish( return if op is None: - if time.monotonic() - stamp > self._recv_timeout: + if self.get_monotonic_time() - stamp > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -834,11 +861,11 @@ def subscribe(self, topic: str, qos: int = 0) -> None: for t, q in topics: self.logger.debug("SUBSCRIBING to topic %s with QoS %d", t, q) self._sock.send(packet) - stamp = time.monotonic() + stamp = self.get_monotonic_time() while True: op = self._wait_for_msg() if op is None: - if time.monotonic() - stamp > self._recv_timeout: + if self.get_monotonic_time() - stamp > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -901,10 +928,10 @@ def unsubscribe(self, topic: str) -> None: self._sock.send(packet) self.logger.debug("Waiting for UNSUBACK...") while True: - stamp = time.monotonic() + stamp = self.get_monotonic_time() op = self._wait_for_msg() if op is None: - if time.monotonic() - stamp > self._recv_timeout: + if self.get_monotonic_time() - stamp > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -998,8 +1025,8 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]: self._connected() self.logger.debug(f"waiting for messages for {timeout} seconds") if self._timestamp == 0: - self._timestamp = time.monotonic() - current_time = time.monotonic() + self._timestamp = self.get_monotonic_time() + current_time = self.get_monotonic_time() if current_time - self._timestamp >= self.keep_alive: self._timestamp = 0 # Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server @@ -1009,14 +1036,14 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]: rcs = self.ping() return rcs - stamp = time.monotonic() + stamp = self.get_monotonic_time() rcs = [] while True: rc = self._wait_for_msg() if rc is not None: rcs.append(rc) - if time.monotonic() - stamp > timeout: + if self.get_monotonic_time() - stamp > timeout: self.logger.debug(f"Loop timed out after {timeout} seconds") break @@ -1115,7 +1142,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray: :param int bufsize: number of bytes to receive :return: byte array """ - stamp = time.monotonic() + stamp = self.get_monotonic_time() if not self._backwards_compatible_sock: # CPython/Socketpool Impl. rc = bytearray(bufsize) @@ -1130,7 +1157,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray: recv_len = self._sock.recv_into(mv, to_read) to_read -= recv_len mv = mv[recv_len:] - if time.monotonic() - stamp > read_timeout: + if self.get_monotonic_time() - stamp > read_timeout: raise MMQTTException( f"Unable to receive {to_read} bytes within {read_timeout} seconds." ) @@ -1150,7 +1177,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray: recv = self._sock.recv(to_read) to_read -= len(recv) rc += recv - if time.monotonic() - stamp > read_timeout: + if self.get_monotonic_time() - stamp > read_timeout: raise MMQTTException( f"Unable to receive {to_read} bytes within {read_timeout} seconds." )