diff --git a/adafruit_wiznet5k/adafruit_wiznet5k.py b/adafruit_wiznet5k/adafruit_wiznet5k.py index 4d380fc..bef3a38 100644 --- a/adafruit_wiznet5k/adafruit_wiznet5k.py +++ b/adafruit_wiznet5k/adafruit_wiznet5k.py @@ -7,7 +7,6 @@ # SPDX-FileCopyrightText: 2021 Adam Cummick # # SPDX-License-Identifier: MIT - """ `adafruit_wiznet5k` ================================================================================ @@ -27,6 +26,23 @@ * Adafruit's Bus Device library: https://github.com/adafruit/Adafruit_CircuitPython_BusDevice """ + +# pylint: disable=too-many-lines +from __future__ import annotations + +try: + from typing import TYPE_CHECKING, Optional, Union, List, Tuple, Sequence + + if TYPE_CHECKING: + from circuitpython_typing import WriteableBuffer + import busio + import digitalio +except ImportError: + pass + +__version__ = "0.0.0+auto.0" +__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_Wiznet5k.git" + from random import randint import time from micropython import const @@ -35,11 +51,6 @@ import adafruit_wiznet5k.adafruit_wiznet5k_dhcp as dhcp import adafruit_wiznet5k.adafruit_wiznet5k_dns as dns - -__version__ = "0.0.0+auto.0" -__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_Wiznet5k.git" - - # Wiznet5k Registers REG_MR = const(0x0000) # Mode REG_GAR = const(0x0001) # Gateway IP Address @@ -122,23 +133,12 @@ W5100_MAX_SOCK_NUM = const(0x04) SOCKET_INVALID = const(255) - # Source ports in use SRC_PORTS = [0] * W5200_W5500_MAX_SOCK_NUM class WIZNET5K: # pylint: disable=too-many-public-methods, too-many-instance-attributes - """Interface for WIZNET5K module. - - :param ~busio.SPI spi_bus: The SPI bus the Wiznet module is connected to. - :param ~digitalio.DigitalInOut cs: Chip select pin. - :param ~digitalio.DigitalInOut rst: Optional reset pin. - :param bool is_dhcp: Whether to start DHCP automatically or not. - :param list mac: The Wiznet's MAC Address. - :param str hostname: The desired hostname, with optional {} to fill in MAC. - :param int dhcp_timeout: Timeout in seconds for DHCP response. - :param bool debug: Enable debugging output. - """ + """Interface for WIZNET5K module.""" TCP_MODE = const(0x21) UDP_MODE = const(0x02) @@ -147,15 +147,27 @@ class WIZNET5K: # pylint: disable=too-many-public-methods, too-many-instance-at # pylint: disable=too-many-arguments def __init__( self, - spi_bus, - cs, # pylint: disable=invalid-name - reset=None, - is_dhcp=True, - mac=DEFAULT_MAC, - hostname=None, - dhcp_timeout=30, - debug=False, - ): + spi_bus: busio.SPI, + cs: digitalio.DigitalInOut, # pylint: disable=invalid-name + reset: Optional[digitalio.DigitalInOut] = None, + is_dhcp: bool = True, + mac: Union[List[int], Tuple[int]] = DEFAULT_MAC, + hostname: Optional[str] = None, + dhcp_timeout: float = 30.0, + debug: bool = False, + ) -> None: + """ + :param busio.SPI spi_bus: The SPI bus the Wiznet module is connected to. + :param digitalio.DigitalInOut cs: Chip select pin. + :param digitalio.DigitalInOut reset: Optional reset pin, defaults to None. + :param bool is_dhcp: Whether to start DHCP automatically or not, defaults to True. + :param Union[List[int], Tuple[int]] mac: The Wiznet's MAC Address, defaults to + (0xDE, 0xAD, 0xBE, 0xEF, 0xFE, 0xED). + :param str hostname: The desired hostname, with optional {} to fill in the MAC + address, defaults to None. + :param float dhcp_timeout: Timeout in seconds for DHCP response, defaults to 30.0. + :param bool debug: Enable debugging output, defaults to False. + """ self._debug = debug self._chip_type = None self._device = SPIDevice(spi_bus, cs, baudrate=8000000, polarity=0, phase=0) @@ -204,13 +216,19 @@ def __init__( self._dhcp_client = None assert ret == 0, "Failed to configure DHCP Server!" - def set_dhcp(self, hostname=None, response_timeout=30): - """Initializes the DHCP client and attempts to retrieve - and set network configuration from the DHCP server. - Returns 0 if DHCP configured, -1 otherwise. + def set_dhcp( + self, hostname: Optional[str] = None, response_timeout: float = 30 + ) -> int: + """ + Initialize the DHCP client and attempt to retrieve and set network + configuration from the DHCP server. + + :param Optional[str] hostname: The desired hostname for the DHCP server with optional {} to + fill in the MAC address, defaults to None. + :param float response_timeout: Time to wait for server to return packet in seconds, + defaults to 30.0. - :param str hostname: The desired hostname, with optional {} to fill in MAC. - :param int response_timeout: Time to wait for server to return packet, in seconds. + :return int: 0 if DHCP configured, -1 otherwise. """ if self._debug: print("* Initializing DHCP") @@ -232,14 +250,18 @@ def set_dhcp(self, hostname=None, response_timeout=30): return 0 return -1 - def maintain_dhcp_lease(self): - """Maintain DHCP lease""" + def maintain_dhcp_lease(self) -> None: + """Maintain the DHCP lease.""" if self._dhcp_client is not None: self._dhcp_client.maintain_dhcp_lease() - def get_host_by_name(self, hostname): - """Convert a hostname to a packed 4-byte IP Address. - Returns a 4 bytearray. + def get_host_by_name(self, hostname: str) -> bytes: + """ + Convert a hostname to a packed 4-byte IP Address. + + :param str hostname: The host name to be converted. + + :return Union[int, bytes]: a 4 bytearray. """ if self._debug: print("* Get host by name") @@ -254,8 +276,12 @@ def get_host_by_name(self, hostname): return ret @property - def max_sockets(self): - """Returns max number of sockets supported by chip.""" + def max_sockets(self) -> int: + """ + Maximum number of sockets supported by chip. + + :return int: Maximum supported sockets. + """ if self._chip_type == "w5500": return W5200_W5500_MAX_SOCK_NUM if self._chip_type == "w5100s": @@ -263,44 +289,80 @@ def max_sockets(self): return -1 @property - def chip(self): - """Returns the chip type.""" + def chip(self) -> str: + """ + Ethernet controller chip type. + + :return str: The chip type. + """ return self._chip_type @property - def ip_address(self): - """Returns the configured IP address.""" + def ip_address(self) -> bytearray: + """ + Configured IP address. + + :return bytearray: IP address as four bytes. + """ return self.read(REG_SIPR, 0x00, 4) - def pretty_ip(self, ip): # pylint: disable=no-self-use, invalid-name - """Converts a bytearray IP address to a - dotted-quad string for printing + def pretty_ip( + self, + # pylint: disable=no-self-use, invalid-name + ip: bytearray, + ) -> str: + """ + Convert a 4 byte IP address to a dotted-quad string for printing. + + :param bytearray ip: A four byte IP address. + :return str: The IP address (a string of the form '255.255.255.255'). """ return "%d.%d.%d.%d" % (ip[0], ip[1], ip[2], ip[3]) - def unpretty_ip(self, ip): # pylint: disable=no-self-use, invalid-name - """Converts a dotted-quad string to a bytearray IP address""" + def unpretty_ip( + self, + # pylint: disable=no-self-use, invalid-name + ip: str, + ) -> bytes: + """ + Convert a dotted-quad string to a four byte IP address. + + :param str ip: IP address (a string of the form '255.255.255.255') to be converted. + + :return bytes: IP address in four bytes. + """ octets = [int(x) for x in ip.split(".")] return bytes(octets) @property - def mac_address(self): - """Returns the hardware's MAC address.""" + def mac_address(self) -> bytearray: + """ + Ethernet hardware's MAC address. + + :return bytearray: Six byte MAC address.""" return self.read(REG_SHAR, 0x00, 6) @mac_address.setter - def mac_address(self, address): - """Sets the hardware MAC address. + def mac_address(self, address: Sequence[Union[int, bytes]]) -> None: + """ + Sets the hardware MAC address. :param tuple address: Hardware MAC address. """ self.write(REG_SHAR, 0x04, address) - def pretty_mac(self, mac): # pylint: disable=no-self-use, invalid-name - """Converts a bytearray MAC address to a - dotted-quad string for printing + def pretty_mac( + self, + # pylint: disable=no-self-use, invalid-name + mac: bytearray, + ) -> str: + """ + Convert a bytearray MAC address to a ':' seperated string for display. + + :param bytearray mac: The MAC address. + :return str: Mac Address in the form 00:00:00:00:00:00 """ return "%s:%s:%s:%s:%s:%s" % ( hex(mac[0]), @@ -311,10 +373,13 @@ def pretty_mac(self, mac): # pylint: disable=no-self-use, invalid-name hex(mac[5]), ) - def remote_ip(self, socket_num): - """Returns the IP address of the host who sent the current incoming packet. + def remote_ip(self, socket_num: int) -> Union[str, bytearray]: + """ + IP address of the host which sent the current incoming packet. + + :param int socket_num: ID number of the socket to check. - :param int socket num: Desired socket. + :return Union[str, bytearray]: A four byte IP address. """ if socket_num >= self.max_sockets: return self._pbuff @@ -323,8 +388,11 @@ def remote_ip(self, socket_num): return self.pretty_ip(self._pbuff) @property - def link_status(self): - """ "Returns if the PHY is connected.""" + def link_status(self) -> int: + """Physical hardware (PHY) connection status. + + :return int: 1 if the link is up, 0 if the link is down. + """ if self._chip_type == "w5500": data = self.read(REG_PHYCFGR, 0x00) return data[0] & 0x01 @@ -333,17 +401,29 @@ def link_status(self): return data[0] & 0x01 return 0 - def remote_port(self, socket_num): - """Returns the port of the host who sent the current incoming packet.""" + def remote_port(self, socket_num: int) -> Union[int, bytearray]: + """ + Port of the host which sent the current incoming packet. + + :param int socket_num: ID number of the socket to check. + + :return Union[int, bytearray]: The port number of the socket connection. + """ if socket_num >= self.max_sockets: return self._pbuff - for octet in range(0, 2): + for octet in range(2): self._pbuff[octet] = self._read_socket(socket_num, REG_SNDPORT + octet)[0] return int((self._pbuff[0] << 8) | self._pbuff[0]) @property - def ifconfig(self): - """Returns the network configuration as a tuple.""" + def ifconfig( + self, + ) -> Tuple[bytearray, bytearray, bytearray, Tuple[int, int, int, int]]: + """ + Network configuration information. + + :return Tuple[bytearray, bytearray, bytearray, Tuple[int, int, int, int]]: \ + The IP address, subnet mask, gateway address and DNS server address.""" return ( self.ip_address, self.read(REG_SUBR, 0x00, 4), @@ -352,10 +432,14 @@ def ifconfig(self): ) @ifconfig.setter - def ifconfig(self, params): - """Sets network configuration to provided tuple in format: - (ip_address, subnet_mask, gateway_address, dns_server). + def ifconfig( + self, params: Tuple[bytearray, bytearray, bytearray, Tuple[int, int, int, int]] + ) -> None: + """ + Set network configuration. + :param Tuple[bytearray, bytearray, bytearray, Tuple[int, int, int, int]]: + Configuration settings - (ip_address, subnet_mask, gateway_address, dns_server). """ ip_address, subnet_mask, gateway_address, dns_server = params @@ -365,8 +449,12 @@ def ifconfig(self, params): self._dns = dns_server - def _w5100_init(self): - """Initializes and detects a wiznet5k module.""" + def _w5100_init(self) -> int: + """ + Detect and initialize a Wiznet5k ethernet module. + + :return int: 1 if the initialization succeeds, 0 if it fails. + """ time.sleep(1) self._cs.switch_to_output() self._cs.value = 1 @@ -386,8 +474,12 @@ def _w5100_init(self): return 0 return 1 - def detect_w5500(self): - """Detects W5500 chip.""" + def detect_w5500(self) -> int: + """ + Detect W5500 chip. + + :return int: 1 if a W5500 chip is detected, -1 if not. + """ self._chip_type = "w5500" assert self.sw_reset() == 0, "Chip not reset properly!" self._write_mr(0x08) @@ -411,8 +503,12 @@ def detect_w5500(self): # self._ch_base_msb = 0x10 return 1 - def detect_w5100s(self): - """Detects W5100S chip.""" + def detect_w5100s(self) -> int: + """ + Detect W5100S chip. + + :return int: 1 if a W5100 chip is detected, -1 if not. + """ self._chip_type = "w5100s" # sw reset assert self.sw_reset() == 0, "Chip not reset properly!" @@ -422,10 +518,12 @@ def detect_w5100s(self): self._ch_base_msb = 0x0400 return 1 - def sw_reset(self): - """Performs a soft-reset on a Wiznet chip - by writing to its MR register reset bit. + def sw_reset(self) -> int: + """Perform a soft-reset on the Wiznet chip. + + Perform a soft reset by writing to the chip's MR register reset bit. + :return int: 0 if the reset succeeds, -1 if not. """ mode_reg = self._read_mr() self._write_mr(0x80) @@ -436,22 +534,31 @@ def sw_reset(self): return -1 return 0 - def _read_mr(self): - """Reads from the Mode Register (MR).""" + def _read_mr(self) -> bytearray: + """Read from the Mode Register (MR).""" res = self.read(REG_MR, 0x00) return res - def _write_mr(self, data): - """Writes to the mode register (MR). + def _write_mr(self, data: int) -> None: + """Write to the mode register (MR).""" + self.write(REG_MR, 0x04, data) - :param int data: Data to write to the mode register. + def read( + self, + addr: int, + callback: int, + length: int = 1, + buffer: Optional[WriteableBuffer] = None, + ) -> Union[WriteableBuffer, bytearray]: """ - self.write(REG_MR, 0x04, data) + Read data from a register address. - def read(self, addr, callback, length=1, buffer=None): - """Reads data from a register address. + :param int addr: Register address to read. + :param int callback: Callback reference. + :param int length: Number of bytes to read from the register, defaults to 1. + :param Optional[WriteableBuffer] buffer: Buffer to read data into, defaults to None. - :param int addr: Register address. + :return Union[WriteableBuffer, bytearray]: Data read from the chip. """ with self._device as bus_device: if self._chip_type == "w5500": @@ -471,13 +578,15 @@ def read(self, addr, callback, length=1, buffer=None): bus_device.readinto(buffer, end=length) # pylint: disable=no-member return buffer - def write(self, addr, callback, data): - """Write data to a register address. + def write( + self, addr: int, callback: int, data: Union[int, Sequence[Union[int, bytes]]] + ) -> None: + """ + Write data to a register address. :param int addr: Destination address. :param int callback: Callback reference. - :param int data: Data to write, as an integer. - :param bytearray data: Data to write, as a bytearray. + :param Union[int, Sequence[Union[int, bytes]]] data: Data to write to the register address. """ with self._device as bus_device: if self._chip_type == "w5500": @@ -498,11 +607,15 @@ def write(self, addr, callback, data): # Socket-Register API - def socket_available(self, socket_num, sock_type=SNMR_TCP): - """Returns the amount of bytes to be read from the socket. + def socket_available(self, socket_num: int, sock_type: int = SNMR_TCP) -> int: + """ + Number of bytes available to be read from the socket. + + :param int socket_num: Socket to check for available bytes. + :param int sock_type: Socket type. Use SNMR_TCP for TCP or SNMR_UDP for UDP, \ + defaults to SNMR_TCP. - :param int socket_num: Desired socket to return bytes from. - :param int sock_type: Socket type, defaults to TCP. + :return int: Number of bytes available to read. """ if self._debug: print( @@ -530,19 +643,38 @@ def socket_available(self, socket_num, sock_type=SNMR_TCP): return ret return 0 - def socket_status(self, socket_num): - """Returns the socket connection status. Can be: SNSR_SOCK_CLOSED, - SNSR_SOCK_INIT, SNSR_SOCK_LISTEN, SNSR_SOCK_SYNSENT, SNSR_SOCK_SYNRECV, - SNSR_SYN_SOCK_ESTABLISHED, SNSR_SOCK_FIN_WAIT, SNSR_SOCK_CLOSING, - SNSR_SOCK_TIME_WAIT, SNSR_SOCK_CLOSE_WAIT, SNSR_LAST_ACK, + def socket_status(self, socket_num: int) -> Optional[bytearray]: + """ + Socket connection status. + + Can be: SNSR_SOCK_CLOSED, SNSR_SOCK_INIT, SNSR_SOCK_LISTEN, SNSR_SOCK_SYNSENT, + SNSR_SOCK_SYNRECV, SNSR_SYN_SOCK_ESTABLISHED, SNSR_SOCK_FIN_WAIT, + SNSR_SOCK_CLOSING, SNSR_SOCK_TIME_WAIT, SNSR_SOCK_CLOSE_WAIT, SNSR_LAST_ACK, SNSR_SOCK_UDP, SNSR_SOCK_IPRAW, SNSR_SOCK_MACRAW, SNSR_SOCK_PPOE. + + :param int socket_num: ID of socket to check. + + :return: Optional[bytearray] """ return self._read_snsr(socket_num) - def socket_connect(self, socket_num, dest, port, conn_mode=SNMR_TCP): - """Open and verify we've connected a socket to a dest IP address - or hostname. By default, we use 'conn_mode'= SNMR_TCP but we - may also use SNMR_UDP. + def socket_connect( + self, + socket_num: int, + dest: Union[bytes, bytearray], + port: int, + conn_mode: int = SNMR_TCP, + ) -> int: + """ + Open and verify a connection from a socket to a destination IP address + or hostname. A TCP connection is made by default. A UDP connection can also + be made. + + :param int socket_num: ID of the socket to be connected. + :param Union[bytes, bytearray] dest: The destination as a host name or IP address. + :param int port: Port to connect to (0 - 65,536). + :param int conn_mode: The connection mode. Use SNMR_TCP for TCP or SNMR_UDP for UDP, + defaults to SNMR_TCP. """ assert self.link_status, "Ethernet cable disconnected!" if self._debug: @@ -554,7 +686,7 @@ def socket_connect(self, socket_num, dest, port, conn_mode=SNMR_TCP): # initialize a socket and set the mode res = self.socket_open(socket_num, conn_mode=conn_mode) if res == 1: - raise RuntimeError("Failed to initalize a connection with the socket.") + raise RuntimeError("Failed to initialize a connection with the socket.") # set socket destination IP and port self._write_sndipr(socket_num, dest) @@ -573,15 +705,19 @@ def socket_connect(self, socket_num, dest, port, conn_mode=SNMR_TCP): self.udp_datasize[socket_num] = 0 return 1 - def _send_socket_cmd(self, socket, cmd): + def _send_socket_cmd(self, socket: int, cmd: int) -> None: + """Send a socket command to a socket.""" self._write_sncr(socket, cmd) while self._read_sncr(socket) != b"\x00": if self._debug: print("waiting for sncr to clear...") - def get_socket(self): - """Requests, allocates and returns a socket from the W5k - chip. Returned socket number may not exceed max_sockets. + def get_socket(self) -> int: + """Request, allocate and return a socket from the W5k chip. + + Cycle through the sockets to find the first available one, if any. + + :return int: The first available socket. Returns 0xFF if no sockets are free. """ if self._debug: print("*** Get socket") @@ -597,12 +733,16 @@ def get_socket(self): print("Allocated socket #{}".format(sock)) return sock - def socket_listen(self, socket_num, port, conn_mode=SNMR_TCP): - """Start listening on a socket (default TCP mode). + def socket_listen( + self, socket_num: int, port: int, conn_mode: int = SNMR_TCP + ) -> None: + """ + Listen on a socket's port. - :param int socket_num: socket number - :param int port: port to listen on - :param int conn_mode: connection mode SNMR_TCP (default) or SNMR_UDP + :param int socket_num: ID of socket to listen on. + :param int port: Port to listen on (0 - 65,535). + :param int conn_mode: Connection mode SNMR_TCP for TCP or SNMR_UDP for + UDP, defaults to SNMR_TCP. """ assert self.link_status, "Ethernet cable disconnected!" if self._debug: @@ -616,7 +756,7 @@ def socket_listen(self, socket_num, port, conn_mode=SNMR_TCP): res = self.socket_open(socket_num, conn_mode=conn_mode) self.src_port = 0 if res == 1: - raise RuntimeError("Failed to initalize the socket.") + raise RuntimeError("Failed to initialize the socket.") # Send listen command self._send_socket_cmd(socket_num, CMD_SOCK_LISTEN) # Wait until ready @@ -626,11 +766,21 @@ def socket_listen(self, socket_num, port, conn_mode=SNMR_TCP): if status[0] == SNSR_SOCK_CLOSED: raise RuntimeError("Listening socket closed.") - def socket_accept(self, socket_num): - """Gets the dest IP and port from an incoming connection. - Returns the next socket number so listening can continue + def socket_accept( + self, socket_num: int + ) -> Tuple[int, Tuple[Union[str, bytearray], Union[int, bytearray]]]: + """ + Destination IP address and port from an incoming connection. + + Return the next socket number so listening can continue, along with + the IP address and port of the incoming connection. - :param int socket_num: socket number + :param int socket_num: Socket number with connection to check. + :return Tuple[int, Tuple[Union[str, bytearray], Union[int, bytearray]]]: + If successful, the next (socket number, (destination IP address, destination port)). + + If errors occur, the destination IP address and / or the destination port may be + returned as bytearrays. """ dest_ip = self.remote_ip(socket_num) dest_port = self.remote_port(socket_num) @@ -643,9 +793,16 @@ def socket_accept(self, socket_num): ) return next_socknum, (dest_ip, dest_port) - def socket_open(self, socket_num, conn_mode=SNMR_TCP): - """Opens a TCP or UDP socket. By default, we use - 'conn_mode'=SNMR_TCP but we may also use SNMR_UDP. + def socket_open(self, socket_num: int, conn_mode: int = SNMR_TCP) -> int: + """ + Open an IP socket. + + The socket may connect via TCP or UDP protocols. + + :param int socket_num: The socket number to open. + :param int conn_mode: The protocol to use. Use SNMR_TCP for TCP or SNMR_UDP for \ + UDP, defaults to SNMR_TCP. + :return int: 1 if the socket was opened, 0 if not. """ assert self.link_status, "Ethernet cable disconnected!" if self._debug: @@ -686,23 +843,41 @@ def socket_open(self, socket_num, conn_mode=SNMR_TCP): return 0 return 1 - def socket_close(self, socket_num): - """Closes a socket.""" + def socket_close(self, socket_num: int) -> None: + """ + Close a socket. + + :param int socket_num: The socket to close. + """ if self._debug: print("*** Closing socket #%d" % socket_num) self._write_sncr(socket_num, CMD_SOCK_CLOSE) self._read_sncr(socket_num) - def socket_disconnect(self, socket_num): - """Disconnect a TCP connection.""" + def socket_disconnect(self, socket_num: int) -> None: + """ + Disconnect a TCP or UDP connection. + + :param int socket_num: The socket to close. + """ if self._debug: print("*** Disconnecting socket #%d" % socket_num) self._write_sncr(socket_num, CMD_SOCK_DISCON) self._read_sncr(socket_num) - def socket_read(self, socket_num, length): - """Reads data from a socket into a buffer. - Returns buffer. + def socket_read( + self, socket_num: int, length: int + ) -> Tuple[int, Union[int, bytearray]]: + """ + Read data from a TCP socket. + + :param int socket_num: The socket to read data from. + :param int length: The number of bytes to read from the socket. + + :return Tuple[int, Union[int, bytearray]]: If the read was successful then the first + item of the tuple is the length of the data and the second is the data. If the read + was unsuccessful then both items equal an error code, 0 for no data waiting and -1 + for no connection to the socket. """ assert self.link_status, "Ethernet cable disconnected!" assert socket_num <= self.max_sockets, "Provided socket exceeds max_sockets." @@ -761,8 +936,19 @@ def socket_read(self, socket_num, length): self._read_sncr(socket_num) return ret, resp - def read_udp(self, socket_num, length): - """Read UDP socket's current message bytes.""" + def read_udp( + self, socket_num: int, length: int + ) -> Union[int, Tuple[int, Union[int, bytearray]]]: + """ + Read UDP socket's current message bytes. + + :param int socket_num: The socket to read data from. + :param int length: The number of bytes to read from the socket. + + :return Union[int, Tuple[int, Union[int, bytearray]]]: If the read was successful then + the first item of the tuple is the length of the data and the second is the data. + If the read was unsuccessful then -1 is returned. + """ if self.udp_datasize[socket_num] > 0: if self.udp_datasize[socket_num] <= length: ret, resp = self.socket_read(socket_num, self.udp_datasize[socket_num]) @@ -774,8 +960,19 @@ def read_udp(self, socket_num, length): return ret, resp return -1 - def socket_write(self, socket_num, buffer, timeout=0): - """Writes a bytearray to a provided socket.""" + def socket_write( + self, socket_num: int, buffer: bytearray, timeout: float = 0 + ) -> int: + """ + Write data to a socket. + + :param int socket_num: The socket to write to. + :param bytearray buffer: The data to write to the socket. + :param float timeout: Write data timeout in seconds, defaults to 0.0 which waits + indefinitely. + + :return int: The number of bytes written to the buffer. + """ assert self.link_status, "Ethernet cable disconnected!" assert socket_num <= self.max_sockets, "Provided socket exceeds max_sockets." status = 0 @@ -850,9 +1047,8 @@ def socket_write(self, socket_num, buffer, timeout=0): return ret # Socket-Register Methods - - def _get_rx_rcv_size(self, sock): - """Get size of recieved and saved in socket buffer.""" + def _get_rx_rcv_size(self, sock: int) -> int: + """Size of received and saved in socket buffer.""" val = 0 val_1 = self._read_snrx_rsr(sock) while val != val_1: @@ -861,8 +1057,8 @@ def _get_rx_rcv_size(self, sock): val = self._read_snrx_rsr(sock) return int.from_bytes(val, "big") - def _get_tx_free_size(self, sock): - """Get free size of sock's tx buffer block.""" + def _get_tx_free_size(self, sock: int) -> int: + """Free size of socket's tx buffer block.""" val = 0 val_1 = self._read_sntx_fsr(sock) while val != val_1: @@ -871,71 +1067,71 @@ def _get_tx_free_size(self, sock): val = self._read_sntx_fsr(sock) return int.from_bytes(val, "big") - def _read_snrx_rd(self, sock): + def _read_snrx_rd(self, sock: int) -> int: self._pbuff[0] = self._read_socket(sock, REG_SNRX_RD)[0] self._pbuff[1] = self._read_socket(sock, REG_SNRX_RD + 1)[0] return self._pbuff[0] << 8 | self._pbuff[1] - def _write_snrx_rd(self, sock, data): + def _write_snrx_rd(self, sock: int, data: int) -> None: self._write_socket(sock, REG_SNRX_RD, data >> 8 & 0xFF) self._write_socket(sock, REG_SNRX_RD + 1, data & 0xFF) - def _write_sntx_wr(self, sock, data): + def _write_sntx_wr(self, sock: int, data: int) -> None: self._write_socket(sock, REG_SNTX_WR, data >> 8 & 0xFF) self._write_socket(sock, REG_SNTX_WR + 1, data & 0xFF) - def _read_sntx_wr(self, sock): + def _read_sntx_wr(self, sock: int) -> int: self._pbuff[0] = self._read_socket(sock, 0x0024)[0] self._pbuff[1] = self._read_socket(sock, 0x0024 + 1)[0] return self._pbuff[0] << 8 | self._pbuff[1] - def _read_sntx_fsr(self, sock): + def _read_sntx_fsr(self, sock: int) -> Optional[bytearray]: data = self._read_socket(sock, REG_SNTX_FSR) data += self._read_socket(sock, REG_SNTX_FSR + 1) return data - def _read_snrx_rsr(self, sock): + def _read_snrx_rsr(self, sock: int) -> Optional[bytearray]: data = self._read_socket(sock, REG_SNRX_RSR) data += self._read_socket(sock, REG_SNRX_RSR + 1) return data - def _write_sndipr(self, sock, ip_addr): - """Writes to socket destination IP Address.""" + def _write_sndipr(self, sock: int, ip_addr: bytearray) -> None: + """Write to socket destination IP Address.""" for octet in range(0, 4): self._write_socket(sock, REG_SNDIPR + octet, ip_addr[octet]) - def _write_sndport(self, sock, port): - """Writes to socket destination port.""" + def _write_sndport(self, sock: int, port: int) -> None: + """Write to socket destination port.""" self._write_socket(sock, REG_SNDPORT, port >> 8) self._write_socket(sock, REG_SNDPORT + 1, port & 0xFF) - def _read_snsr(self, sock): - """Reads Socket n Status Register.""" + def _read_snsr(self, sock: int) -> Optional[bytearray]: + """Read Socket n Status Register.""" return self._read_socket(sock, REG_SNSR) - def _write_snmr(self, sock, protocol): + def _write_snmr(self, sock: int, protocol: int) -> None: """Write to Socket n Mode Register.""" self._write_socket(sock, REG_SNMR, protocol) - def _write_snir(self, sock, data): + def _write_snir(self, sock: int, data: int) -> None: """Write to Socket n Interrupt Register.""" self._write_socket(sock, REG_SNIR, data) - def _write_sock_port(self, sock, port): + def _write_sock_port(self, sock: int, port: int) -> None: """Write to the socket port number.""" self._write_socket(sock, REG_SNPORT, port >> 8) self._write_socket(sock, REG_SNPORT + 1, port & 0xFF) - def _write_sncr(self, sock, data): + def _write_sncr(self, sock: int, data: int) -> None: self._write_socket(sock, REG_SNCR, data) - def _read_sncr(self, sock): + def _read_sncr(self, sock: int) -> Optional[bytearray]: return self._read_socket(sock, REG_SNCR) - def _read_snmr(self, sock): + def _read_snmr(self, sock: int) -> Optional[bytearray]: return self._read_socket(sock, REG_SNMR) - def _write_socket(self, sock, address, data): + def _write_socket(self, sock: int, address: int, data: int) -> None: """Write to a W5k socket register.""" if self._chip_type == "w5500": cntl_byte = (sock << 5) + 0x0C @@ -947,7 +1143,7 @@ def _write_socket(self, sock, address, data): ) return None - def _read_socket(self, sock, address): + def _read_socket(self, sock: int, address: int) -> Optional[bytearray]: """Read a W5k socket register.""" if self._chip_type == "w5500": cntl_byte = (sock << 5) + 0x08 diff --git a/adafruit_wiznet5k/adafruit_wiznet5k_dhcp.py b/adafruit_wiznet5k/adafruit_wiznet5k_dhcp.py index 1a4f9dd..3243f77 100644 --- a/adafruit_wiznet5k/adafruit_wiznet5k_dhcp.py +++ b/adafruit_wiznet5k/adafruit_wiznet5k_dhcp.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: 2009 Jordan Terell (blog.jordanterrell.com) +# SPDX-FileCopyrightText: 2009 Jordan Terrell (blog.jordanterrell.com) # SPDX-FileCopyrightText: 2020 Brent Rubell for Adafruit Industries # SPDX-FileCopyrightText: 2021 Patrick Van Oosterwijck @ Silicognition LLC # @@ -13,6 +13,17 @@ * Author(s): Jordan Terrell, Brent Rubell """ +from __future__ import annotations + +try: + from typing import TYPE_CHECKING, Optional, Union, Tuple, Sequence + + if TYPE_CHECKING: + from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K +except ImportError: + pass + + import gc import time from random import randint @@ -79,20 +90,25 @@ class DHCP: - """W5k DHCP Client implementation. - - :param eth: Wiznet 5k object - :param list mac_address: Hardware MAC. - :param str hostname: The desired hostname, with optional {} to fill in MAC. - :param int response_timeout: DHCP Response timeout. - :param bool debug: Enable debugging output. - - """ + """W5k DHCP Client implementation.""" # pylint: disable=too-many-arguments, too-many-instance-attributes, invalid-name def __init__( - self, eth, mac_address, hostname=None, response_timeout=30, debug=False - ): + self, + eth: WIZNET5K, + mac_address: Sequence[Union[int, bytes]], + hostname: Optional[str] = None, + response_timeout: float = 30.0, + debug: bool = False, + ) -> None: + """ + :param adafruit_wiznet5k.WIZNET5K eth: Wiznet 5k object + :param Sequence[Union[int, bytes]] mac_address: Hardware MAC address. + :param Optional[str] hostname: The desired hostname, with optional {} to fill + in the MAC address, defaults to None. + :param float response_timeout: DHCP Response timeout in seconds, defaults to 30. + :param bool debug: Enable debugging output. + """ self._debug = debug self._response_timeout = response_timeout @@ -137,12 +153,18 @@ def __init__( ) # pylint: disable=too-many-statements - def send_dhcp_message(self, state, time_elapsed, renew=False): - """Assemble and send a DHCP message packet to a socket. + def send_dhcp_message( + self, + state: int, + time_elapsed: float, + renew: bool = False, + ) -> None: + """ + Assemble and send a DHCP message packet to a socket. :param int state: DHCP Message state. :param float time_elapsed: Number of seconds elapsed since DHCP process started - :param bool renew: Set True for renew and rebind + :param bool renew: Set True for renew and rebind, defaults to False """ _BUFF[:] = b"\x00" * len(_BUFF) # OP @@ -238,9 +260,12 @@ def send_dhcp_message(self, state, time_elapsed, renew=False): self._sock.send(_BUFF) # pylint: disable=too-many-branches, too-many-statements - def parse_dhcp_response(self): + def parse_dhcp_response( + self, + ) -> Union[Tuple[int, bytes], Tuple[int, int]]: """Parse DHCP response from DHCP server. - Returns DHCP packet type. + + :return Union[Tuple[int, bytes], Tuple[int, int]]: DHCP packet type. """ # store packet in buffer _BUFF = self._sock.recv() @@ -347,10 +372,12 @@ def parse_dhcp_response(self): return msg_type, xid # pylint: disable=too-many-branches, too-many-statements - def _dhcp_state_machine(self): - """DHCP state machine without wait loops to enable cooperative multi tasking + def _dhcp_state_machine(self) -> None: + """ + DHCP state machine without wait loops to enable cooperative multitasking. This state machine is used both by the initial blocking lease request and - the non-blocking DHCP maintenance function""" + the non-blocking DHCP maintenance function. + """ if self._eth.link_status: if self._dhcp_state == STATE_DHCP_DISCONN: self._dhcp_state = STATE_DHCP_START @@ -485,7 +512,7 @@ def _dhcp_state_machine(self): self._sock.close() self._sock = None - def request_dhcp_lease(self): + def request_dhcp_lease(self) -> bool: """Request to renew or acquire a DHCP lease.""" if self._dhcp_state in (STATE_DHCP_LEASED, STATE_DHCP_WAIT): self._dhcp_state = STATE_DHCP_START @@ -495,6 +522,6 @@ def request_dhcp_lease(self): return self._dhcp_state == STATE_DHCP_LEASED - def maintain_dhcp_lease(self): + def maintain_dhcp_lease(self) -> None: """Maintain DHCP lease""" self._dhcp_state_machine() diff --git a/adafruit_wiznet5k/adafruit_wiznet5k_dns.py b/adafruit_wiznet5k/adafruit_wiznet5k_dns.py index c637a5b..efe1a10 100644 --- a/adafruit_wiznet5k/adafruit_wiznet5k_dns.py +++ b/adafruit_wiznet5k/adafruit_wiznet5k_dns.py @@ -13,13 +13,22 @@ * Author(s): MCQN Ltd, Brent Rubell """ +from __future__ import annotations + +try: + from typing import TYPE_CHECKING, Union, Tuple + + if TYPE_CHECKING: + from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K +except ImportError: + pass + import time from random import getrandbits from micropython import const import adafruit_wiznet5k.adafruit_wiznet5k_socket as socket from adafruit_wiznet5k.adafruit_wiznet5k_socket import htons - QUERY_FLAG = const(0x00) OPCODE_STANDARD_QUERY = const(0x00) RECURSION_DESIRED_FLAG = 1 << 8 @@ -39,12 +48,19 @@ class DNS: - """W5K DNS implementation. - - :param iface: Network interface - """ + """W5K DNS implementation.""" - def __init__(self, iface, dns_address, debug=False): + def __init__( + self, + iface: WIZNET5K, + dns_address: Union[str, Tuple[int, int, int, int]], + debug: bool = False, + ) -> None: + """ + :param adafruit_wiznet5k.WIZNET5K: Ethernet network connection. + :param Union[str, Tuple[int, int, int, int]]: IP address of the DNS server. + :param bool debug: Enable debugging messages, defaults to False. + """ self._debug = debug self._iface = iface socket.set_interface(iface) @@ -52,16 +68,17 @@ def __init__(self, iface, dns_address, debug=False): self._sock.settimeout(1) self._dns_server = dns_address - self._host = 0 + self._host = b"" self._request_id = 0 # request identifier self._pkt_buf = bytearray() - def gethostbyname(self, hostname): - """Translate a host name to IPv4 address format. + def gethostbyname(self, hostname: bytes) -> Union[int, bytes]: + """ + DNS look up of a host name. - :param str hostname: Desired host name to connect to. + :param bytes hostname: Host name to connect to. - Returns the IPv4 address as a bytearray if successful, -1 otherwise. + :return Union[int, bytes] The IPv4 address if successful, -1 otherwise. """ if self._dns_server is None: return INVALID_SERVER @@ -91,10 +108,12 @@ def gethostbyname(self, hostname): def _parse_dns_response( self, - ): # pylint: disable=too-many-return-statements, too-many-branches, too-many-statements, too-many-locals - """Receives and parses DNS query response. - Returns desired hostname address if obtained, -1 otherwise. + ) -> Union[int, bytes]: + # pylint: disable=too-many-return-statements, too-many-branches, too-many-statements, too-many-locals + """ + Receive and parse DNS query response. + :return Union[int, bytes]: Requested hostname IP address if obtained, -1 otherwise. """ # wait for a response start_time = time.monotonic() @@ -117,7 +136,7 @@ def _parse_dns_response( if not xid == self._request_id: if self._debug: print( - "* DNS ERROR: Received request identifer {} \ + "* DNS ERROR: Received request identifier {} \ does not match expected {}".format( xid, self._request_id ) @@ -209,8 +228,8 @@ def _parse_dns_response( # Return address return self._pkt_buf[ptr : ptr + 4] - def _build_dns_header(self): - """Builds DNS header.""" + def _build_dns_header(self) -> None: + """Build a DNS header.""" # generate a random, 16-bit, request identifier self._request_id = getrandbits(16) @@ -235,8 +254,8 @@ def _build_dns_header(self): self._pkt_buf.append(0x00) self._pkt_buf.append(0x00) - def _build_dns_question(self): - """Build DNS question""" + def _build_dns_question(self) -> None: + """Build a DNS query.""" host = self._host.decode("utf-8") host = host.split(".") # write out each section of host diff --git a/adafruit_wiznet5k/adafruit_wiznet5k_ntp.py b/adafruit_wiznet5k/adafruit_wiznet5k_ntp.py index c58b683..17a9971 100644 --- a/adafruit_wiznet5k/adafruit_wiznet5k_ntp.py +++ b/adafruit_wiznet5k/adafruit_wiznet5k_ntp.py @@ -17,24 +17,38 @@ """ +from __future__ import annotations + +try: + from typing import TYPE_CHECKING + + if TYPE_CHECKING: + from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K +except ImportError: + pass import time import adafruit_wiznet5k.adafruit_wiznet5k_socket as socket -##__version__ = "0.0.0+auto.0" -##__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_NTP.git" +# __version__ = "0.0.0+auto.0" +# __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_NTP.git" class NTP: - """ - Wiznet5k NTP Client - - :param iface: Wiznet 5k object - :param str ntp_address: The hostname of the NTP server - :param int utc: Numbers of hours to offset time from UTC - :param bool debug: Enable debugging output. - """ + """Wiznet5k NTP Client.""" - def __init__(self, iface, ntp_address, utc, debug=False): + def __init__( + self, + iface: WIZNET5K, + ntp_address: str, + utc: float, + debug: bool = False, + ) -> None: + """ + :param adafruit_wiznet5k.WIZNET5K iface: Wiznet 5k object. + :param str ntp_address: The hostname of the NTP server. + :param float utc: Numbers of hours to offset time from UTC. + :param bool debug: Enable debugging output, defaults to False. + """ self._debug = debug self._iface = iface socket.set_interface(self._iface) @@ -48,11 +62,11 @@ def __init__(self, iface, ntp_address, utc, debug=False): self._pkt_buf_ = bytearray([0x23] + [0x00] * 55) - def get_time(self): + def get_time(self) -> time.struct_time: """ - Get the time from the NTP server + Get the time from the NTP server. - :return: time in seconds since the epoch + :return time.struct_time: The local time. """ self._sock.bind((None, 50001)) self._sock.sendto(self._pkt_buf_, (self._ntp_server, 123)) @@ -61,6 +75,7 @@ def get_time(self): if data: sec = data[40:44] int_cal = int.from_bytes(sec, "big") - cal = int_cal - 2208988800 + self._utc * 3600 + # UTC offset may be a float as some offsets are half hours so force int. + cal = int(int_cal - 2208988800 + self._utc * 3600) cal = time.localtime(cal) return cal diff --git a/adafruit_wiznet5k/adafruit_wiznet5k_socket.py b/adafruit_wiznet5k/adafruit_wiznet5k_socket.py index 2f3edcf..01a372a 100644 --- a/adafruit_wiznet5k/adafruit_wiznet5k_socket.py +++ b/adafruit_wiznet5k/adafruit_wiznet5k_socket.py @@ -12,22 +12,44 @@ * Author(s): ladyada, Brent Rubell, Patrick Van Oosterwijck, Adam Cummick """ +from __future__ import annotations + +try: + from typing import TYPE_CHECKING, Optional, Tuple, List, Union + + if TYPE_CHECKING: + from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K +except ImportError: + pass + import gc import time from micropython import const + import adafruit_wiznet5k as wiznet5k -_the_interface = None # pylint: disable=invalid-name +# pylint: disable=invalid-name +_the_interface: Optional[WIZNET5K] = None + +def set_interface(iface: WIZNET5K) -> None: + """ + Helper to set the global internet interface. -def set_interface(iface): - """Helper to set the global internet interface.""" + :param wiznet5k.adafruit_wiznet5k.WIZNET5K iface: The ethernet interface. + """ global _the_interface # pylint: disable=global-statement, invalid-name _the_interface = iface -def htonl(x): - """Convert 32-bit positive integers from host to network byte order.""" +def htonl(x: int) -> int: + """ + Convert 32-bit positive integer from host to network byte order. + + :param int x: 32-bit positive integer from host. + + :return int: 32-bit positive integer in network byte order. + """ return ( ((x) << 24 & 0xFF000000) | ((x) << 8 & 0x00FF0000) @@ -36,8 +58,14 @@ def htonl(x): ) -def htons(x): - """Convert 16-bit positive integers from host to network byte order.""" +def htons(x: int) -> int: + """ + Convert 16-bit positive integer from host to network byte order. + + :param int x: 16-bit positive integer from host. + + :return int: 16-bit positive integer in network byte order. + """ return (((x) << 8) & 0xFF00) | (((x) >> 8) & 0xFF) @@ -49,10 +77,28 @@ def htons(x): # pylint: disable=too-many-arguments, unused-argument -def getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0): - """Translate the host/port argument into a sequence of 5-tuples that - contain all the necessary arguments for creating a socket connected to that service. - +def getaddrinfo( + host: str, + port: int, + family: int = 0, + socktype: int = 0, + proto: int = 0, + flags: int = 0, +) -> List[Tuple[int, int, int, str, Tuple[str, int]]]: + """ + Translate the host/port argument into a sequence of 5-tuples that contain all the necessary + arguments for creating a socket connected to that service. + + :param str host: a domain name, a string representation of an IPv4/v6 address or + None. + :param int port: Port number to connect to (0 - 65536). + :param int family: Ignored and hardcoded as 0x03 (the only family implemented) by the function. + :param int socktype: The type of socket, either SOCK_STREAM (0x21) for TCP or SOCK_DGRAM (0x02) + for UDP, defaults to 0x00. + :param int proto: Unused in this implementation of socket. + :param int flags: Unused in this implementation of socket. + + :return List[Tuple[int, int, int, str, Tuple[str, int]]]: Address info entries. """ if not isinstance(port, int): raise RuntimeError("Port must be an integer") @@ -61,21 +107,26 @@ def getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0): return [(AF_INET, socktype, proto, "", (gethostbyname(host), port))] -def gethostbyname(hostname): - """Translate a host name to IPv4 address format. The IPv4 address - is returned as a string. +def gethostbyname(hostname: str) -> str: + """ + Lookup a host name's IPv4 address. + + :param str hostname: Hostname to lookup. - :param str hostname: Desired hostname. + :return str: IPv4 address (a string of the form '255.255.255.255'). """ addr = _the_interface.get_host_by_name(hostname) addr = "{}.{}.{}.{}".format(addr[0], addr[1], addr[2], addr[3]) return addr -def is_ipv4(host): - """Checks if a host string is an IPv4 address. +def is_ipv4(host: str) -> bool: + """ + Check if a hostname is an IPv4 address (a string of the form '255.255.255.255'). + + :param str host: Hostname to check. - :param str host: host's name or ip + :return bool: """ octets = host.split(".", 3) if len(octets) != 4 or not "".join(octets).isdigit(): @@ -88,17 +139,28 @@ def is_ipv4(host): # pylint: disable=invalid-name, too-many-public-methods class socket: - """A simplified implementation of the Python 'socket' class - for connecting to a Wiznet5k module. - - :param int family: Socket address (and protocol) family. - :param int type: Socket type. + """ + A simplified implementation of the Python 'socket' class for connecting + to a Wiznet5k module. """ # pylint: disable=redefined-builtin,unused-argument def __init__( - self, family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None, socknum=None - ): + self, + family: int = AF_INET, + type: int = SOCK_STREAM, + proto: int = 0, + fileno: Optional[int] = None, + socknum: Optional[int] = None, + ) -> None: + """ + :param int family: Socket address (and protocol) family, defaults to AF_INET. + :param int type: Socket type, use SOCK_STREAM for TCP and SOCK_DGRAM for UDP, + defaults to SOCK_STREAM. + :param int proto: Unused, retained for compatibility. + :param Optional[int] fileno: Unused, retained for compatibility. + :param Optional[int] socknum: Unused, retained for compatibility. + """ if family != AF_INET: raise RuntimeError("Only AF_INET family supported by W5K modules.") self._sock_type = type @@ -113,7 +175,7 @@ def __init__( def __enter__(self): return self - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__(self, exc_type, exc_val, exc_tb) -> None: if self._sock_type == SOCK_STREAM: self.disconnect() stamp = time.monotonic() @@ -127,18 +189,30 @@ def __exit__(self, exc_type, exc_val, exc_tb): raise RuntimeError("Failed to close socket") @property - def socknum(self): - """Returns the socket object's socket number.""" + def socknum(self) -> int: + """ + Return the socket object's socket number. + + :return int: Socket number. + """ return self._socknum @property - def status(self): - """Returns the status of the socket""" + def status(self) -> int: + """ + Return the status of the socket. + + :return int: Status of the socket. + """ return _the_interface.socket_status(self.socknum)[0] @property - def connected(self): - """Returns whether or not we are connected to the socket.""" + def connected(self) -> bool: + """ + Return whether connected to the socket. + + :return bool: Whether connected. + """ if self.socknum >= _the_interface.max_sockets: return False status = _the_interface.socket_status(self.socknum)[0] @@ -158,25 +232,36 @@ def connected(self): self.close() return result - def getpeername(self): - """Return the remote address to which the socket is connected.""" + def getpeername(self) -> Union[str, bytearray]: + """ + Return the remote address to which the socket is connected. + + :return Union[str, bytearray]: An IPv4 address (a string of the form '255.255.255.255'). + An error may return a bytearray. + """ return _the_interface.remote_ip(self.socknum) - def inet_aton(self, ip_string): - """Convert an IPv4 address from dotted-quad string format. + def inet_aton(self, ip_string: str) -> bytearray: + """ + Convert an IPv4 address from dotted-quad string format. + + :param str ip_string: IPv4 address (a string of the form '255.255.255.255'). - :param str ip_string: IP Address, as a dotted-quad string. + :return bytearray: IPv4 address as a 4 byte bytearray. """ self._buffer = b"" self._buffer = [int(item) for item in ip_string.split(".")] self._buffer = bytearray(self._buffer) return self._buffer - def bind(self, address): - """Bind the socket to the listen port, if host is specified the interface - will be reconfigured to that IP. + def bind(self, address: Tuple[Optional[str], int]) -> None: + """Bind the socket to the listen port. + + If the host is specified the interface will be reconfigured to that IP address. - :param tuple address: local socket as a (host, port) tuple. + :param Tuple[Optional[str], int] address: Address as a (host, port) tuple. The host + may be an IPv4 address (a string of the form '255.255.255.255'), or None. + The port number is in the range (0 - 65536). """ if address[0] is not None: ip_address = _the_interface.unpretty_ip(address[0]) @@ -192,20 +277,32 @@ def bind(self, address): ) self._buffer = b"" - def listen(self, backlog=None): - """Listen on the port specified by bind. + def listen(self, backlog: Optional[int] = None) -> None: + """ + Listen on the port specified by bind. - :param backlog: For compatibility but ignored. + :param Optional[int] backlog: Included for compatibility but ignored. """ assert self._listen_port is not None, "Use bind to set the port before listen!" _the_interface.socket_listen(self.socknum, self._listen_port) self._buffer = b"" - def accept(self): - """Accept a connection. The socket must be bound to an address and listening for - connections. The return value is a pair (conn, address) where conn is a new - socket object usable to send and receive data on the connection, and address is + def accept( + self, + ) -> Optional[Tuple[socket, Tuple[Union[str, bytearray], Union[int, bytearray]],]]: + # wiznet5k.adafruit_wiznet5k_socket.socket, + """ + Accept a connection. + + The socket must be bound to an address and listening for connections. + + The return value is a pair (conn, address) where conn is a new + socket object to send and receive data on the connection, and address is the address bound to the socket on the other end of the connection. + + :return Optional[Tuple[socket.socket, Tuple[Union[str, bytearray], Union[int, bytearray]]]: + If successful (socket object, (IP address, port)). If errors occur, the IP address + and / or the port may be returned as bytearrays. """ stamp = time.monotonic() while self.status not in ( @@ -220,7 +317,7 @@ def accept(self): new_listen_socknum, addr = _the_interface.socket_accept(self.socknum) current_socknum = self.socknum - # Create a new socket object and swap socket nums so we can continue listening + # Create a new socket object and swap socket nums, so we can continue listening client_sock = socket() client_sock._socknum = current_socknum # pylint: disable=protected-access self._socknum = new_listen_socknum # pylint: disable=protected-access @@ -230,10 +327,18 @@ def accept(self): raise RuntimeError("Failed to open new listening socket") return client_sock, addr - def connect(self, address, conntype=None): - """Connect to a remote socket at address. + def connect( + self, + address: Tuple[Union[str, Tuple[int, int, int, int]], int], + conntype: Optional[int] = None, + ) -> None: + """ + Connect to a remote socket. - :param tuple address: Remote socket as a (host, port) tuple. + :param Tuple[Union[str, Tuple[int, int, int, int]], int] address: Remote socket as + a (host, port) tuple. The host may be a tuple in the form (0, 0, 0, 0) or a string. + :param Optional[int] conntype: Raises an exception if set to 3, unused otherwise, defaults + to None. """ assert ( conntype != 0x03 @@ -255,30 +360,40 @@ def connect(self, address, conntype=None): raise RuntimeError("Failed to connect to host", host) self._buffer = b"" - def send(self, data): - """Send data to the socket. The socket must be connected to - a remote socket. + def send(self, data: Union[bytes, bytearray]) -> None: + """ + Send data to the socket. + + The socket must be connected to a remote socket. - :param bytearray data: Desired data to send to the socket. + :param bytearray data: Data to send to the socket. """ _the_interface.socket_write(self.socknum, data, self._timeout) gc.collect() - def sendto(self, data, address): - """Send data to the socket. The socket must be connected to - a remote socket. + def sendto(self, data: bytearray, address: [Tuple[str, int]]) -> None: + """ + Connect to a remote socket and send data. - :param bytearray data: Desired data to send to the socket. + :param bytearray data: Data to send to the socket. :param tuple address: Remote socket as a (host, port) tuple. """ self.connect(address) return self.send(data) - def recv(self, bufsize=0, flags=0): # pylint: disable=too-many-branches - """Reads some bytes from the connected remote address. + def recv( + # pylint: disable=too-many-branches + self, + bufsize: int = 0, + flags: int = 0, + ) -> bytes: + """ + Read from the connected remote address. :param int bufsize: Maximum number of bytes to receive. :param int flags: ignored, present for compatibility. + + :return bytes: Data from the remote address. """ if self.status == wiznet5k.adafruit_wiznet5k.SNSR_SOCK_CLOSED: return b"" @@ -335,10 +450,17 @@ def recv(self, bufsize=0, flags=0): # pylint: disable=too-many-branches gc.collect() return ret - def embed_recv(self, bufsize=0, flags=0): # pylint: disable=too-many-branches - """Reads some bytes from the connected remote address and then return recv(). - :param int bufsize: Maximum number of bytes to receive. + def embed_recv( + self, bufsize: int = 0, flags: int = 0 + ) -> bytes: # pylint: disable=too-many-branches + """ + Read from the connected remote address. + + :param int bufsize: Maximum number of bytes to receive, ignored by the + function, defaults to 0. :param int flags: ignored, present for compatibility. + + :return bytes: All data available from the connection. """ # print("Socket read", bufsize) ret = None @@ -355,12 +477,17 @@ def embed_recv(self, bufsize=0, flags=0): # pylint: disable=too-many-branches gc.collect() return ret - def recvfrom(self, bufsize=0, flags=0): - """Reads some bytes from the connected remote address. + def recvfrom( + self, bufsize: int = 0, flags: int = 0 + ) -> Tuple[bytes, Tuple[str, int]]: + """ + Read some bytes from the connected remote address. :param int bufsize: Maximum number of bytes to receive. :param int flags: ignored, present for compatibility. - :return: a tuple (bytes, address) where address is a tuple (ip, port) + + :return Tuple[bytes, Tuple[str, int]]: a tuple (bytes, address) + where address is a tuple (ip, port) """ return ( self.recv(bufsize), @@ -370,13 +497,15 @@ def recvfrom(self, bufsize=0, flags=0): ), ) - def recv_into(self, buf, nbytes=0, flags=0): - """Reads some bytes from the connected remote address info the provided buffer. + def recv_into(self, buf: bytearray, nbytes: int = 0, flags: int = 0) -> int: + """ + Read from the connected remote address into the provided buffer. :param bytearray buf: Data buffer :param nbytes: Maximum number of bytes to receive :param int flags: ignored, present for compatibility. - :return: the number of bytes received + + :return int: the number of bytes received """ if nbytes == 0: nbytes = len(buf) @@ -385,13 +514,18 @@ def recv_into(self, buf, nbytes=0, flags=0): buf[:nbytes] = ret return nbytes - def recvfrom_into(self, buf, nbytes=0, flags=0): - """Reads some bytes from the connected remote address info the provided buffer. + def recvfrom_into( + self, buf: bytearray, nbytes: int = 0, flags: int = 0 + ) -> Tuple[int, Tuple[str, int]]: + """ + Read some bytes from the connected remote address into the provided buffer. - :param bytearray buf: Data buffer - :param nbytes: Maximum number of bytes to receive - :param int flags: ignored, present for compatibility. - :return: a tuple (nbytes, address) where address is a tuple (ip, port) + :param bytearray buf: Data buffer. + :param int nbytes: Maximum number of bytes to receive. + :param int flags: Unused, present for compatibility. + + :return Tuple[int, Tuple[str, int]]: A tuple (nbytes, address) where address is a + tuple (ip, port) """ return ( self.recv_into(buf, nbytes), @@ -401,9 +535,14 @@ def recvfrom_into(self, buf, nbytes=0, flags=0): ), ) - def readline(self): - """Attempt to return as many bytes as we can up to \ - but not including '\r\n'. + def readline(self) -> bytes: + """ + Read a line from the socket. + + Attempt to return as many bytes as we can up to but not including a carriage return and + linefeed character pair. + + :return bytes: The data read from the socket. """ stamp = time.monotonic() while b"\r\n" not in self._buffer: @@ -424,30 +563,37 @@ def readline(self): gc.collect() return firstline - def disconnect(self): - """Disconnects a TCP socket.""" + def disconnect(self) -> None: + """Disconnect a TCP socket.""" assert self._sock_type == SOCK_STREAM, "Socket must be a TCP socket." _the_interface.socket_disconnect(self.socknum) - def close(self): - """Closes the socket.""" + def close(self) -> None: + """Close the socket.""" _the_interface.socket_close(self.socknum) - def available(self): - """Returns how many bytes of data are available to be read from the socket.""" + def available(self) -> int: + """ + Return how many bytes of data are available to be read from the socket. + + :return int: Number of bytes available. + """ return _the_interface.socket_available(self.socknum, self._sock_type) - def settimeout(self, value): - """Sets socket read timeout. + def settimeout(self, value: float) -> None: + """ + Set the socket read timeout. - :param int value: Socket read timeout, in seconds. + :param float value: Socket read timeout in seconds. """ if value < 0: raise Exception("Timeout period should be non-negative.") self._timeout = value - def gettimeout(self): - """Return the timeout in seconds (float) associated - with socket operations, or None if no timeout is set. + def gettimeout(self) -> Optional[float]: + """ + Timeout associated with socket operations. + + :return Optional[float]: Timeout in seconds, or None if no timeout is set. """ return self._timeout diff --git a/adafruit_wiznet5k/adafruit_wiznet5k_wsgiserver.py b/adafruit_wiznet5k/adafruit_wiznet5k_wsgiserver.py index 46088bb..fd237a7 100644 --- a/adafruit_wiznet5k/adafruit_wiznet5k_wsgiserver.py +++ b/adafruit_wiznet5k/adafruit_wiznet5k_wsgiserver.py @@ -27,6 +27,15 @@ * Author(s): Matt Costi, Patrick Van Oosterwijck """ # pylint: disable=no-name-in-module +from __future__ import annotations + +try: + from typing import TYPE_CHECKING, Optional, List, Tuple, Dict + + if TYPE_CHECKING: + from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K +except ImportError: + pass import io import gc @@ -34,11 +43,15 @@ import adafruit_wiznet5k as wiznet5k import adafruit_wiznet5k.adafruit_wiznet5k_socket as socket -_the_interface = None # pylint: disable=invalid-name +_the_interface: Optional[WIZNET5K] = None # pylint: disable=invalid-name -def set_interface(iface): - """Helper to set the global internet interface""" +def set_interface(iface: WIZNET5K) -> None: + """ + Helper to set the global internet interface. + + :param wiznet5k.adafruit_wiznet5k.WIZNET5K: Ethernet interface. + """ global _the_interface # pylint: disable=global-statement, invalid-name _the_interface = iface socket.set_interface(iface) @@ -46,11 +59,19 @@ def set_interface(iface): # pylint: disable=invalid-name class WSGIServer: - """ - A simple server that implements the WSGI interface - """ - - def __init__(self, port=80, debug=False, application=None): + """A simple server that implements the WSGI interface.""" + + def __init__( + self, + port: int = 80, + debug: bool = False, + application: Optional[callable] = None, + ) -> None: + """ + :param int port: WSGI server port, defaults to 80. + :param bool debug: Enable debugging, defaults to False. + :param Optional[callable] application: Application to call in response to a HTTP request. + """ self.application = application self.port = port self._timeout = 20 @@ -66,9 +87,10 @@ def __init__(self, port=80, debug=False, application=None): if self._debug: print("Max sockets: ", self.MAX_SOCK_NUM) - def start(self): + def start(self) -> None: """ - Starts the server and begins listening for incoming connections. + Start the server and listen for incoming connections. + Call update_poll in the main loop for the application callable to be invoked on receiving an incoming request. """ @@ -82,8 +104,10 @@ def start(self): ip = _the_interface.pretty_ip(_the_interface.ip_address) print("Server available at {0}:{1}".format(ip, self.port)) - def update_poll(self): + def update_poll(self) -> None: """ + Check for new incoming client requests. + Call this method inside your main event loop to get the server check for new incoming client requests. When a request comes in, the application callable will be invoked. @@ -108,14 +132,14 @@ def update_poll(self): except RuntimeError: pass - def finish_response(self, result, client): + def finish_response(self, result: str, client: socket.socket) -> None: """ Called after the application callable returns result data to respond with. Creates the HTTP Response payload from the response_headers and results data, and sends it back to client. - :param string result: the data string to send back in the response to the client. - :param Socket client: the socket to send the response to. + :param str result: the data string to send back in the response to the client. + :param socket.socket client: the socket to send the response to. """ try: response = "HTTP/1.1 {0}\r\n".format(self._response_status) @@ -140,25 +164,29 @@ def finish_response(self, result, client): client.disconnect() client.close() - def _start_response(self, status, response_headers): + def _start_response( + self, status: str, response_headers: List[Tuple[str, str]] + ) -> None: """ The application callable will be given this method as the second param This is to be called before the application callable returns, to signify the response can be started with the given status and headers. - :param string status: a status string including the code and reason. ex: "200 OK" - :param list response_headers: a list of tuples to represent the headers. + :param str status: a status string including the code and reason. ex: "200 OK" + :param List[Tuple[str, str]] response_headers: a list of tuples to represent the headers. ex ("header-name", "header value") """ self._response_status = status self._response_headers = [("Server", "w5kWSGIServer")] + response_headers - def _get_environ(self, client): + def _get_environ(self, client: socket.socket) -> Dict: """ The application callable will be given the resulting environ dictionary. It contains metadata about the incoming request and the request body ("wsgi.input") - :param Socket client: socket to read the request from + :param socket.socket client: Socket to read the request from. + + :return Dict: Data for the application callable. """ env = {} line = str(client.readline(), "utf-8")