Skip to content

Commit

Permalink
Merge branch 'master' into pyup-update-pipupgrade-1.4.0-to-1.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
beveradb authored May 16, 2019
2 parents 9a845ed + 8c5988e commit 9aafc07
Show file tree
Hide file tree
Showing 6 changed files with 517 additions and 125 deletions.
39 changes: 23 additions & 16 deletions pysonofflan/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async def device_id_callback(device: SonoffSwitch):
if device.basic_info is not None:
device.shared_state['device_id_at_current_ip'] = \
device.device_id
device.keep_running = False
device.shutdown_event_loop()

SonoffSwitch(
host=ip,
Expand All @@ -143,9 +143,10 @@ def state(config: dict):

async def state_callback(device):
if device.basic_info is not None:
print_device_details(device)
if device.available:
print_device_details(device)

device.shutdown_event_loop()
device.shutdown_event_loop()

logger.info("Initialising SonoffSwitch with host %s" % config['host'])
SonoffSwitch(
Expand Down Expand Up @@ -215,21 +216,27 @@ def switch_device(host, inching, new_state):

async def update_callback(device: SonoffSwitch):
if device.basic_info is not None:
if inching is None:
logger.info("\nInitial state:")
print_device_details(device)

device.client.keep_running = False
if new_state == "on":
await device.turn_on()
else:
await device.turn_off()
else:
logger.info("Inching device activated by switching ON for "
"%ss" % inching)
if device.available:

logger.info("\nNew state:")
print_device_details(device)
if inching is None:
print_device_details(device)

if device.is_on:
if new_state == "on":
device.shutdown_event_loop()
else:
await device.turn_off()

elif device.is_off:
if new_state == "off":
device.shutdown_event_loop()
else:
await device.turn_on()

else:
logger.info("Inching device activated by switching ON for "
"%ss" % inching)

SonoffSwitch(
host=host,
Expand Down
257 changes: 241 additions & 16 deletions pysonofflan/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,98 @@
import random
import time
from typing import Dict, Union, Callable, Awaitable
import asyncio
import enum

import websockets
from websockets.framing import OP_CLOSE, parse_close, OP_PING, OP_PONG

logger = logging.getLogger(__name__)

V6_DEFAULT_TIMEOUT = 10
V6_DEFAULT_PING_INTERVAL = 300

class InvalidState(Exception):
"""
Exception raised when an operation is forbidden in the current state.
"""

CLOSE_CODES = {
1000: "OK",
1001: "going away",
1002: "protocol error",
1003: "unsupported type",
# 1004 is reserved
1005: "no status code [internal]",
1006: "connection closed abnormally [internal]",
1007: "invalid data",
1008: "policy violation",
1009: "message too big",
1010: "extension required",
1011: "unexpected error",
1015: "TLS failure [internal]",
}

# A WebSocket connection goes through the following four states, in order:

class State(enum.IntEnum):
CONNECTING, OPEN, CLOSING, CLOSED = range(4)

class ConnectionClosed(InvalidState):
"""
Exception raised when trying to read or write on a closed connection.
Provides the connection close code and reason in its ``code`` and
``reason`` attributes respectively.
"""

def __init__(self, code, reason):
self.code = code
self.reason = reason
message = "WebSocket connection is closed: "
message += format_close(code, reason)
super().__init__(message)

def format_close(code, reason):
"""
Display a human-readable version of the close code and reason.
"""
if 3000 <= code < 4000:
explanation = "registered"
elif 4000 <= code < 5000:
explanation = "private use"
else:
explanation = CLOSE_CODES.get(code, "unknown")
result = "code = {} ({}), ".format(code, explanation)

if reason:
result += "reason = {}".format(reason)
else:
result += "no reason"

return result

class SonoffLANModeClientProtocol(websockets.WebSocketClientProtocol):
"""Customised WebSocket client protocol to ignore pong payload match."""

async def read_data_frame(self, max_size):
@asyncio.coroutine
def read_data_frame(self, max_size):
"""
Copied from websockets.WebSocketCommonProtocol to change pong handling
"""
logger = logging.getLogger(__name__)

while True:
frame = await self.read_frame(max_size)
frame = yield from self.read_frame(max_size)

if frame.opcode == OP_CLOSE:
self.close_code, self.close_reason = parse_close(frame.data)
await self.write_close_frame(frame.data)
yield from self.write_close_frame(frame.data)
return

elif frame.opcode == OP_PING:
ping_hex = binascii.hexlify(frame.data).decode() or '[empty]'
logger.debug(
"%s - received ping, sending pong: %s", self.side, ping_hex
)
await self.pong(frame.data)
yield from self.pong(frame.data)

elif frame.opcode == OP_PONG:
# Acknowledge pings on solicited pongs, regardless of payload
Expand All @@ -52,6 +116,166 @@ async def read_data_frame(self, max_size):
else:
return frame

def __init__(self, **kwds):

logger.debug("__init__()" )

if float(websockets.__version__) < 7.0:

self.ping_interval = V6_DEFAULT_PING_INTERVAL
self.ping_timeout = V6_DEFAULT_TIMEOUT

#self.close_code: int
#self.close_reason: str

# Task sending keepalive pings.
self.keepalive_ping_task = None

super().__init__(**kwds)

def connection_open(self):

logger.debug("connection_open()")

super().connection_open()

if float(websockets.__version__) < 7.0:

# Start the task that sends pings at regular intervals.
self.keepalive_ping_task = asyncio.ensure_future(
self.keepalive_ping(), loop=self.loop
)

@asyncio.coroutine
def keepalive_ping(self):

logger.debug("keepalive_ping()" )

if float(websockets.__version__) >= 7.0:

super().keepalive_ping()

else:

"""
Send a Ping frame and wait for a Pong frame at regular intervals.
This coroutine exits when the connection terminates and one of the
following happens:
- :meth:`ping` raises :exc:`ConnectionClosed`, or
- :meth:`close_connection` cancels :attr:`keepalive_ping_task`.
"""
if self.ping_interval is None:
return

try:
while True:

yield from asyncio.sleep(self.ping_interval, loop=self.loop)

# ping() cannot raise ConnectionClosed, only CancelledError:
# - If the connection is CLOSING, keepalive_ping_task will be
# canceled by close_connection() before ping() returns.
# - If the connection is CLOSED, keepalive_ping_task must be
# canceled already.

ping_waiter = yield from self.ping()

if self.ping_timeout is not None:
try:
yield from asyncio.wait_for(
ping_waiter, self.ping_timeout, loop=self.loop
)

except asyncio.TimeoutError:
logger.debug("%s ! timed out waiting for pong", self.side)
self.fail_connection(1011)
break

except asyncio.CancelledError:
raise

except Exception:
logger.warning("Unexpected exception in keepalive ping task", exc_info=True)

@asyncio.coroutine
def close_connection(self):

logger.debug("close_connection()")

yield from super().close_connection()

logger.debug("super.close_connection() finished" )

if float(websockets.__version__) < 7.0:

# Cancel the keepalive ping task.
if self.keepalive_ping_task is not None:
self.keepalive_ping_task.cancel()



def abort_keepalive_pings(self):

logger.debug("abort_keepalive_pings()")

if float(websockets.__version__) >= 7.0:
super().abort_keepalive_pings()

else:

"""
Raise ConnectionClosed in pending keepalive pings.
They'll never receive a pong once the connection is closed.
"""
assert self.state is State.CLOSED
exc = ConnectionClosed(self.close_code, self.close_reason)
exc.__cause__ = self.transfer_data_exc # emulate raise ... from ...

try:

for ping in self.pings.values():
ping.set_exception(exc)

except asyncio.InvalidStateError:
pass

""" No Need to do this as in V6, this is done in super.close_connection()
if self.pings:
pings_hex = ', '.join(
binascii.hexlify(ping_id).decode() or '[empty]'
for ping_id in self.pings
)
plural = 's' if len(self.pings) > 1 else ''
logger.debug(
"%s - aborted pending ping%s: %s", self.side, plural, pings_hex
)"""

def connection_lost(self, exc):

logger.debug("connection_lost()" )

if float(websockets.__version__) < 7.0:

logger.debug("%s - event = connection_lost(%s)", self.side, exc)
self.state = State.CLOSED
logger.debug("%s - state = CLOSED", self.side)
if self.close_code is None:
self.close_code = 1006
if self.close_reason is None:
self.close_reason = ""
logger.debug(
"%s x code = %d, reason = %s",
self.side,
self.close_code,
self.close_reason or "[no reason]",
)

self.abort_keepalive_pings()

super().connection_lost(exc)



class SonoffLANModeClient:
"""
Expand Down Expand Up @@ -81,9 +305,10 @@ def __init__(self, host: str,
self.timeout = timeout
self.logger = logger
self.websocket = None
self.keep_running = True
self.event_handler = event_handler
self.connected = False
self.connected_event = asyncio.Event()
self.disconnected_event = asyncio.Event()


if self.logger is None:
self.logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -112,29 +337,29 @@ async def connect(self):
subprotocols=['chat'],
klass=SonoffLANModeClientProtocol
)
self.connected = True
except websockets.InvalidMessage as ex:
self.logger.error('SonoffLANModeClient connection failed: %s' % ex)
raise ex

async def close_connection(self):
self.logger.debug('Closing websocket from client close_connection')
self.connected = False
self.connected_event.clear()
self.disconnected_event.set()
if self.websocket is not None:
self.logger.debug('calling websocket.close')
await self.websocket.close()

self.websocket = None # Ensure we cannot close multiple times
self.logger.debug('websocket was closed')

async def receive_message_loop(self):
try:
while self.keep_running:
while True:
self.logger.debug('Waiting for messages on websocket')
message = await self.websocket.recv()
await self.event_handler(message)
self.logger.debug('Message passed to handler, should loop now')
finally:
self.logger.debug('receive_message_loop finally block reached: '
'closing websocket')
if self.websocket is not None:
await self.websocket.close()
self.logger.debug('receive_message_loop finally block reached')

async def send_online_message(self):
self.logger.debug('Sending user online message over websocket')
Expand Down
Loading

0 comments on commit 9aafc07

Please sign in to comment.