Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiline support #56

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 70 additions & 1 deletion nad_receiver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import codecs
import socket
from time import sleep
from typing import Any, Dict, Iterable, Optional, Union
from typing import Any, Dict, Iterable, Optional, Union, List
from nad_receiver.nad_commands import CMDS
from nad_receiver.nad_transport import (NadTransport, SerialPortTransport, TelnetTransportWrapper,
DEFAULT_TIMEOUT)
Expand Down Expand Up @@ -55,6 +55,42 @@ def exec_command(self, domain: str, function: str, operator: str, value: Optiona
pass
return None

def exec_commands(self, commands: List)-> Optional[List[str]]:
"""
Write a series of commands to the receiver and read the values
it returns.

The receiver will always return a value, also when setting a value.
"""
cmds = []
for command in commands:
domain = command[0]
function = command[1]
operator = command[2]
value = command[3] if len(command) > 3 else None
if operator in CMDS[domain][function]['supported_operators']:
if operator == '=' and value is None:
raise ValueError('No value provided')

cmd = ''.join([CMDS[domain][function]['cmd'], operator]) # type: ignore
assert isinstance(cmd, str)
if value:
cmd = cmd + value
else:
raise ValueError('Invalid operator provided %s' % operator)

cmds.append(cmd)

try:
msgs = self.transport.communicate_multiline(cmds)
_LOGGER.debug(f"sent: '{cmds}' reply: '{msgs}'")
if msgs is None:
return None
return [msg.split('=')[1] for msg in msgs]
except IndexError:
pass
return None

def main_dimmer(self, operator: str, value: Optional[str] =None) -> Optional[str]:
"""Execute Main.Dimmer."""
return self.exec_command('main', 'dimmer', operator, value)
Expand Down Expand Up @@ -178,6 +214,39 @@ def __init__(self, host: str, port: int =23, timeout: int =DEFAULT_TIMEOUT):
"""Create NADTelnet."""
self.transport = TelnetTransportWrapper(host, port, timeout)

def status(self) -> Optional[Dict[str, Any]]:
"""
Return the status of the device.

Returns a dictionary with keys 'volume' (int 0-200) , 'power' (bool),
'muted' (bool) and 'source' (str).
"""
nad_reply = self.exec_commands([
[ "main", "power", "?"],
[ "main", "mute", "?"],
[ "main", "volume", "?"],
])
if nad_reply is None:
return None

return {'power': nad_reply[0],
'muted': nad_reply[1],
'volume': nad_reply[2]}

def status_all(self) -> Optional[Dict[str, Any]]:
"""
Return all status values of the device.

Returns a dictionary with keys like 'main_volume' for
each available status value.
"""
nad_reply = self.transport.communicate_multiline("?")
_LOGGER.debug(f"sent: '?' reply: '{nad_reply}'")
if nad_reply is None:
return None

values = [x.split("=") for x in nad_reply]
return {x[0].lower().replace(".", "_"): x[1] for x in values}

class NADReceiverTCP:
"""
Expand Down
43 changes: 42 additions & 1 deletion nad_receiver/nad_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import telnetlib
import threading

from typing import Optional
from typing import Optional, List

import logging

Expand All @@ -19,6 +19,9 @@ class NadTransport(abc.ABC):
def communicate(self, command: str) -> str:
pass

def communicate_multiline(self, command: str) -> str:
pass


class SerialPortTransport(NadTransport):
"""Transport for NAD protocol over RS-232."""
Expand Down Expand Up @@ -118,6 +121,22 @@ def communicate(self, cmd: str) -> str:

return rsp

def communicate_multiline(self, cmds: List[str]) -> List[str]:
rsp = ""
if not self._open_connection():
return rsp

try:
rsp = self.nad_telnet.communicate_multiline(cmds)
except EOFError as cc:
# Connection closed
_LOGGER.debug("Connection closed: %s", cc)
self.nad_telnet.close_connection()
except UnicodeError as ue:
# Some unicode error, but connection is open
_LOGGER.debug("Unicode error: %s", ue)

return rsp

class TelnetTransport(NadTransport):
"""
Expand Down Expand Up @@ -175,3 +194,25 @@ def communicate(self, cmd: str) -> str:
rsp = self.telnet.read_until(b"\r", self.timeout)
_LOGGER.debug("Read response: '%s'", str(rsp))
return rsp.strip().decode()

def communicate_multiline(self, cmds: List[str]) -> List[str]:
if not self.telnet:
raise Exception("Connection is closed")

_LOGGER.debug("Sending commands: '%s'", cmds)
cmd = b"".join([f"\n{cmd}\r".encode() for cmd in cmds])
self.telnet.write(cmd)

rsp_lines = []
while True:
# Notice NAD response to command ends with \r and starts with \n
# E.g. b'\nMain.Power=On\r'
rsp = self.telnet.read_until(b"\r", self.timeout)
_LOGGER.debug("Read response: '%s'", str(rsp))
rsp = rsp.strip().decode()
if len(rsp) > 1:
rsp_lines.append(rsp)
else:
break

return rsp_lines