Skip to content

Commit

Permalink
Updated to fix minimum time between cmd sends
Browse files Browse the repository at this point in the history
  • Loading branch information
corporategoth committed Feb 12, 2023
1 parent 51635a1 commit cb5b729
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 29 deletions.
1 change: 0 additions & 1 deletion custom_components/powerpetdoor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
CONF_UPDATE,
CONF_RECONNECT,
CONFIG,
CMD_GET_DOOR_STATUS,
)

PLATFORMS = [ Platform.SENSOR, Platform.SWITCH ]
Expand Down
72 changes: 45 additions & 27 deletions custom_components/powerpetdoor/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
FIELD_BATTERY_PERCENT,
FIELD_BATTERY_PRESENT,
FIELD_AC_PRESENT,
MINIMUM_TIME_BETWEEN_MSGS,
)

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -110,6 +111,8 @@ class PowerPetDoorClient:
_check_receipt = None
_last_ping = None
_last_command = None
_can_dequeue = False
_last_send = 0
_failed_msg = 0
_failed_pings = 0
_buffer = ''
Expand Down Expand Up @@ -139,12 +142,6 @@ def ensure_future(self, *args: Any, **kwargs: Any):
def run_coroutine_threadsafe(self, *args: Any, **kwargs: Any):
return asyncio.run_coroutine_threadsafe(*args, loop=self._eventLoop, **kwargs)

async def sleep(self, *args: Any, **kwargs: Any):
return await asyncio.sleep(*args, loop=self._eventLoop, **kwargs)

async def wait_for(self, *args: Any, **kwargs: Any):
return await asyncio.wait_for(*args, loop=self._eventLoop, **kwargs)

def add_listener(self, name: str,
door_status_update: Callable[[str], None] | None = None,
settings_update: Callable[[dict], None] | None = None,
Expand Down Expand Up @@ -219,24 +216,25 @@ def connection_made(self, transport) -> None:
"""asyncio callback for a successful connection."""
_LOGGER.info("Connection Successful!")
self._transport = transport
self._can_dequeue = True

if self.cfg_keepalive:
self._keepalive = self.ensure_future(self.keepalive())

self.dequeue_data();

# Caller code
if self.on_connect:
self.on_connect()

def connection_lost(self, exc) -> None:
"""asyncio callback for connection lost."""
self._can_dequeue = False
if not self._shutdown:
_LOGGER.error('The server closed the connection. Reconnecting...')
self.ensure_future(self.reconnect(self.cfg_reconnect))

async def reconnect(self, delay) -> None:
"""Internal method for reconnecting."""
await self.sleep(delay)
await asyncio.sleep(delay)
await self.connect()

def disconnect(self) -> None:
Expand All @@ -256,6 +254,8 @@ def disconnect(self) -> None:
self._outstanding = {}
self._last_ping = None
self._last_command = None
self._can_dequeue = False
self._last_send = 0
self._failed_msg = 0
self._failed_pings = 0
self._buffer = ''
Expand All @@ -273,7 +273,7 @@ def handle_connect_failure(self) -> None:

async def keepalive(self) -> None:
_keepalive = self._keepalive
await self.sleep(self.cfg_keepalive)
await asyncio.sleep(self.cfg_keepalive)
if _keepalive and not _keepalive.cancelled():
if self._last_ping is not None:
self._failed_pings += 1
Expand All @@ -290,7 +290,7 @@ async def keepalive(self) -> None:

async def check_receipt(self, rawdata) -> None:
_check_receipt = self._check_receipt
await self.sleep(self.cfg_timeout)
await asyncio.sleep(self.cfg_timeout)
if _check_receipt and not _check_receipt.cancelled():
self._failed_msg += 1
if self._failed_msg < MAX_FAILED_MSG:
Expand All @@ -303,43 +303,58 @@ async def check_receipt(self, rawdata) -> None:

self._check_receipt = None
if self._failed_msg == 0:
self.dequeue_data()
await self.dequeue_data()
else:
self._send_data(rawdata)
await self._send_data(rawdata)

def enqueue_data(self, data) -> None:
self._queue.put(data)
if self._transport and not self._check_receipt:
self.dequeue_data();
if self._transport and self._can_dequeue:
self._can_dequeue = False
self.ensure_future(self.dequeue_data());

def _send_data(self, rawdata) -> None:
async def _send_data(self, rawdata) -> None:
if not self._transport:
_LOGGER.warning('Attempted to write to the stream without a connection active')
return

if self._keepalive:
self._keepalive.cancel()
self._keepalive = None
try:
_LOGGER.debug(str.format('TX > {0}', rawdata))
diff = time.time() - self._last_send
if diff < MINIMUM_TIME_BETWEEN_MSGS:
await asyncio.sleep(MINIMUM_TIME_BETWEEN_MSGS - diff)
_LOGGER.debug(str.format('TX > {0}', rawdata.decode('ascii')))
self._transport.write(rawdata)
if self._last_command:
self._check_receipt = self.ensure_future(self.check_receipt(rawdata))
self._last_send = time.time()

if self.cfg_keepalive:
self._keepalive = self.ensure_future(self.keepalive())

if self._last_command:
self._check_receipt = self.ensure_future(self.check_receipt(rawdata))
else:
await self.dequeue_data()

except RuntimeError as err:
_LOGGER.error(str.format('Failed to write to the stream. ({0}) ', err))
self.disconnect()

def dequeue_data(self) -> None:
"""Raw data send- just make sure it's encoded properly and logged."""
if self._queue.empty():
async def dequeue_data(self) -> None:
if not self._transport:
_LOGGER.warning('Attempted to write to the stream without a connection active')
return

if self._check_receipt:
_LOGGER.warning('Attempted to send data while another message is still outstanding')
return
if not self._transport:
_LOGGER.warning('Attempted to write to the stream without a connection active')

"""Raw data send- just make sure it's encoded properly and logged."""
if self._queue.empty():
self._can_dequeue = True
return

try:
data = self._queue.get_nowait()
if COMMAND in data:
Expand All @@ -354,10 +369,10 @@ def dequeue_data(self) -> None:

self._failed_msg = 0
rawdata = json.dumps(data).encode("ascii")
self._send_data(rawdata)
await self._send_data(rawdata)

except queue.Empty as err:
_LOGGER.warning('Attempted to dequeue from an empty queue')
return

def data_received(self, rawdata) -> None:
"""asyncio callback for any data recieved from the power pet door."""
Expand Down Expand Up @@ -396,7 +411,10 @@ async def process_message(self, msg) -> None:
if self._check_receipt:
self._check_receipt.cancel()
self._check_receipt = None
self.dequeue_data()
await self.dequeue_data()
elif self._can_dequeue:
self._can_dequeue = False
await self.dequeue_data()

if msg[FIELD_SUCCESS] == "true":
if msg["CMD"] in (CMD_GET_DOOR_STATUS, DOOR_STATUS):
Expand Down
1 change: 1 addition & 0 deletions custom_components/powerpetdoor/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
DEFAULT_KEEP_ALIVE_TIMEOUT = 30.0
DEFAULT_REFRESH_TIMEOUT = 300.0
DEFAULT_HOLD = True
MINIMUM_TIME_BETWEEN_MSGS = 0.250

COMMAND = "cmd"
CONFIG = "config"
Expand Down
2 changes: 1 addition & 1 deletion custom_components/powerpetdoor/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@
"config_flow": true,
"quality_scale": "gold",
"iot_class": "local_push",
"version": "0.2.5"
"version": "0.2.6"
}

0 comments on commit cb5b729

Please sign in to comment.