Skip to content

Commit

Permalink
Added Keepalive_pings() method from WS7 for WS6
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsaxon committed Apr 29, 2019
1 parent 6b0be3b commit 006cefb
Showing 1 changed file with 229 additions and 6 deletions.
235 changes: 229 additions & 6 deletions pysonofflan/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,97 @@
import time
from typing import Dict, Union, Callable, Awaitable
import asyncio
import enum

import websockets
from websockets.framing import OP_CLOSE, parse_close, OP_PING, OP_PONG

logger = logging.getLogger(__name__)

V6_DEFAULT_TIMEOUT = 10
V6_DEFAULT_PING_INTERVAL = 300

class InvalidState(Exception):
"""
Exception raised when an operation is forbidden in the current state.
"""

CLOSE_CODES = {
1000: "OK",
1001: "going away",
1002: "protocol error",
1003: "unsupported type",
# 1004 is reserved
1005: "no status code [internal]",
1006: "connection closed abnormally [internal]",
1007: "invalid data",
1008: "policy violation",
1009: "message too big",
1010: "extension required",
1011: "unexpected error",
1015: "TLS failure [internal]",
}

# A WebSocket connection goes through the following four states, in order:

class State(enum.IntEnum):
CONNECTING, OPEN, CLOSING, CLOSED = range(4)

class ConnectionClosed(InvalidState):
"""
Exception raised when trying to read or write on a closed connection.
Provides the connection close code and reason in its ``code`` and
``reason`` attributes respectively.
"""

def __init__(self, code, reason):
self.code = code
self.reason = reason
message = "WebSocket connection is closed: "
message += format_close(code, reason)
super().__init__(message)

def format_close(code, reason):
"""
Display a human-readable version of the close code and reason.
"""
if 3000 <= code < 4000:
explanation = "registered"
elif 4000 <= code < 5000:
explanation = "private use"
else:
explanation = CLOSE_CODES.get(code, "unknown")
result = "code = {} ({}), ".format(code, explanation)

if reason:
result += "reason = {}".format(reason)
else:
result += "no reason"

return result

class SonoffLANModeClientProtocol(websockets.WebSocketClientProtocol):
"""Customised WebSocket client protocol to ignore pong payload match."""

async def read_data_frame(self, max_size):
@asyncio.coroutine
def read_data_frame(self, max_size):
"""
Copied from websockets.WebSocketCommonProtocol to change pong handling
"""
logger = logging.getLogger(__name__)

while True:
frame = await self.read_frame(max_size)
frame = yield from self.read_frame(max_size)

if frame.opcode == OP_CLOSE:
self.close_code, self.close_reason = parse_close(frame.data)
await self.write_close_frame(frame.data)
yield from self.write_close_frame(frame.data)
return

elif frame.opcode == OP_PING:
ping_hex = binascii.hexlify(frame.data).decode() or '[empty]'
logger.debug(
"%s - received ping, sending pong: %s", self.side, ping_hex
)
await self.pong(frame.data)
yield from self.pong(frame.data)

elif frame.opcode == OP_PONG:
# Acknowledge pings on solicited pongs, regardless of payload
Expand All @@ -53,6 +116,166 @@ async def read_data_frame(self, max_size):
else:
return frame

def __init__(self, **kwds):

logger.debug("__init__()" )

if float(websockets.__version__) < 7.0:

self.ping_interval = V6_DEFAULT_PING_INTERVAL
self.ping_timeout = V6_DEFAULT_TIMEOUT

self.close_code: int
self.close_reason: str

# Task sending keepalive pings.
self.keepalive_ping_task = None

super().__init__(**kwds)

def connection_open(self):

logger.debug("connection_open()")

super().connection_open()

if float(websockets.__version__) < 7.0:

# Start the task that sends pings at regular intervals.
self.keepalive_ping_task = asyncio.ensure_future(
self.keepalive_ping(), loop=self.loop
)

@asyncio.coroutine
def keepalive_ping(self):

logger.debug("keepalive_ping()" )

if float(websockets.__version__) >= 7.0:

super().keepalive_ping()

else:

"""
Send a Ping frame and wait for a Pong frame at regular intervals.
This coroutine exits when the connection terminates and one of the
following happens:
- :meth:`ping` raises :exc:`ConnectionClosed`, or
- :meth:`close_connection` cancels :attr:`keepalive_ping_task`.
"""
if self.ping_interval is None:
return

try:
while True:

yield from asyncio.sleep(self.ping_interval, loop=self.loop)

# ping() cannot raise ConnectionClosed, only CancelledError:
# - If the connection is CLOSING, keepalive_ping_task will be
# canceled by close_connection() before ping() returns.
# - If the connection is CLOSED, keepalive_ping_task must be
# canceled already.

ping_waiter = yield from self.ping()

if self.ping_timeout is not None:
try:
yield from asyncio.wait_for(
ping_waiter, self.ping_timeout, loop=self.loop
)

except asyncio.TimeoutError:
logger.debug("%s ! timed out waiting for pong", self.side)
self.fail_connection(1011)
break

except asyncio.CancelledError:
raise

except Exception:
logger.warning("Unexpected exception in keepalive ping task", exc_info=True)

@asyncio.coroutine
def close_connection(self):

logger.debug("close_connection()")

yield from super().close_connection()

logger.debug("super.close_connection() finished" )

if float(websockets.__version__) < 7.0:

# Cancel the keepalive ping task.
if self.keepalive_ping_task is not None:
self.keepalive_ping_task.cancel()



def abort_keepalive_pings(self):

logger.debug("abort_keepalive_pings()")

if float(websockets.__version__) >= 7.0:
super().abort_keepalive_pings()

else:

"""
Raise ConnectionClosed in pending keepalive pings.
They'll never receive a pong once the connection is closed.
"""
assert self.state is State.CLOSED
exc = ConnectionClosed(self.close_code, self.close_reason)
exc.__cause__ = self.transfer_data_exc # emulate raise ... from ...

try:

for ping in self.pings.values():
ping.set_exception(exc)

except asyncio.InvalidStateError:
pass

""" No Need to do this as in V6, this is done in super.close_connection()
if self.pings:
pings_hex = ', '.join(
binascii.hexlify(ping_id).decode() or '[empty]'
for ping_id in self.pings
)
plural = 's' if len(self.pings) > 1 else ''
logger.debug(
"%s - aborted pending ping%s: %s", self.side, plural, pings_hex
)"""

def connection_lost(self, exc):

logger.debug("connection_lost()" )

if float(websockets.__version__) < 7.0:

logger.debug("%s - event = connection_lost(%s)", self.side, exc)
self.state = State.CLOSED
logger.debug("%s - state = CLOSED", self.side)
#if not hasattr(self, "close_code"):
self.close_code = 1006
#if not hasattr(self, "close_reason"):
self.close_reason = ""
logger.debug(
"%s x code = %d, reason = %s",
self.side,
self.close_code,
self.close_reason or "[no reason]",
)

self.abort_keepalive_pings()

super().connection_lost(exc)



class SonoffLANModeClient:
"""
Expand Down

0 comments on commit 006cefb

Please sign in to comment.