From 80e49eb41f18ce354dc8286237da2b97f715fb44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20S=C3=A1nchez-Gallego?= Date: Thu, 9 Nov 2023 05:53:08 +0000 Subject: [PATCH] Avoid hanging locks in modbus connection --- python/lvmecp/modbus.py | 64 +++++++++++++++++++++-------------------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/python/lvmecp/modbus.py b/python/lvmecp/modbus.py index f27381c..5793764 100644 --- a/python/lvmecp/modbus.py +++ b/python/lvmecp/modbus.py @@ -234,21 +234,15 @@ def __init__(self, config: dict | pathlib.Path | str | None = None): async def connect(self): """Connects to the client.""" - if self.lock: - await self.lock.acquire() - hp = f"{self.host}:{self.port}" log.debug(f"Trying to connect to modbus server on {hp}") try: - await asyncio.wait_for(self.client.connect(), timeout=1) + await asyncio.wait_for(self.client.connect(), timeout=5) except asyncio.TimeoutError: raise ConnectionError(f"Timed out connecting to server at {hp}.") except Exception as err: raise ConnectionError(f"Failed connecting to server at {hp}: {err}.") - finally: - if self.lock and self.lock.locked() and not self.client.connected: - self.lock.release() log.debug(f"Connected to {hp}.") @@ -260,45 +254,53 @@ async def disconnect(self): log.debug(f"Disonnected from {self.host}:{self.port}.") - if self.lock and self.lock.locked(): - self.lock.release() - async def __aenter__(self): """Initialises the connection to the server.""" - await self.connect() + # Acquire the lock, but also don't allow it to block for too long. + try: + await asyncio.wait_for(self.lock.acquire(), 10) + except asyncio.TimeoutError: + log.warning("Timed out waiting for lock to be released. Forcing release.") + self.lock.release() + await self.lock.acquire() + + try: + await self.connect() + except Exception: + if self.lock.locked(): + self.lock.release() + + raise async def __aexit__(self, exc_type, exc, tb): """Closes the connection to the server.""" - await self.disconnect() + try: + await self.disconnect() + finally: + if self.lock.locked(): + self.lock.release() async def get_all(self): """Returns a dictionary with all the registers.""" names = results = [] - NRETRIES: int = 3 - for retry in range(NRETRIES): - async with self: - names = [name for name in self] - tasks = [elem.get(open_connection=False) for elem in self.values()] - - results = await asyncio.gather(*tasks, return_exceptions=True) - - if any([isinstance(result, Exception) for result in results]): - log.debug("Exceptions received while getting all registers.") + async with self: + names = [name for name in self] + tasks = [elem.get(open_connection=False) for elem in self.values()] - if retry < NRETRIES: - continue + results = await asyncio.gather(*tasks, return_exceptions=True) - for result in results: - if isinstance(result, Exception): - log.warning( - "Failed retrieving all registers. First exception:", - exc_info=result, - ) - break + if any([isinstance(result, Exception) for result in results]): + for result in results: + if isinstance(result, Exception): + log.warning( + "Failed retrieving all registers. First exception:", + exc_info=result, + ) + break return { names[ii]: results[ii]