diff --git a/adafruit_wiznet5k/adafruit_wiznet5k.py b/adafruit_wiznet5k/adafruit_wiznet5k.py index 7beb62e..278b886 100644 --- a/adafruit_wiznet5k/adafruit_wiznet5k.py +++ b/adafruit_wiznet5k/adafruit_wiznet5k.py @@ -5,6 +5,7 @@ # SPDX-FileCopyrightText: 2020 Brent Rubell for Adafruit Industries # SPDX-FileCopyrightText: 2021 Patrick Van Oosterwijck # SPDX-FileCopyrightText: 2021 Adam Cummick +# SPDX-FileCopyrightText: 2023 Martin Stephens # # SPDX-License-Identifier: MIT """ @@ -14,7 +15,7 @@ Pure-Python interface for WIZNET 5k ethernet modules. * Author(s): WIZnet, Arduino LLC, Bjoern Hartmann, Paul Stoffregen, Brent Rubell, - Patrick Van Oosterwijck + Patrick Van Oosterwijck, Martin Stephens Implementation Notes -------------------- @@ -31,12 +32,15 @@ from __future__ import annotations try: - from typing import TYPE_CHECKING, Optional, Union, List, Tuple, Sequence + from typing import TYPE_CHECKING, Optional, Union, Tuple if TYPE_CHECKING: from circuitpython_typing import WriteableBuffer import busio import digitalio + + IpAddress4Raw = Union[bytes, Tuple[int, int, int, int]] + MacAddressRaw = Union[bytes, Tuple[int, int, int, int, int, int]] except ImportError: pass @@ -53,33 +57,62 @@ import adafruit_wiznet5k.adafruit_wiznet5k_dns as dns from adafruit_wiznet5k.adafruit_wiznet5k_debug import debug_msg -# Wiznet5k Registers -_REG_MR = const(0x0000) # Mode -_REG_GAR = const(0x0001) # Gateway IP Address -_REG_SUBR = const(0x0005) # Subnet Mask Address -_REG_VERSIONR_W5500 = const(0x0039) # W5500 Silicon Version -_REG_VERSIONR_W5100S = const(0x0080) # W5100S Silicon Version -_REG_SHAR = const(0x0009) # Source Hardware Address -_REG_SIPR = const(0x000F) # Source IP Address -_REG_PHYCFGR = const(0x002E) # W5500 PHY Configuration -_REG_PHYCFGR_W5100S = const(0x003C) # W5100S PHY Configuration -_REG_RCR_5100s = const(0x0019) # Retry Count -_REG_RTR_5100s = const(0x0017) # Retry Time -_REG_RCR_5500 = const(0x001B) # Retry Count -_REG_RTR_5500 = const(0x0019) # Retry Time - -# Wiznet5k Socket Registers -_REG_SNMR = const(0x0000) # Socket n Mode -_REG_SNCR = const(0x0001) # Socket n Command -_REG_SNIR = const(0x0002) # Socket n Interrupt -_REG_SNSR = const(0x0003) # Socket n Status -_REG_SNPORT = const(0x0004) # Socket n Source Port -_REG_SNDIPR = const(0x000C) # Destination IP Address -_REG_SNDPORT = const(0x0010) # Destination Port -_REG_SNRX_RSR = const(0x0026) # RX Free Size -_REG_SNRX_RD = const(0x0028) # Read Size Pointer -_REG_SNTX_FSR = const(0x0020) # Socket n TX Free Size -_REG_SNTX_WR = const(0x0024) # TX Write Pointer +# *** Wiznet Common Registers *** +_REG_MR = {"w5100s": const(0x0000), "w5500": const(0x0000)} +# Gateway IPv4 Address. +_REG_GAR = {"w5100s": const(0x0001), "w5500": const(0x0001), "w6100": const(0x4130)} +# Subnet Mask Address +_REG_SUBR = {"w5100s": const(0x0005), "w5500": const(0x0005), "w6100": const(0x4134)} +# Chip version. +_REG_VERSIONR = { + "w5100s": const(0x0080), + "w5500": const(0x0039), + "w6100": const(0x0000), +} +# Source Hardware Address +_REG_SHAR = {"w5100s": const(0x0009), "w5500": const(0x0009), "w6100": const(0x4120)} +# Source IP Address +_REG_SIPR = {"w5100s": const(0x000F), "w5500": const(0x000F), "w6100": const(0x4138)} +# Register with link status flag (PHYCFGR for 5xxxx, PHYSR for 6100). +_REG_LINK_FLAG = { + "w5100s": const(0x003C), + "w5500": const(0x002E), + "w6100": const(0x3000), +} +_REG_RCR = {"w5100s": const(0x0019), "w5500": const(0x001B), "w6100": const(0x4204)} +_REG_RTR = {"w5100s": const(0x0017), "w5500": const(0x0019), "w6100": const(0x4200)} + +# *** Wiznet Socket Registers *** +# Socket n Mode. +_REG_SNMR = const(0x0000) +# Socket n Command. +_REG_SNCR = {"w5100s": const(0x0001), "w5500": const(0x0001), "w6100": const(0x0010)} +# Socket n Interrupt. +_REG_SNIR = {"w5100s": const(0x0002), "w5500": const(0x0002), "w6100": const(0x0020)} +# Socket n Status. +_REG_SNSR = {"w5100s": const(0x0003), "w5500": const(0x0003), "w6100": const(0x0030)} +# Socket n Source Port. +_REG_SNPORT = {"w5100s": const(0x0004), "w5500": const(0x0004), "w6100": const(0x0114)} +# Destination IPv4 Address. +_REG_SNDIPR = {"w5100s": const(0x000C), "w5500": const(0x000C), "w6100": const(0x0120)} +# Destination Port. +_REG_SNDPORT = {"w5100s": const(0x0010), "w5500": const(0x0010), "w6100": const(0x0140)} +# RX Free Size. +_REG_SNRX_RSR = { + "w5100s": const(0x0026), + "w5500": const(0x0026), + "w6100": const(0x0224), +} +# Read Size Pointer. +_REG_SNRX_RD = {"w5100s": const(0x0028), "w5500": const(0x0028), "w6100": const(0x0228)} +# Socket n TX Free Size. +_REG_SNTX_FSR = { + "w5100s": const(0x0020), + "w5500": const(0x0020), + "w6100": const(0x0204), +} +# TX Write Pointer. +_REG_SNTX_WR = {"w5100s": const(0x0024), "w5500": const(0x0024), "w6100": const(0x020C)} # SNSR Commands SNSR_SOCK_CLOSED = const(0x00) @@ -123,7 +156,7 @@ _MR_RST = const(0x80) # Mode Register RST # Socket mode register _SNMR_CLOSE = const(0x00) -_SNMR_TCP = const(0x21) +SNMR_TCP = const(0x21) SNMR_UDP = const(0x02) _SNMR_IPRAW = const(0x03) _SNMR_MACRAW = const(0x04) @@ -135,12 +168,16 @@ _DEFAULT_MAC = (0xDE, 0xAD, 0xBE, 0xEF, 0xFE, 0xED) # Maximum number of sockets to support, differs between chip versions. -_W5200_W5500_MAX_SOCK_NUM = const(0x08) -_W5100_MAX_SOCK_NUM = const(0x04) -_SOCKET_INVALID = const(255) +_MAX_SOCK_NUM = {"w5100s": const(0x04), "w5500": const(0x08), "w6100": const(0x08)} +_SOCKET_INVALID = const(0xFF) + -# Source ports in use -_SRC_PORTS = [0] * _W5200_W5500_MAX_SOCK_NUM +def _unprettyfy(data: str, seperator: str, correct_length: int) -> bytes: + """Helper for converting . or : delimited strings to bytes objects.""" + data = bytes(int(x) for x in data.split(seperator)) + if len(data) == correct_length: + return data + raise ValueError("Invalid IP or MAC address.") class WIZNET5K: # pylint: disable=too-many-public-methods, too-many-instance-attributes @@ -159,7 +196,7 @@ def __init__( cs: digitalio.DigitalInOut, # pylint: disable=invalid-name reset: Optional[digitalio.DigitalInOut] = None, is_dhcp: bool = True, - mac: Union[List[int], Tuple[int]] = _DEFAULT_MAC, + mac: Union[MacAddressRaw, str] = _DEFAULT_MAC, hostname: Optional[str] = None, debug: bool = False, ) -> None: @@ -168,7 +205,7 @@ def __init__( :param digitalio.DigitalInOut cs: Chip select pin. :param digitalio.DigitalInOut reset: Optional reset pin, defaults to None. :param bool is_dhcp: Whether to start DHCP automatically or not, defaults to True. - :param Union[List[int], Tuple[int]] mac: The Wiznet's MAC Address, defaults to + :param Union[MacAddressRaw, str] mac: The Wiznet's MAC Address, defaults to (0xDE, 0xAD, 0xBE, 0xEF, 0xFE, 0xED). :param str hostname: The desired hostname, with optional {} to fill in the MAC address, defaults to None. @@ -180,57 +217,49 @@ def __init__( # init c.s. self._cs = cs - # reset wiznet module prior to initialization + # Reset wiznet module prior to initialization. if reset: reset.value = False time.sleep(0.1) reset.value = True time.sleep(0.1) + # Setup chip_select pin. + time.sleep(1) + self._cs.switch_to_output() + self._cs.value = 1 + # Buffer for reading params from module self._pbuff = bytearray(8) self._rxbuf = bytearray(_MAX_PACKET) # attempt to initialize the module self._ch_base_msb = 0 - if self._w5xxx_init() != 1: - raise RuntimeError("Failed to initialize WIZnet module.") - if self._chip_type == "w5100s": - WIZNET5K._sockets_reserved = [False] * (_W5100_MAX_SOCK_NUM - 1) - elif self._chip_type == "w5500": - WIZNET5K._sockets_reserved = [False] * (_W5200_W5500_MAX_SOCK_NUM - 1) - else: - raise RuntimeError("Unrecognized chip type.") + self._src_ports_in_use = [] + self._wiznet_chip_init() # Set MAC address self.mac_address = mac self.src_port = 0 - self._dns = (0, 0, 0, 0) + self._dns = b"\x00\x00\x00\x00" # udp related - self.udp_datasize = [0] * self.max_sockets self.udp_from_ip = [b"\x00\x00\x00\x00"] * self.max_sockets self.udp_from_port = [0] * self.max_sockets - # First, wait link status is on - # to avoid the code during DHCP, socket listen, connect ... - # assert self.link_status, "Ethernet cable disconnected!" - start_time = time.monotonic() - while True: - if self.link_status or ((time.monotonic() - start_time) > 5): + # Wait to give the Ethernet link to initialise. + stop_time = time.monotonic() + 5 + while time.monotonic() < stop_time: + if self.link_status: break - time.sleep(1) - debug_msg("My Link is: {}".format(self.link_status), self._debug) + debug_msg("Ethernet link is down…", self._debug) + time.sleep(0.5) self._dhcp_client = None # Set DHCP if is_dhcp: - ret = self.set_dhcp(hostname) - if ret != 0: - self._dhcp_client = None - if ret != 0: - raise RuntimeError("Failed to configure DHCP Server!") + self.set_dhcp(hostname) - def set_dhcp(self, hostname: Optional[str] = None) -> int: + def set_dhcp(self, hostname: Optional[str] = None) -> None: """ Initialize the DHCP client and attempt to retrieve and set network configuration from the DHCP server. @@ -238,24 +267,23 @@ def set_dhcp(self, hostname: Optional[str] = None) -> int: :param Optional[str] hostname: The desired hostname for the DHCP server with optional {} to fill in the MAC address, defaults to None. - :return int: 0 if DHCP configured, -1 otherwise. + :raises RuntimeError: If DHCP lease cannot be established. """ debug_msg("* Initializing DHCP", self._debug) - # Return IP assigned by DHCP self._dhcp_client = dhcp.DHCP(self, self.mac_address, hostname, self._debug) - ret = self._dhcp_client.request_dhcp_lease() - if ret == 1: + if self._dhcp_client.request_dhcp_lease(): debug_msg( "Found DHCP Server:\nIP: {}\n Subnet Mask: {}\n GW Addr: {}" "\n DNS Server: {}".format(*self.ifconfig), self._debug, ) - return 0 - return -1 + else: + self._dhcp_client = None + raise RuntimeError("Failed to configure DHCP Server!") def maintain_dhcp_lease(self) -> None: """Maintain the DHCP lease.""" - if self._dhcp_client is not None: + if self._dhcp_client: self._dhcp_client.maintain_dhcp_lease() def get_host_by_name(self, hostname: str) -> bytes: @@ -264,7 +292,9 @@ def get_host_by_name(self, hostname: str) -> bytes: :param str hostname: The host name to be converted. - :return Union[int, bytes]: a 4 bytearray. + :return bytes: The IPv4 address as a 4 byte array. + + :raises RuntimeError: If the DNS lookup fails. """ debug_msg("Get host by name", self._debug) if isinstance(hostname, str): @@ -273,11 +303,11 @@ def get_host_by_name(self, hostname: str) -> bytes: _dns_client = dns.DNS( self, self.pretty_ip(bytearray(self._dns)), debug=self._debug ) - ret = _dns_client.gethostbyname(hostname) - debug_msg("* Resolved IP: {}".format(ret), self._debug) - if ret == -1: + ipv4 = _dns_client.gethostbyname(hostname) + debug_msg("* Resolved IP: {}".format(ipv4), self._debug) + if ipv4 == -1: raise RuntimeError("Failed to resolve hostname!") - return ret + return ipv4 @property def max_sockets(self) -> int: @@ -286,11 +316,7 @@ def max_sockets(self) -> int: :return int: Maximum supported sockets. """ - if self._chip_type == "w5500": - return _W5200_W5500_MAX_SOCK_NUM - if self._chip_type == "w5100s": - return _W5100_MAX_SOCK_NUM - return -1 + return _MAX_SOCK_NUM[self._chip_type] @property def chip(self) -> str: @@ -302,346 +328,182 @@ def chip(self) -> str: return self._chip_type @property - def ip_address(self) -> bytearray: + def ip_address(self) -> bytes: """ - Configured IP address. + Configured IP address for the WIZnet Ethernet hardware. - :return bytearray: IP address as four bytes. + :return bytes: IP address as four bytes. """ - return self.read(_REG_SIPR, 0x00, 4) + return self._read(_REG_SIPR[self._chip_type], 0x00, 4) - def pretty_ip( - self, - # pylint: disable=no-self-use, invalid-name - ip: bytearray, - ) -> str: + @staticmethod + def pretty_ip(ipv4: bytes) -> str: """ Convert a 4 byte IP address to a dotted-quad string for printing. - :param bytearray ip: A four byte IP address. + :param bytearray ipv4: A four byte IP address. :return str: The IP address (a string of the form '255.255.255.255'). + + :raises ValueError: If IP address is not 4 bytes. """ - return "%d.%d.%d.%d" % (ip[0], ip[1], ip[2], ip[3]) + if len(ipv4) != 4: + raise ValueError("Wrong length for IPv4 address.") + return ".".join(f"{byte}" for byte in ipv4) - def unpretty_ip( - self, - # pylint: disable=no-self-use, invalid-name - ip: str, - ) -> bytes: + @staticmethod + def unpretty_ip(ipv4: str) -> bytes: """ Convert a dotted-quad string to a four byte IP address. - :param str ip: IP address (a string of the form '255.255.255.255') to be converted. + :param str ipv4: IPv4 address (a string of the form '255.255.255.255') to be converted. + + :return bytes: IPv4 address in four bytes. - :return bytes: IP address in four bytes. + :raises ValueError: If IPv4 address is not 4 bytes. """ - octets = [int(x) for x in ip.split(".")] - return bytes(octets) + return _unprettyfy(ipv4, ".", 4) @property def mac_address(self) -> bytes: """ - Ethernet hardware's MAC address. + The WIZnet Ethernet hardware MAC address. - :return bytearray: Six byte MAC address.""" - return bytes(self.read(_REG_SHAR, 0x00, 6)) + :return bytes: Six byte MAC address. + """ + return self._read(_REG_SHAR[self._chip_type], 0x00, 6) @mac_address.setter - def mac_address(self, address: Tuple[int]) -> None: + def mac_address(self, address: Union[MacAddressRaw, str]) -> None: """ - Set the hardware MAC address. + Set the WIZnet hardware MAC address. - :param Tuple address: A 6 byte hardware MAC address. + :param Union[MacAddressRaw, str] address: A hardware MAC address. - :raises ValueError: If the MAC address in invalid + :raises ValueError: If the MAC address is invalid """ - # Check that the MAC is a valid 6 byte address. - if len(address) == 6 and False not in [ - (isinstance(x, int) and 0 <= x <= 255) for x in address - ]: - self.write(_REG_SHAR, 0x04, address) - else: + try: + address = [int(x, 16) for x in address.split(":")] + except AttributeError: + pass + try: + if len(address) != 6: + raise ValueError() + # Bytes conversion will raise ValueError if values are not 0-255 + self._write(_REG_SHAR[self._chip_type], 0x04, bytes(address)) + except ValueError: + # pylint: disable=raise-missing-from raise ValueError("Invalid MAC address.") - def pretty_mac( - self, - # pylint: disable=no-self-use, invalid-name - mac: bytearray, - ) -> str: + @staticmethod + def pretty_mac(mac: bytes) -> str: """ - Convert a bytearray MAC address to a ':' seperated string for display. + Convert a bytes MAC address to a ':' seperated string for display. - :param bytearray mac: The MAC address. + :param bytes mac: The MAC address. :return str: Mac Address in the form 00:00:00:00:00:00 + + :raises ValueError: If MAC address is not 6 bytes. """ - return "%s:%s:%s:%s:%s:%s" % ( - hex(mac[0]), - hex(mac[1]), - hex(mac[2]), - hex(mac[3]), - hex(mac[4]), - hex(mac[5]), - ) + if len(mac) != 6: + raise ValueError("Incorrect length for MAC address.") + return ":".join(f"{byte:02x}" for byte in mac) - def remote_ip(self, socket_num: int) -> Union[str, bytearray]: + def remote_ip(self, socket_num: int) -> str: """ IP address of the host which sent the current incoming packet. :param int socket_num: ID number of the socket to check. - :return Union[str, bytearray]: A four byte IP address. + :return str: The IPv4 address. + + :raises ValueError: If the socket number is out of range. """ - if socket_num >= self.max_sockets: - return self._pbuff - for octet in range(0, 4): - self._pbuff[octet] = self._read_socket(socket_num, _REG_SNDIPR + octet)[0] - return self.pretty_ip(self._pbuff) + self._sock_num_in_range(socket_num) + for octet in range(4): + self._pbuff[octet] = self._read_socket_register( + socket_num, _REG_SNDIPR[self._chip_type] + octet + ) + return self.pretty_ip(self._pbuff[:4]) - @property - def link_status(self) -> int: - """Physical hardware (PHY) connection status. + def remote_port(self, socket_num: int) -> int: + """ + Port number of the host which sent the current incoming packet. - :return int: 1 if the link is up, 0 if the link is down. + :param int socket_num: ID number of the socket to check. + + :return int: The incoming port number of the socket connection. + + :raises ValueError: If the socket number is out of range. """ - if self._chip_type == "w5500": - data = self.read(_REG_PHYCFGR, 0x00) - return data[0] & 0x01 - if self._chip_type == "w5100s": - data = self.read(_REG_PHYCFGR_W5100S, 0x00) - return data[0] & 0x01 - return 0 + self._sock_num_in_range(socket_num) + return self._read_two_byte_sock_reg(socket_num, _REG_SNDPORT[self._chip_type]) - def remote_port(self, socket_num: int) -> Union[int, bytearray]: + @property + def link_status(self) -> bool: """ - Port of the host which sent the current incoming packet. + Physical hardware (PHY) connection status. - :param int socket_num: ID number of the socket to check. + Whether the WIZnet hardware is physically connected to an Ethernet network. - :return Union[int, bytearray]: The port number of the socket connection. + :return bool: True if the link is up, False if the link is down. """ - if socket_num >= self.max_sockets: - return self._pbuff - for octet in range(2): - self._pbuff[octet] = self._read_socket(socket_num, _REG_SNDPORT + octet)[0] - return int((self._pbuff[0] << 8) | self._pbuff[0]) + return bool( + int.from_bytes(self._read(_REG_LINK_FLAG[self._chip_type], 0x00), "big") + & 0x01 + ) @property - def ifconfig( - self, - ) -> Tuple[bytearray, bytearray, bytearray, Tuple[int, int, int, int]]: + def ifconfig(self) -> Tuple[bytes, bytes, bytes, bytes]: """ Network configuration information. - :return Tuple[bytearray, bytearray, bytearray, Tuple[int, int, int, int]]: \ - The IP address, subnet mask, gateway address and DNS server address.""" + :return Tuple[bytes, bytes, bytes, bytes]: The IP address, subnet mask, gateway + address and DNS server address. + """ return ( self.ip_address, - self.read(_REG_SUBR, 0x00, 4), - self.read(_REG_GAR, 0x00, 4), + self._read(_REG_SUBR[self._chip_type], 0x00, 4), + self._read(_REG_GAR[self._chip_type], 0x00, 4), self._dns, ) @ifconfig.setter def ifconfig( - self, params: Tuple[bytearray, bytearray, bytearray, Tuple[int, int, int, int]] + self, params: Tuple[IpAddress4Raw, IpAddress4Raw, IpAddress4Raw, IpAddress4Raw] ) -> None: """ Set network configuration. - :param Tuple[bytearray, bytearray, bytearray, Tuple[int, int, int, int]]: - Configuration settings - (ip_address, subnet_mask, gateway_address, dns_server). + :param Tuple[Address4Bytes, Address4Bytes, Address4Bytes, Address4Bytes]: Configuration + settings - (ip_address, subnet_mask, gateway_address, dns_server). """ + for param in params: + if len(param) != 4: + raise ValueError("IPv4 address must be 4 bytes.") ip_address, subnet_mask, gateway_address, dns_server = params - self.write(_REG_SIPR, 0x04, ip_address) - self.write(_REG_SUBR, 0x04, subnet_mask) - self.write(_REG_GAR, 0x04, gateway_address) - - self._dns = dns_server - - # pylint: disable=too-many-return-statements - def _w5xxx_init(self) -> int: - """ - Detect and initialize a Wiznet5k ethernet module. - - :return int: 1 if the initialization succeeds, 0 if it fails. - """ - - def _detect_and_reset_w5500() -> bool: - """ - Detect and reset a W5500 chip. Called at startup to initialize the - interface hardware. - - :return bool: True if a W5500 chip is detected, False if not. - """ - self._chip_type = "w5500" - self._write_mr(0x80) - time.sleep(0.05) - if self._read_mr()[0] & 0x80: - return False - - # assert self.sw_reset() == 0, "Chip not reset properly!" - self._write_mr(0x80) - time.sleep(0.05) - if self._read_mr()[0] & 0x80: - return False - - self._write_mr(0x08) - # assert self._read_mr()[0] == 0x08, "Expected 0x08." - if self._read_mr()[0] != 0x08: - return False - - self._write_mr(0x10) - # assert self._read_mr()[0] == 0x10, "Expected 0x10." - if self._read_mr()[0] != 0x10: - return False - - self._write_mr(0x00) - # assert self._read_mr()[0] == 0x00, "Expected 0x00." - if self._read_mr()[0] != 0x00: - return False - - if self.read(_REG_VERSIONR_W5500, 0x00)[0] != 0x04: - return False - # self._chip_type = "w5500" - # self._ch_base_msb = 0x10 - return True - - def _detect_and_reset_w5100s() -> bool: - """ - Detect and reset a W5100S chip. Called at startup to initialize the - interface hardware. - - :return bool: True if a W5100 chip is detected, False if not. - """ - self._chip_type = "w5100s" - # sw reset - assert self.sw_reset() == 0, "Chip not reset properly!" - if self.read(_REG_VERSIONR_W5100S, 0x00)[0] != 0x51: - return False - - self._ch_base_msb = 0x0400 - return True - - time.sleep(1) - self._cs.switch_to_output() - self._cs.value = 1 - - # Detect if chip is Wiznet W5500 - if _detect_and_reset_w5500(): - # perform w5500 initialization - for i in range(0, _W5200_W5500_MAX_SOCK_NUM): - ctrl_byte = 0x0C + (i << 5) - self.write(0x1E, ctrl_byte, 2) - self.write(0x1F, ctrl_byte, 2) - else: - # Detect if chip is Wiznet W5100S - if _detect_and_reset_w5100s(): - pass - else: - return 0 - return 1 - - def sw_reset(self) -> int: - """Perform a soft-reset on the Wiznet chip. - - Perform a soft reset by writing to the chip's MR register reset bit. - - :return int: 0 if the reset succeeds, -1 if not. - """ - mode_reg = self._read_mr() - self._write_mr(0x80) - mode_reg = self._read_mr() - - # W5100S case => 0x03 - if (mode_reg[0] != 0x00) and (mode_reg[0] != 0x03): - return -1 - return 0 - - def _read_mr(self) -> bytearray: - """Read from the Mode Register (MR).""" - res = self.read(_REG_MR, 0x00) - return res - - def _write_mr(self, data: int) -> None: - """Write to the mode register (MR).""" - self.write(_REG_MR, 0x04, data) - - def read( - self, - addr: int, - callback: int, - length: int = 1, - buffer: Optional[WriteableBuffer] = None, - ) -> Union[WriteableBuffer, bytearray]: - """ - Read data from a register address. - - :param int addr: Register address to read. - :param int callback: Callback reference. - :param int length: Number of bytes to read from the register, defaults to 1. - :param Optional[WriteableBuffer] buffer: Buffer to read data into, defaults to None. - - :return Union[WriteableBuffer, bytearray]: Data read from the chip. - """ - with self._device as bus_device: - if self._chip_type == "w5500": - bus_device.write(bytes([addr >> 8])) # pylint: disable=no-member - bus_device.write(bytes([addr & 0xFF])) # pylint: disable=no-member - bus_device.write(bytes([callback])) # pylint: disable=no-member - else: - # if self._chip_type == "w5100s": - bus_device.write(bytes([0x0F])) # pylint: disable=no-member - bus_device.write(bytes([addr >> 8])) # pylint: disable=no-member - bus_device.write(bytes([addr & 0xFF])) # pylint: disable=no-member - - if buffer is None: - self._rxbuf = bytearray(length) - bus_device.readinto(self._rxbuf) # pylint: disable=no-member - return self._rxbuf - bus_device.readinto(buffer, end=length) # pylint: disable=no-member - return buffer - - def write( - self, addr: int, callback: int, data: Union[int, Sequence[Union[int, bytes]]] - ) -> None: - """ - Write data to a register address. - - :param int addr: Destination address. - :param int callback: Callback reference. - :param Union[int, Sequence[Union[int, bytes]]] data: Data to write to the register address. - """ - with self._device as bus_device: - if self._chip_type == "w5500": - bus_device.write(bytes([addr >> 8])) # pylint: disable=no-member - bus_device.write(bytes([addr & 0xFF])) # pylint: disable=no-member - bus_device.write(bytes([callback])) # pylint: disable=no-member - else: - # if self._chip_type == "w5100s": - bus_device.write(bytes([0xF0])) # pylint: disable=no-member - bus_device.write(bytes([addr >> 8])) # pylint: disable=no-member - bus_device.write(bytes([addr & 0xFF])) # pylint: disable=no-member + self._write(_REG_SIPR[self._chip_type], 0x04, bytes(ip_address)) + self._write(_REG_SUBR[self._chip_type], 0x04, bytes(subnet_mask)) + self._write(_REG_GAR[self._chip_type], 0x04, bytes(gateway_address)) - if hasattr(data, "from_bytes"): - bus_device.write(bytes([data])) # pylint: disable=no-member - else: - for data_comp in data: - bus_device.write(bytes([data_comp])) # pylint: disable=no-member + self._dns = bytes(dns_server) - # Socket-Register API + # *** Public Socket Methods *** - def socket_available(self, socket_num: int, sock_type: int = _SNMR_TCP) -> int: + def socket_available(self, socket_num: int, sock_type: int = SNMR_TCP) -> int: """ Number of bytes available to be read from the socket. :param int socket_num: Socket to check for available bytes. - :param int sock_type: Socket type. Use SNMR_TCP for TCP or SNMR_UDP for UDP, \ + :param int sock_type: Socket type. Use SNMR_TCP for TCP or SNMR_UDP for UDP, defaults to SNMR_TCP. :return int: Number of bytes available to read. + + :raises ValueError: If the socket number is out of range. + :raises ValueError: If the number of bytes on a UDP socket is negative. """ debug_msg( "socket_available called on socket {}, protocol {}".format( @@ -649,28 +511,16 @@ def socket_available(self, socket_num: int, sock_type: int = _SNMR_TCP) -> int: ), self._debug, ) - if socket_num > self.max_sockets: - raise ValueError("Provided socket exceeds max_sockets.") - - res = self._get_rx_rcv_size(socket_num) - - if sock_type == _SNMR_TCP: - return res - if res > 0: - if self.udp_datasize[socket_num]: - return self.udp_datasize[socket_num] - # parse the udp rx packet - # read the first 8 header bytes - ret, self._pbuff = self.socket_read(socket_num, 8) - if ret > 0: - self.udp_from_ip[socket_num] = self._pbuff[:4] - self.udp_from_port[socket_num] = (self._pbuff[4] << 8) + self._pbuff[5] - self.udp_datasize[socket_num] = (self._pbuff[6] << 8) + self._pbuff[7] - ret = self.udp_datasize[socket_num] - return ret - return 0 - - def socket_status(self, socket_num: int) -> Optional[bytearray]: + self._sock_num_in_range(socket_num) + + number_of_bytes = self._get_rx_rcv_size(socket_num) + if self.read_snsr(socket_num) == SNMR_UDP: + number_of_bytes -= 8 # Subtract UDP header from packet size. + if number_of_bytes < 0: + raise ValueError("Negative number of bytes found on socket.") + return number_of_bytes + + def socket_status(self, socket_num: int) -> int: """ Socket connection status. @@ -681,30 +531,33 @@ def socket_status(self, socket_num: int) -> Optional[bytearray]: :param int socket_num: ID of socket to check. - :return: Optional[bytearray] + :return int: The connection status. """ return self.read_snsr(socket_num) def socket_connect( self, socket_num: int, - dest: Union[bytes, bytearray], + dest: IpAddress4Raw, port: int, - conn_mode: int = _SNMR_TCP, + conn_mode: int = SNMR_TCP, ) -> int: """ - Open and verify a connection from a socket to a destination IP address + Open and verify a connection from a socket to a destination IPv4 address or hostname. A TCP connection is made by default. A UDP connection can also be made. :param int socket_num: ID of the socket to be connected. - :param Union[bytes, bytearray] dest: The destination as a host name or IP address. - :param int port: Port to connect to (0 - 65,536). + :param IpAddress4Raw dest: The destination as a host name or IP address. + :param int port: Port to connect to (0 - 65,535). :param int conn_mode: The connection mode. Use SNMR_TCP for TCP or SNMR_UDP for UDP, defaults to SNMR_TCP. + + :raises ValueError: if the socket number is out of range. + :raises ConnectionError: If the connection to the socket cannot be established. """ - if not self.link_status: - raise ConnectionError("Ethernet cable disconnected!") + self._sock_num_in_range(socket_num) + self._check_link_status() debug_msg( "W5K socket connect, protocol={}, port={}, ip={}".format( conn_mode, port, self.pretty_ip(dest) @@ -712,42 +565,31 @@ def socket_connect( self._debug, ) # initialize a socket and set the mode - res = self.socket_open(socket_num, conn_mode=conn_mode) - if res == 1: - raise ConnectionError("Failed to initialize a connection with the socket.") - + self.socket_open(socket_num, conn_mode=conn_mode) # set socket destination IP and port self.write_sndipr(socket_num, dest) self.write_sndport(socket_num, port) - self._send_socket_cmd(socket_num, _CMD_SOCK_CONNECT) + self.write_sncr(socket_num, _CMD_SOCK_CONNECT) - if conn_mode == _SNMR_TCP: + if conn_mode == SNMR_TCP: # wait for tcp connection establishment - while self.socket_status(socket_num)[0] != SNSR_SOCK_ESTABLISHED: + while self.socket_status(socket_num) != SNSR_SOCK_ESTABLISHED: time.sleep(0.001) debug_msg( - "SNSR: {}".format(self.socket_status(socket_num)[0]), self._debug + "SNSR: {}".format(self.socket_status(socket_num)), self._debug ) - if self.socket_status(socket_num)[0] == SNSR_SOCK_CLOSED: + if self.socket_status(socket_num) == SNSR_SOCK_CLOSED: raise ConnectionError("Failed to establish connection.") - elif conn_mode == SNMR_UDP: - self.udp_datasize[socket_num] = 0 return 1 - def _send_socket_cmd(self, socket: int, cmd: int) -> None: - """Send a socket command to a socket.""" - self.write_sncr(socket, cmd) - while self.read_sncr(socket) != b"\x00": - debug_msg("waiting for SNCR to clear...", self._debug) - def get_socket(self, *, reserve_socket=False) -> int: """ - Request, allocate and return a socket from the W5k chip. + Request, allocate and return a socket from the WIZnet 5k chip. Cycle through the sockets to find the first available one. If the called with reserve_socket=True, update the list of reserved sockets (intended to be used with socket.socket()). Note that reserved sockets must be released by calling - cancel_reservation() once they are no longer needed. + release_socket() once they are no longer needed. If all sockets are reserved, no sockets are available for DNS calls, etc. Therefore, one socket cannot be reserved. Since socket 0 is the only socket that is capable of @@ -761,7 +603,7 @@ def get_socket(self, *, reserve_socket=False) -> int: """ debug_msg("*** Get socket.", self._debug) # Prefer socket zero for none reserved calls as it cannot be reserved. - if not reserve_socket and self.socket_status(0)[0] == SNSR_SOCK_CLOSED: + if not reserve_socket and self.socket_status(0) == SNSR_SOCK_CLOSED: debug_msg("Allocated socket # 0", self._debug) return 0 # Then check the other sockets. @@ -774,10 +616,7 @@ def get_socket(self, *, reserve_socket=False) -> int: ) for socket_number, reserved in enumerate(WIZNET5K._sockets_reserved, start=1): - if ( - not reserved - and self.socket_status(socket_number)[0] == SNSR_SOCK_CLOSED - ): + if not reserved and self.socket_status(socket_number) == SNSR_SOCK_CLOSED: if reserve_socket: WIZNET5K._sockets_reserved[socket_number - 1] = True debug_msg( @@ -785,19 +624,21 @@ def get_socket(self, *, reserve_socket=False) -> int: self._debug, ) return socket_number - raise RuntimeError("Out of sockets.") + raise RuntimeError("All sockets in use.") - @staticmethod - def release_socket(socket_number): + def release_socket(self, socket_number): """ Update the socket reservation list when a socket is no longer reserved. :param int socket_number: The socket to release. + + :raises ValueError: If the socket number is out of range. """ + self._sock_num_in_range(socket_number) WIZNET5K._sockets_reserved[socket_number - 1] = False def socket_listen( - self, socket_num: int, port: int, conn_mode: int = _SNMR_TCP + self, socket_num: int, port: int, conn_mode: int = SNMR_TCP ) -> None: """ Listen on a socket's port. @@ -806,9 +647,13 @@ def socket_listen( :param int port: Port to listen on (0 - 65,535). :param int conn_mode: Connection mode SNMR_TCP for TCP or SNMR_UDP for UDP, defaults to SNMR_TCP. + + :raises ValueError: If the socket number is out of range. + :raises ConnectionError: If the Ethernet link is down. + :raises RuntimeError: If unable to connect to a hardware socket. """ - if not self.link_status: - raise ConnectionError("Ethernet cable disconnected!") + self._sock_num_in_range(socket_num) + self._check_link_status() debug_msg( "* Listening on port={}, ip={}".format( port, self.pretty_ip(self.ip_address) @@ -817,39 +662,36 @@ def socket_listen( ) # Initialize a socket and set the mode self.src_port = port - res = self.socket_open(socket_num, conn_mode=conn_mode) + self.socket_open(socket_num, conn_mode=conn_mode) self.src_port = 0 - if res == 1: - raise RuntimeError("Failed to initialize the socket.") # Send listen command - self._send_socket_cmd(socket_num, _CMD_SOCK_LISTEN) + self.write_sncr(socket_num, _CMD_SOCK_LISTEN) # Wait until ready - status = [SNSR_SOCK_CLOSED] - while status[0] not in ( + status = SNSR_SOCK_CLOSED + while status not in ( SNSR_SOCK_LISTEN, SNSR_SOCK_ESTABLISHED, _SNSR_SOCK_UDP, ): status = self.read_snsr(socket_num) - if status[0] == SNSR_SOCK_CLOSED: + if status == SNSR_SOCK_CLOSED: raise RuntimeError("Listening socket closed.") - def socket_accept( - self, socket_num: int - ) -> Tuple[int, Tuple[Union[str, bytearray], Union[int, bytearray]]]: + def socket_accept(self, socket_num: int) -> Tuple[int, Tuple[str, int]]: """ - Destination IP address and port from an incoming connection. + Destination IPv4 address and port from an incoming connection. Return the next socket number so listening can continue, along with the IP address and port of the incoming connection. :param int socket_num: Socket number with connection to check. + :return Tuple[int, Tuple[Union[str, bytearray], Union[int, bytearray]]]: If successful, the next (socket number, (destination IP address, destination port)). - If errors occur, the destination IP address and / or the destination port may be - returned as bytearrays. + :raises ValueError: If the socket number is out of range. """ + self._sock_num_in_range(socket_num) dest_ip = self.remote_ip(socket_num) dest_port = self.remote_port(socket_num) next_socknum = self.get_socket() @@ -861,7 +703,7 @@ def socket_accept( ) return next_socknum, (dest_ip, dest_port) - def socket_open(self, socket_num: int, conn_mode: int = _SNMR_TCP) -> int: + def socket_open(self, socket_num: int, conn_mode: int = SNMR_TCP) -> None: """ Open an IP socket. @@ -870,13 +712,15 @@ def socket_open(self, socket_num: int, conn_mode: int = _SNMR_TCP) -> int: :param int socket_num: The socket number to open. :param int conn_mode: The protocol to use. Use SNMR_TCP for TCP or SNMR_UDP for \ UDP, defaults to SNMR_TCP. - :return int: 1 if the socket was opened, 0 if not. + + :raises ValueError: If the socket number is out of range. + :raises ConnectionError: If the Ethernet link is down or no connection to socket. + :raises RuntimeError: If unable to open a socket in UDP or TCP mode. """ - if not self.link_status: - raise ConnectionError("Ethernet cable disconnected!") + self._sock_num_in_range(socket_num) + self._check_link_status() debug_msg("*** Opening socket {}".format(socket_num), self._debug) - status = self.read_snsr(socket_num)[0] - if status in ( + if self.read_snsr(socket_num) not in ( SNSR_SOCK_CLOSED, SNSR_SOCK_TIME_WAIT, SNSR_SOCK_FIN_WAIT, @@ -884,57 +728,46 @@ def socket_open(self, socket_num: int, conn_mode: int = _SNMR_TCP) -> int: _SNSR_SOCK_CLOSING, _SNSR_SOCK_UDP, ): - debug_msg( - "* Opening W5k Socket, protocol={}".format(conn_mode), self._debug - ) - time.sleep(0.00025) + raise ConnectionError("Failed to initialize a connection with the socket.") + debug_msg("* Opening W5k Socket, protocol={}".format(conn_mode), self._debug) + time.sleep(0.00025) - self.write_snmr(socket_num, conn_mode) - self.write_snir(socket_num, 0xFF) + self.write_snmr(socket_num, conn_mode) + self.write_snir(socket_num, 0xFF) - if self.src_port > 0: - # write to socket source port - self.write_sock_port(socket_num, self.src_port) - else: + if self.src_port > 0: + # write to socket source port + self.write_sock_port(socket_num, self.src_port) + else: + s_port = randint(49152, 65535) + while s_port in self._src_ports_in_use: s_port = randint(49152, 65535) - while s_port in _SRC_PORTS: - s_port = randint(49152, 65535) - self.write_sock_port(socket_num, s_port) - _SRC_PORTS[socket_num] = s_port - - # open socket - self.write_sncr(socket_num, _CMD_SOCK_OPEN) - self.read_sncr(socket_num) - if self.read_snsr((socket_num))[0] not in [0x13, 0x22]: - raise RuntimeError("Could not open socket in TCP or UDP mode.") - return 0 - return 1 + self.write_sock_port(socket_num, s_port) + self._src_ports_in_use[socket_num] = s_port + + # open socket + self.write_sncr(socket_num, _CMD_SOCK_OPEN) + if self.read_snsr(socket_num) not in [_SNSR_SOCK_INIT, _SNSR_SOCK_UDP]: + raise RuntimeError("Could not open socket in TCP or UDP mode.") def socket_close(self, socket_num: int) -> None: """ Close a socket. :param int socket_num: The socket to close. + + :raises ValueError: If the socket number is out of range. """ debug_msg("*** Closing socket {}".format(socket_num), self._debug) - timeout = time.monotonic() + 5.0 + self._sock_num_in_range(socket_num) self.write_sncr(socket_num, _CMD_SOCK_CLOSE) - debug_msg(" Waiting for close command to process…", self._debug) - while self.read_sncr(socket_num)[0]: - if time.monotonic() < timeout: - raise RuntimeError( - "Wiznet5k failed to complete command, status = {}.".format( - self.read_sncr(socket_num)[0] - ) - ) - time.sleep(0.0001) debug_msg(" Waiting for socket to close…", self._debug) timeout = time.monotonic() + 5.0 - while self.read_snsr(socket_num)[0] != SNSR_SOCK_CLOSED: + while self.read_snsr(socket_num) != SNSR_SOCK_CLOSED: if time.monotonic() > timeout: raise RuntimeError( "Wiznet5k failed to close socket, status = {}.".format( - self.read_snsr(socket_num)[0] + self.read_snsr(socket_num) ) ) time.sleep(0.0001) @@ -945,77 +778,58 @@ def socket_disconnect(self, socket_num: int) -> None: Disconnect a TCP or UDP connection. :param int socket_num: The socket to close. + + :raises ValueError: If the socket number is out of range. """ debug_msg("*** Disconnecting socket {}".format(socket_num), self._debug) + self._sock_num_in_range(socket_num) self.write_sncr(socket_num, _CMD_SOCK_DISCON) - self.read_sncr(socket_num) def socket_read(self, socket_num: int, length: int) -> Tuple[int, bytes]: """ - Read data from a TCP socket. + Read data from a hardware socket. Called directly by TCP socket objects and via + read_udp() for UDP socket objects. :param int socket_num: The socket to read data from. :param int length: The number of bytes to read from the socket. - :return Tuple[int, Union[int, bytearray]]: If the read was successful then the first - item of the tuple is the length of the data and the second is the data. If the read - was unsuccessful then both items equal an error code, 0 for no data waiting and -1 - for no connection to the socket. + :returns Tuple[int, bytes]: If the read was successful then the first + item of the tuple is the length of the data and the second is the data. + If the read was unsuccessful then 0, b"" is returned. + + :raises ValueError: If the socket number is out of range. + :raises ConnectionError: If the Ethernet link is down. + :raises RuntimeError: If the socket connection has been lost. """ - # pylint: disable=too-many-branches - if not self.link_status: - raise ConnectionError("Ethernet cable disconnected!") - if socket_num > self.max_sockets: - raise ValueError("Provided socket exceeds max_sockets.") + self._sock_num_in_range(socket_num) + self._check_link_status() # Check if there is data available on the socket - ret = self._get_rx_rcv_size(socket_num) - debug_msg("Bytes avail. on sock: {}".format(ret), self._debug) - if ret == 0: - # no data on socket? - status = self._read_snmr(socket_num) - if status in (SNSR_SOCK_LISTEN, SNSR_SOCK_CLOSED, SNSR_SOCK_CLOSE_WAIT): - # remote end closed its side of the connection, EOF state - raise RuntimeError("Lost connection to peer.") - # connection is alive, no data waiting to be read - ret = -1 - elif ret > length: - # set ret to the length of buffer - ret = length - if ret > 0: - debug_msg("* Processing {} bytes of data".format(ret), self._debug) - # Read the starting save address of the received data - ptr = self._read_snrx_rd(socket_num) - - if self._chip_type == "w5500": - # Read data from the starting address of snrx_rd - ctrl_byte = 0x18 + (socket_num << 5) - - resp = self.read(ptr, ctrl_byte, ret) - else: - # if self._chip_type == "w5100s": - offset = ptr & _SOCK_MASK - src_addr = offset + (socket_num * _SOCK_SIZE + 0x6000) - if offset + ret > _SOCK_SIZE: - size = _SOCK_SIZE - offset - resp1 = self.read(src_addr, 0x00, size) - size = ret - size - src_addr = socket_num * _SOCK_SIZE + 0x6000 - resp2 = self.read(src_addr, 0x00, size) - resp = resp1 + resp2 - else: - resp = self.read(src_addr, 0x00, ret) - - # After reading the received data, update Sn_RX_RD to the increased - # value as many as the reading size. - ptr = (ptr + ret) & 0xFFFF - self._write_snrx_rd(socket_num, ptr) - - # Notify the W5k of the updated Sn_Rx_RD + bytes_on_socket = self._get_rx_rcv_size(socket_num) + debug_msg("Bytes avail. on sock: {}".format(bytes_on_socket), self._debug) + if bytes_on_socket: + bytes_on_socket = length if bytes_on_socket > length else bytes_on_socket + debug_msg( + "* Processing {} bytes of data".format(bytes_on_socket), self._debug + ) + # Read the starting save address of the received data. + pointer = self._read_snrx_rd(socket_num) + # Read data from the hardware socket. + bytes_read = self._chip_socket_read(socket_num, pointer, bytes_on_socket) + # After reading the received data, update Sn_RX_RD register. + pointer = (pointer + bytes_on_socket) & 0xFFFF + self._write_snrx_rd(socket_num, pointer) self.write_sncr(socket_num, _CMD_SOCK_RECV) - while self.read_sncr(socket_num)[0] & _CMD_SOCK_RECV: - time.sleep(0.0001) - return ret, resp + else: + # no data on socket + if self._read_snmr(socket_num) in ( + SNSR_SOCK_LISTEN, + SNSR_SOCK_CLOSED, + SNSR_SOCK_CLOSE_WAIT, + ): + raise RuntimeError("Lost connection to peer.") + bytes_read = b"" + return bytes_on_socket, bytes_read def read_udp(self, socket_num: int, length: int) -> Tuple[int, bytes]: """ @@ -1027,20 +841,31 @@ def read_udp(self, socket_num: int, length: int) -> Tuple[int, bytes]: :return Tuple[int, bytes]: If the read was successful then the first item of the tuple is the length of the data and the second is the data. If the read was unsuccessful then (0, b"") is returned. - """ - if self.udp_datasize[socket_num] > 0: - if self.udp_datasize[socket_num] <= length: - ret, resp = self.socket_read(socket_num, self.udp_datasize[socket_num]) - else: - ret, resp = self.socket_read(socket_num, length) - # just consume the rest, it is lost to the higher layers - self.socket_read(socket_num, self.udp_datasize[socket_num] - length) - self.udp_datasize[socket_num] = 0 - return ret, resp - return 0, b"" + + :raises ValueError: If the socket number is out of range. + """ + self._sock_num_in_range(socket_num) + bytes_on_socket, bytes_read = 0, b"" + # Parse the UDP packet header. + header_length, self._pbuff[:8] = self.socket_read(socket_num, 8) + if header_length: + if header_length != 8: + raise ValueError("Invalid UDP header.") + data_length = self._chip_parse_udp_header(socket_num) + # Read the UDP packet data. + if data_length: + if data_length <= length: + bytes_on_socket, bytes_read = self.socket_read( + socket_num, data_length + ) + else: + bytes_on_socket, bytes_read = self.socket_read(socket_num, length) + # just consume the rest, it is lost to the higher layers + self.socket_read(socket_num, data_length - length) + return bytes_on_socket, bytes_read def socket_write( - self, socket_num: int, buffer: bytearray, timeout: float = 0 + self, socket_num: int, buffer: bytearray, timeout: float = 0.0 ) -> int: """ Write data to a socket. @@ -1050,83 +875,253 @@ def socket_write( :param float timeout: Write data timeout in seconds, defaults to 0.0 which waits indefinitely. - :return int: The number of bytes written to the buffer. + :return int: The number of bytes written to the socket. + + :raises ConnectionError: If the Ethernet link is down. + :raises ValueError: If the socket number is out of range. + :raises RuntimeError: If the data cannot be sent. """ - # pylint: disable=too-many-branches - if not self.link_status: - raise ConnectionError("Ethernet cable disconnected!") - if socket_num > self.max_sockets: - raise ValueError("Provided socket exceeds max_sockets.") + self._sock_num_in_range(socket_num) + self._check_link_status() if len(buffer) > _SOCK_SIZE: - ret = _SOCK_SIZE + bytes_to_write = _SOCK_SIZE else: - ret = len(buffer) - stamp = time.monotonic() + bytes_to_write = len(buffer) + stop_time = time.monotonic() + timeout # If buffer is available, start the transfer free_size = self._get_tx_free_size(socket_num) - while free_size < ret: + while free_size < bytes_to_write: free_size = self._get_tx_free_size(socket_num) - status = self.socket_status(socket_num)[0] + status = self.socket_status(socket_num) if status not in (SNSR_SOCK_ESTABLISHED, SNSR_SOCK_CLOSE_WAIT) or ( - timeout and time.monotonic() - stamp > timeout + timeout and time.monotonic() > stop_time ): - ret = 0 - break + raise RuntimeError("Unable to write data to the socket.") # Read the starting address for saving the transmitting data. - ptr = self._read_sntx_wr(socket_num) - offset = ptr & _SOCK_MASK - if self._chip_type == "w5500": - dst_addr = offset + (socket_num * _SOCK_SIZE + 0x8000) - txbuf = buffer[:ret] - cntl_byte = 0x14 + (socket_num << 5) - self.write(dst_addr, cntl_byte, txbuf) - - else: - # if self._chip_type == "w5100s": - dst_addr = offset + (socket_num * _SOCK_SIZE + 0x4000) - - if offset + ret > _SOCK_SIZE: - size = _SOCK_SIZE - offset - txbuf = buffer[0:size] - self.write(dst_addr, 0x00, txbuf) - txbuf = buffer[size:ret] - size = ret - size - dst_addr = socket_num * _SOCK_SIZE + 0x4000 - self.write(dst_addr, 0x00, txbuf) - else: - txbuf = buffer[:ret] - self.write(dst_addr, 0x00, buffer[:ret]) - + pointer = self._read_sntx_wr(socket_num) + offset = pointer & _SOCK_MASK + self._chip_socket_write(socket_num, offset, bytes_to_write, buffer) # update sn_tx_wr to the value + data size - ptr = (ptr + ret) & 0xFFFF - self._write_sntx_wr(socket_num, ptr) + pointer = (pointer + bytes_to_write) & 0xFFFF + self._write_sntx_wr(socket_num, pointer) self.write_sncr(socket_num, _CMD_SOCK_SEND) - while self.read_sncr(socket_num) != b"\x00": - time.sleep(0.001) # check data was transferred correctly - while not self.read_snir(socket_num)[0] & _SNIR_SEND_OK: - if self.socket_status(socket_num)[0] in ( + while not self.read_snir(socket_num) & _SNIR_SEND_OK: + if self.socket_status(socket_num) in ( SNSR_SOCK_CLOSED, SNSR_SOCK_TIME_WAIT, SNSR_SOCK_FIN_WAIT, SNSR_SOCK_CLOSE_WAIT, _SNSR_SOCK_CLOSING, ): - raise RuntimeError("Socket closed before data was sent.") - if timeout and time.monotonic() - stamp > timeout: + raise RuntimeError("No data was sent, socket was closed.") + if timeout and time.monotonic() > stop_time: raise RuntimeError("Operation timed out. No data sent.") - if self.read_snir(socket_num)[0] & SNIR_TIMEOUT: - raise TimeoutError( - "Hardware timeout while sending on socket {}.".format(socket_num) - ) + if self.read_snir(socket_num) & SNIR_TIMEOUT: + self.write_snir(socket_num, SNIR_TIMEOUT) + # TCP sockets are closed by the hardware timeout + # so that will be caught at the while statement. + # UDP sockets are 1:many so not closed thus return 0. + if self._read_snmr(socket_num) == SNMR_UDP: + return 0 time.sleep(0.001) self.write_snir(socket_num, _SNIR_SEND_OK) - return ret + return bytes_to_write + + def sw_reset(self) -> None: + """ + Soft reset and reinitialize the WIZnet chip. + + :raises RuntimeError: If reset fails. + """ + self._wiznet_chip_init() + + def _sw_reset_5x00(self) -> bool: + """ + Perform a soft reset on the WIZnet 5100s and 5500 chips. + + :returns bool: True if reset was success + """ + self._write_mr(_MR_RST) + time.sleep(0.05) + return self._read_mr() == {"w5500": 0x00, "w5100s": 0x03}[self._chip_type] + + def _wiznet_chip_init(self) -> None: + """ + Detect and initialize a WIZnet 5k Ethernet module. + + :raises RuntimeError: If no WIZnet chip is detected. + """ + + def _setup_sockets() -> None: + """Initialise sockets for w5500 and w6100 chips.""" + for sock_num in range(_MAX_SOCK_NUM[self._chip_type]): + ctrl_byte = 0x0C + (sock_num << 5) + self._write(0x1E, ctrl_byte, 2) + self._write(0x1F, ctrl_byte, 2) + self._ch_base_msb = 0x00 + WIZNET5K._sockets_reserved = [False] * (_MAX_SOCK_NUM[self._chip_type] - 1) + self._src_ports_in_use = [0] * _MAX_SOCK_NUM[self._chip_type] + + def _detect_and_reset_w6100() -> bool: + """ + Detect and reset a W6100 chip. Called at startup to initialize the + interface hardware. + + :return bool: True if a W6100 chip is detected, False if not. + """ + self._chip_type = "w6100" + + # Reset w6100 + self._write(0x41F4, 0x04, 0xCE) # Unlock chip settings. + time.sleep(0.05) # Wait for unlock. + self._write(0x2004, 0x04, 0x00) # Reset chip. + time.sleep(0.05) # Wait for reset. + + if self._read(_REG_VERSIONR[self._chip_type], 0x00)[0] != 0x61: + return False + # Initialize w6100. + self._write(0x41F5, 0x04, 0x3A) # Unlock network settings. + _setup_sockets() + return True + + def _detect_and_reset_w5500() -> bool: + """ + Detect and reset a W5500 chip. Called at startup to initialize the + interface hardware. + + :return bool: True if a W5500 chip is detected, False if not. + """ + self._chip_type = "w5500" + if not self._sw_reset_5x00(): + return False + + self._write_mr(0x08) + if self._read_mr() != 0x08: + return False + + self._write_mr(0x10) + if self._read_mr() != 0x10: + return False + + self._write_mr(0x00) + if self._read_mr() != 0x00: + return False + + if self._read(_REG_VERSIONR[self._chip_type], 0x00)[0] != 0x04: + return False + # Initialize w5500 + _setup_sockets() + return True + + def _detect_and_reset_w5100s() -> bool: + """ + Detect and reset a W5100S chip. Called at startup to initialize the + interface hardware. + + :return bool: True if a W5100 chip is detected, False if not. + """ + self._chip_type = "w5100s" + if not self._sw_reset_5x00(): + return False + + if self._read(_REG_VERSIONR[self._chip_type], 0x00)[0] != 0x51: + return False + + # Initialise w5100s + self._ch_base_msb = 0x0400 + WIZNET5K._sockets_reserved = [False] * (_MAX_SOCK_NUM[self._chip_type] - 1) + self._src_ports_in_use = [0] * _MAX_SOCK_NUM[self._chip_type] + return True + + for func in [ + _detect_and_reset_w5100s, + _detect_and_reset_w5500, + _detect_and_reset_w6100, + ]: + if func(): + return + self._chip_type = None + raise RuntimeError("Failed to initialize WIZnet module.") + + def _sock_num_in_range(self, sock: int) -> None: + """Check that the socket number is in the range 0 - maximum sockets.""" + if not 0 <= sock < self.max_sockets: + raise ValueError("Socket number out of range.") + + def _check_link_status(self): + """Raise an exception if the link is down.""" + if not self.link_status: + raise ConnectionError("The Ethernet connection is down.") + + def _read_mr(self) -> int: + """Read from the Mode Register (MR).""" + return int.from_bytes(self._read(_REG_MR[self._chip_type], 0x00), "big") + + def _write_mr(self, data: int) -> None: + """Write to the mode register (MR).""" + self._write(_REG_MR[self._chip_type], 0x04, data) + + # *** Low Level Methods *** + + def _read( + self, + addr: int, + callback: int, + length: int = 1, + ) -> bytes: + """ + Read data from a register address. + + :param int addr: Register address to read. + :param int callback: Callback reference. + :param int length: Number of bytes to read from the register, defaults to 1. + + :return bytes: Data read from the chip. + """ + with self._device as bus_device: + self._chip_read(bus_device, addr, callback) + self._rxbuf = bytearray(length) + bus_device.readinto(self._rxbuf) + return bytes(self._rxbuf) + + def _write(self, addr: int, callback: int, data: Union[int, bytes]) -> None: + """ + Write data to a register address. + + :param int addr: Destination address. + :param int callback: Callback reference. + :param Union[int, bytes] data: Data to write to the register address, if data + is an integer, it must be 1 or 2 bytes. + + :raises OverflowError: if integer data is more than 2 bytes. + """ + with self._device as bus_device: + self._chip_write(bus_device, addr, callback) + try: + data = data.to_bytes(1, "big") + except OverflowError: + data = data.to_bytes(2, "big") + except AttributeError: + pass + bus_device.write(data) + + def _read_two_byte_sock_reg(self, sock: int, reg_address: int) -> int: + """Read a two byte socket register.""" + register = self._read_socket_register(sock, reg_address) << 8 + register += self._read_socket_register(sock, reg_address + 1) + return register + + def _write_two_byte_sock_reg(self, sock: int, reg_address: int, data: int) -> None: + """Write to a two byte socket register.""" + self._write_socket_register(sock, reg_address, data >> 8 & 0xFF) + self._write_socket_register(sock, reg_address + 1, data & 0xFF) + + # *** Socket Register Methods *** - # Socket-Register Methods def _get_rx_rcv_size(self, sock: int) -> int: """Size of received and saved in socket buffer.""" val = 0 @@ -1135,7 +1130,7 @@ def _get_rx_rcv_size(self, sock: int) -> int: val_1 = self._read_snrx_rsr(sock) if val_1 != 0: val = self._read_snrx_rsr(sock) - return int.from_bytes(val, "big") + return val def _get_tx_free_size(self, sock: int) -> int: """Free size of socket's tx buffer block.""" @@ -1145,196 +1140,205 @@ def _get_tx_free_size(self, sock: int) -> int: val_1 = self._read_sntx_fsr(sock) if val_1 != 0: val = self._read_sntx_fsr(sock) - return int.from_bytes(val, "big") + return val def _read_snrx_rd(self, sock: int) -> int: """Read socket n RX Read Data Pointer Register.""" - self._pbuff[0] = self._read_socket(sock, _REG_SNRX_RD)[0] - self._pbuff[1] = self._read_socket(sock, _REG_SNRX_RD + 1)[0] - return self._pbuff[0] << 8 | self._pbuff[1] + return self._read_two_byte_sock_reg(sock, _REG_SNRX_RD[self._chip_type]) def _write_snrx_rd(self, sock: int, data: int) -> None: """Write socket n RX Read Data Pointer Register.""" - self._write_socket(sock, _REG_SNRX_RD, data >> 8 & 0xFF) - self._write_socket(sock, _REG_SNRX_RD + 1, data & 0xFF) - - def _write_sntx_wr(self, sock: int, data: int) -> None: - """Write the socket write buffer pointer for socket `sock`.""" - self._write_socket(sock, _REG_SNTX_WR, data >> 8 & 0xFF) - self._write_socket(sock, _REG_SNTX_WR + 1, data & 0xFF) + self._write_two_byte_sock_reg(sock, _REG_SNRX_RD[self._chip_type], data) def _read_sntx_wr(self, sock: int) -> int: """Read the socket write buffer pointer for socket `sock`.""" - self._pbuff[0] = self._read_socket(sock, 0x0024)[0] - self._pbuff[1] = self._read_socket(sock, 0x0024 + 1)[0] - return self._pbuff[0] << 8 | self._pbuff[1] + return self._read_two_byte_sock_reg(sock, _REG_SNTX_WR[self._chip_type]) - def _read_sntx_fsr(self, sock: int) -> Optional[bytearray]: + def _write_sntx_wr(self, sock: int, data: int) -> None: + """Write the socket write buffer pointer for socket `sock`.""" + self._write_two_byte_sock_reg(sock, _REG_SNTX_WR[self._chip_type], data) + + def _read_sntx_fsr(self, sock: int) -> int: """Read socket n TX Free Size Register""" - data = self._read_socket(sock, _REG_SNTX_FSR) - data += self._read_socket(sock, _REG_SNTX_FSR + 1) - return data + return self._read_two_byte_sock_reg(sock, _REG_SNTX_FSR[self._chip_type]) - def _read_snrx_rsr(self, sock: int) -> Optional[bytearray]: + def _read_snrx_rsr(self, sock: int) -> int: """Read socket n Received Size Register""" - data = self._read_socket(sock, _REG_SNRX_RSR) - data += self._read_socket(sock, _REG_SNRX_RSR + 1) - return data + return self._read_two_byte_sock_reg(sock, _REG_SNRX_RSR[self._chip_type]) - def write_sndipr(self, sock: int, ip_addr: bytearray) -> None: - """Write to socket destination IP Address.""" - for offset in range(4): - self._write_socket(sock, _REG_SNDIPR + offset, ip_addr[offset]) - - def _read_sndipr(self, sock) -> bytearray: + def _read_sndipr(self, sock) -> bytes: """Read socket destination IP address.""" - data = b"" + data = [] for offset in range(4): - data += self._read_socket(sock, _REG_SIPR + offset) - return bytearray(data) + data.append( + self._read_socket_register(sock, _REG_SNDIPR[self._chip_type] + offset) + ) + return bytes(data) + + def write_sndipr(self, sock: int, ip_addr: bytes) -> None: + """Write to socket destination IP Address.""" + for offset, value in enumerate(ip_addr): + self._write_socket_register( + sock, _REG_SNDIPR[self._chip_type] + offset, value + ) def write_sndport(self, sock: int, port: int) -> None: """Write to socket destination port.""" - self._write_socket(sock, _REG_SNDPORT, port >> 8) - self._write_socket(sock, _REG_SNDPORT + 1, port & 0xFF) + self._write_two_byte_sock_reg(sock, _REG_SNDPORT[self._chip_type], port) - def read_snsr(self, sock: int) -> Optional[bytearray]: + def read_snsr(self, sock: int) -> int: """Read Socket n Status Register.""" - return self._read_socket(sock, _REG_SNSR) + return self._read_socket_register(sock, _REG_SNSR[self._chip_type]) - def read_snir(self, sock: int) -> Optional[bytearray]: + def read_snir(self, sock: int) -> int: """Read Socket n Interrupt Register.""" - return self._read_socket(sock, _REG_SNIR) - - def write_snmr(self, sock: int, protocol: int) -> None: - """Write to Socket n Mode Register.""" - self._write_socket(sock, _REG_SNMR, protocol) + return self._read_socket_register(sock, _REG_SNIR[self._chip_type]) def write_snir(self, sock: int, data: int) -> None: """Write to Socket n Interrupt Register.""" - self._write_socket(sock, _REG_SNIR, data) + self._write_socket_register(sock, _REG_SNIR[self._chip_type], data) + + def _read_snmr(self, sock: int) -> int: + """Read the socket MR register.""" + return self._read_socket_register(sock, _REG_SNMR) + + def write_snmr(self, sock: int, protocol: int) -> None: + """Write to Socket n Mode Register.""" + self._write_socket_register(sock, _REG_SNMR, protocol) def write_sock_port(self, sock: int, port: int) -> None: """Write to the socket port number.""" - self._write_socket(sock, _REG_SNPORT, port >> 8) - self._write_socket(sock, _REG_SNPORT + 1, port & 0xFF) + self._write_two_byte_sock_reg(sock, _REG_SNPORT[self._chip_type], port) def write_sncr(self, sock: int, data: int) -> None: """Write to socket command register.""" - self._write_socket(sock, _REG_SNCR, data) - - def read_sncr(self, sock: int) -> Optional[bytearray]: - """Read socket command register.""" - return self._read_socket(sock, _REG_SNCR) - - def _read_snmr(self, sock: int) -> Optional[bytearray]: - return self._read_socket(sock, _REG_SNMR) - - def _write_socket(self, sock: int, address: int, data: int) -> None: - """Write to a W5k socket register.""" - if self._chip_type == "w5500": - cntl_byte = (sock << 5) + 0x0C - return self.write(address, cntl_byte, data) - if self._chip_type == "w5100s": - cntl_byte = 0 - return self.write( - self._ch_base_msb + sock * _CH_SIZE + address, cntl_byte, data - ) - return None - - def _read_socket(self, sock: int, address: int) -> bytearray: - """Read a W5k socket register.""" - if self._chip_type == "w5500": - cntl_byte = (sock << 5) + 0x08 - return self.read(address, cntl_byte) - if self._chip_type == "w5100s": - cntl_byte = 0 - return self.read(self._ch_base_msb + sock * _CH_SIZE + address, cntl_byte) - raise RuntimeError("Invalid Wiznet chip type.") + self._write_socket_register(sock, _REG_SNCR[self._chip_type], data) + # Wait for command to complete before continuing. + while self._read_socket_register(sock, _REG_SNCR[self._chip_type]): + pass @property def rcr(self) -> int: """Retry count register.""" - if self._chip_type == "w5500": - rcr_reg = _REG_RCR_5500 - else: - # Assume a W5100s - rcr_reg = _REG_RCR_5100s - return self.read(rcr_reg, 0x00) + return int.from_bytes(self._read(_REG_RCR[self._chip_type], 0x00), "big") @rcr.setter def rcr(self, retry_count: int) -> None: + """Retry count register.""" if 0 > retry_count > 255: raise ValueError("Retries must be from 0 to 255.") - if self._chip_type == "w5500": - rcr_reg = _REG_RCR_5500 - else: - # Assume a W5100s - rcr_reg = _REG_RCR_5100s - self.write(rcr_reg, 0x04, retry_count) + self._write(_REG_RCR[self._chip_type], 0x04, retry_count) @property def rtr(self) -> int: """Retry time register.""" - if self._chip_type == "w5500": - reg = _REG_RTR_5500 - else: - # Assume a W5100s - reg = _REG_RTR_5100s - return self.read(reg, 0x00, 2) + return int.from_bytes(self._read(_REG_RTR[self._chip_type], 0x00, 2), "big") @rtr.setter - def rtr(self, retry_count: int) -> None: - if 0 > retry_count > 2**16: - raise ValueError("Retry time must be from 0 to {}".format(2**16)) - if self._chip_type == "w5500": - reg = _REG_RTR_5500 - else: - # Assume a W5100s - reg = _REG_RTR_5100s - self.write(reg, 0x04, retry_count) + def rtr(self, retry_time: int) -> None: + """Retry time register.""" + if 0 > retry_time >= 2**16: + raise ValueError("Retry time must be from 0 to 65535") + self._write(_REG_RTR[self._chip_type], 0x04, retry_time) + + # *** Chip Specific Methods *** + + def _chip_read(self, device: "busio.SPI", address: int, call_back: int) -> None: + """Chip specific calls for _read method.""" + if self._chip_type in ("w5500", "w6100"): + device.write((address >> 8).to_bytes(1, "big")) + device.write((address & 0xFF).to_bytes(1, "big")) + device.write(call_back.to_bytes(1, "big")) + elif self._chip_type == "w5100s": + device.write((0x0F).to_bytes(1, "big")) + device.write((address >> 8).to_bytes(1, "big")) + device.write((address & 0xFF).to_bytes(1, "big")) + + def _chip_write(self, device: "busio.SPI", address: int, call_back: int) -> None: + """Chip specific calls for _write.""" + if self._chip_type in ("w5500", "w6100"): + device.write((address >> 8).to_bytes(1, "big")) + device.write((address & 0xFF).to_bytes(1, "big")) + device.write(call_back.to_bytes(1, "big")) + elif self._chip_type == "w5100s": + device.write((0xF0).to_bytes(1, "big")) + device.write((address >> 8).to_bytes(1, "big")) + device.write((address & 0xFF).to_bytes(1, "big")) + + def _chip_socket_read(self, socket_number, pointer, bytes_to_read): + """Chip specific calls for socket_read.""" + if self._chip_type in ("w5500", "w6100"): + # Read data from the starting address of snrx_rd + ctrl_byte = 0x18 + (socket_number << 5) + bytes_read = self._read(pointer, ctrl_byte, bytes_to_read) + elif self._chip_type == "w5100s": + offset = pointer & _SOCK_MASK + src_addr = offset + (socket_number * _SOCK_SIZE + 0x6000) + if offset + bytes_to_read > _SOCK_SIZE: + split_point = _SOCK_SIZE - offset + bytes_read = self._read(src_addr, 0x00, split_point) + split_point = bytes_to_read - split_point + src_addr = socket_number * _SOCK_SIZE + 0x6000 + bytes_read += self._read(src_addr, 0x00, split_point) + else: + bytes_read = self._read(src_addr, 0x00, bytes_to_read) + return bytes_read + + def _chip_socket_write( + self, socket_number: int, offset: int, bytes_to_write: int, buffer: bytes + ): + """Chip specific calls for socket_write.""" + if self._chip_type in ("w5500", "w6100"): + dst_addr = offset + (socket_number * _SOCK_SIZE + 0x8000) + cntl_byte = 0x14 + (socket_number << 5) + self._write(dst_addr, cntl_byte, buffer[:bytes_to_write]) + + elif self._chip_type == "w5100s": + dst_addr = offset + (socket_number * _SOCK_SIZE + 0x4000) + + if offset + bytes_to_write > _SOCK_SIZE: + split_point = _SOCK_SIZE - offset + self._write(dst_addr, 0x00, buffer[:split_point]) + dst_addr = socket_number * _SOCK_SIZE + 0x4000 + self._write(dst_addr, 0x00, buffer[split_point:bytes_to_write]) + else: + self._write(dst_addr, 0x00, buffer[:bytes_to_write]) - def _ip_address_in_use(self, socknum, local_ip) -> bool: + def _chip_parse_udp_header(self, socket_num) -> int: """ - Send an ARP to the IPv4 address supplied and wait for a response. + Parse chip specific UDP header data for IPv4 packets. - A helper function for the DHCP client to confirm that the offered IP address is - not in use before setting up the DHCP parameters. May also be called by the user - before setting a manual IP address, to make sure that it is not already in use. + Sets the source IPv4 address and port number and returns the UDP data length. - According to RFC5227 section 2.1.1 of , we check for ARP Probe or ARPResponse - reception from other devices for 1 second after sending ARPProbe. If there is no - reception for 1 second, the probe is repeated three times in total, and if there - is no reception, it is determined that there is no conflict. - - :param bytes local_ip: The 4 byte IPv4 address to test for a conflict. - :param int socknum: The socket to test. + :return int: The UDP data length. + """ + if self._chip_type in ("w5100s", "w5500"): + self.udp_from_ip[socket_num] = self._pbuff[:4] + self.udp_from_port[socket_num] = int.from_bytes(self._pbuff[4:6], "big") + return int.from_bytes(self._pbuff[6:], "big") + if self._chip_type == "w6100": + self.udp_from_ip[socket_num] = self._pbuff[3:7] + self.udp_from_port[socket_num] = int.from_bytes(self._pbuff[6:], "big") + return int.from_bytes(self._pbuff[:2], "big") & 0x07FF + raise ValueError("Unsupported chip type.") - :returns bool: True if the he address is already in use), False if not. + def _write_socket_register(self, sock: int, address: int, data: int) -> None: + """Write to a WIZnet 5k socket register.""" + if self._chip_type in ("w5500", "w6100"): + cntl_byte = (sock << 5) + 0x0C + self._write(address, cntl_byte, data) + elif self._chip_type == "w5100s": + cntl_byte = 0 + self._write(self._ch_base_msb + sock * _CH_SIZE + address, cntl_byte, data) - :raises RuntimeError: If the Ethernet link is down or could not connect to the socket. - """ - # Check link status - if not self.link_status: - raise RuntimeError("Ethernet link is down") - # Store current RTR, RCR and destination IPv4 address. - temp_rcr = self.rcr - temp_rtr = self.rtr - temp_ip = self._read_sndipr(socknum) - # Set current retry timer and retry count to 1 sec and 3 tries to match DHCP standard. - self.rcr = 3 - self.rtr = 100000 # 100us * 10000 = 1 second - # Send a dummy packet to the assigned address on the DHCP socket to mimic ARP. - ip_in_use = True - try: - if self.socket_connect(socknum, bytes(local_ip), 5000, conn_mode=0x02) != 1: - raise RuntimeError("Unable to connect to socket {}.".format(socknum)) - self.socket_write(socknum, b"CHECK_IP_CONFLICT") - except TimeoutError: - ip_in_use = False - finally: - # Reset the RTR, RCR and destination IPv4 registers. - self.write_sndipr(socknum, temp_ip) - self.rcr = temp_rcr - self.rtr = temp_rtr - return ip_in_use + def _read_socket_register(self, sock: int, address: int) -> int: + """Read a WIZnet 5k socket register.""" + if self._chip_type in ("w5500", "w6100"): + cntl_byte = (sock << 5) + 0x08 + register = self._read(address, cntl_byte) + elif self._chip_type == "w5100s": + cntl_byte = 0 + register = self._read( + self._ch_base_msb + sock * _CH_SIZE + address, cntl_byte + ) + return int.from_bytes(register, "big") diff --git a/adafruit_wiznet5k/adafruit_wiznet5k_dhcp.py b/adafruit_wiznet5k/adafruit_wiznet5k_dhcp.py index 7b349fb..8d42ca7 100644 --- a/adafruit_wiznet5k/adafruit_wiznet5k_dhcp.py +++ b/adafruit_wiznet5k/adafruit_wiznet5k_dhcp.py @@ -245,11 +245,7 @@ def _dhcp_connection_setup(self, timeout: float = 5.0) -> None: self._eth.write_snmr(self._wiz_sock, 0x02) # Set UDP connection self._eth.write_sock_port(self._wiz_sock, 68) # Set DHCP client port. self._eth.write_sncr(self._wiz_sock, 0x01) # Open the socket. - while ( - self._eth.read_sncr(self._wiz_sock) != b"\x00" - ): # Wait for command to complete. - time.sleep(0.001) - if self._eth.read_snsr(self._wiz_sock) == b"\x22": + if self._eth.read_snsr(self._wiz_sock) == 0x22: self._eth.write_sndport(2, _DHCP_SERVER_PORT) debug_msg("+ Connection OK, port set.", self._debug) return @@ -299,30 +295,18 @@ def _receive_dhcp_response(self, timeout: float) -> int: :returns int: The number of bytes stored in the global buffer. """ debug_msg("Receiving a DHCP response.", self._debug) - # DHCP returns the query plus additional data. The query length is 236 bytes. - minimum_packet_length = 236 - buffer = bytearray(b"") - bytes_read = 0 - debug_msg("+ Beginning to receive…", self._debug) - while bytes_read < minimum_packet_length and time.monotonic() < timeout: - if self._eth.socket_available(self._wiz_sock, _SNMR_UDP): - x = self._eth.read_udp(self._wiz_sock, _BUFF_LENGTH - bytes_read)[1] - buffer.extend(x) - bytes_read = len(buffer) - debug_msg("+ Bytes read so far {}".format(bytes_read), self._debug) - debug_msg(x, self._debug) - if bytes_read == _BUFF_LENGTH: - break - debug_msg("Received {} bytes".format(bytes_read), self._debug) - if bytes_read < minimum_packet_length: - bytes_read = 0 - else: - _BUFF[:bytes_read] = buffer - _BUFF[bytes_read:] = bytearray(_BUFF_LENGTH - bytes_read) - del buffer - gc.collect() - debug_msg(_BUFF[:bytes_read], self._debug) - return bytes_read + while time.monotonic() < timeout: + # DHCP returns the query plus additional data. The query length is 236 bytes. + if self._eth.socket_available(self._wiz_sock, _SNMR_UDP) > 236: + bytes_count, bytes_read = self._eth.read_udp( + self._wiz_sock, _BUFF_LENGTH + ) + _BUFF[:bytes_count] = bytes_read + debug_msg("Received {} bytes".format(bytes_count), self._debug) + del bytes_read + gc.collect() + return bytes_count + raise TimeoutError("No DHCP response received.") def _process_messaging_states(self, *, message_type: int): """ diff --git a/adafruit_wiznet5k/adafruit_wiznet5k_dns.py b/adafruit_wiznet5k/adafruit_wiznet5k_dns.py index ad96cfa..f014c68 100644 --- a/adafruit_wiznet5k/adafruit_wiznet5k_dns.py +++ b/adafruit_wiznet5k/adafruit_wiznet5k_dns.py @@ -261,7 +261,7 @@ def gethostbyname(self, hostname: bytes) -> Union[int, bytes]: ipaddress = -1 for _ in range(5): # wait for a response - socket_timeout = time.monotonic() + 1.0 + socket_timeout = time.monotonic() + 5.0 while not self._iface.socket_available(dns_socket, 0x02): if time.monotonic() > socket_timeout: _debug_print( diff --git a/adafruit_wiznet5k/adafruit_wiznet5k_socket.py b/adafruit_wiznet5k/adafruit_wiznet5k_socket.py index 73e971c..650fd1f 100644 --- a/adafruit_wiznet5k/adafruit_wiznet5k_socket.py +++ b/adafruit_wiznet5k/adafruit_wiznet5k_socket.py @@ -254,14 +254,14 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None: wiznet5k.adafruit_wiznet5k.SNIR_TIMEOUT | wiznet5k.adafruit_wiznet5k.SNIR_DISCON ) - while not _the_interface.read_snir(self._socknum)[0] & mask: + while not _the_interface.read_snir(self._socknum) & mask: pass _the_interface.write_snir( self._socknum, 0xFF ) # Reset socket interrupt register. _the_interface.socket_close(self._socknum) while ( - _the_interface.socket_status(self._socknum)[0] + _the_interface.socket_status(self._socknum) != wiznet5k.adafruit_wiznet5k.SNSR_SOCK_CLOSED ): pass @@ -284,7 +284,7 @@ def _status(self) -> int: :return int: Status of the socket. """ - return _the_interface.socket_status(self._socknum)[0] + return _the_interface.socket_status(self._socknum) @property def _connected(self) -> bool: @@ -297,7 +297,7 @@ def _connected(self) -> bool: if self._socknum >= _the_interface.max_sockets: return False - status = _the_interface.socket_status(self._socknum)[0] + status = _the_interface.socket_status(self._socknum) if ( status == wiznet5k.adafruit_wiznet5k.SNSR_SOCK_CLOSE_WAIT and self._available() == 0 diff --git a/tests/dhcp_dummy_data.py b/tests/dhcp_dummy_data.py deleted file mode 100644 index be2299a..0000000 --- a/tests/dhcp_dummy_data.py +++ /dev/null @@ -1,124 +0,0 @@ -# SPDX-FileCopyrightText: 2022 Martin Stephens -# -# SPDX-License-Identifier: MIT -"""Data for use in test_dhcp_helper_files.py""" - - -def _pad_message(message_section: bytearray, target_length: int) -> bytearray: - """Pad the message with 0x00.""" - return message_section + bytearray(b"\00" * (target_length - len(message_section))) - - -def _build_message(message_body: bytearray, message_options: bytearray) -> bytearray: - """Assemble the padded message and body to make a 318 byte packet. The 'header' - section must be 236 bytes and the entire message must be 318 bytes.""" - dhcp_message = _pad_message(message_body, 236) + _pad_message(message_options, 276) - assert len(dhcp_message) == 512 - return dhcp_message - - -# Data for testing send data. -# DHCP DISCOVER messages. -# Default settings (DISCOVER, broadcast=False, default hostname, renew=False) -message = bytearray( - b"\x01\x01\x06\x00o\xff\xff\xff\x00\x17\x00\x00\x00\x00\x00\x00" - b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x05\x06\x07" - b"\x08\t\x00\x00\x00\x00\x00\x00\x00\x00" -) -options = bytearray( - b"c\x82Sc5\x01\x01\x0c\x12WIZnet040506070809=\x07\x01" - b"\x04\x05\x06\x07\x08\t7\x03\x01\x03\x063\x04\x00v\xa7\x00\xff" -) -DHCP_SEND_01 = _build_message(message, options) - -message = bytearray( - b"\x01\x01\x06\x00o\xff\xff\xff\x00\x17\x00\x00\x00\x00\x00\x00" - b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x05\x06\x07" - b"\x08\t" -) -options = bytearray( - b"c\x82Sc5\x01\x01\x0c\x12WIZnet040506070809=\x07\x01" - b"\x04\x05\x06\x07\x08\t7\x03\x01\x03\x063\x04\x00v\xa7\x00\xff" -) -DHCP_SEND_02 = _build_message(message, options) - -message = bytearray( - b"\x01\x01\x06\x00o\xff\xff\xff\x00#\x80\x00\xc0\xa8\x03\x04" - b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x18#.9DO" -) -options = bytearray( - b"c\x82Sc5\x01\x01\x0c\x04bert=\x07\x01\x18#.9DO7" - b"\x03\x01\x03\x063\x04\x00v\xa7\x00\xff" -) -DHCP_SEND_03 = _build_message(message, options) - -message = bytearray( - b"\x01\x01\x06\x00o\xff\xff\xff\x00#\x80\x00\x00\x00\x00\x00" - b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xffa$e*c" -) -options = bytearray( - b"c\x82Sc5\x01\x01\x0c\x05clash=\x07\x01\xffa$e*c7" - b"\x03\x01\x03\x063\x04\x00v\xa7\x00\xff" -) -DHCP_SEND_04 = _build_message(message, options) - -# DHCP REQUEST messages. -message = bytearray( - b"\x01\x01\x06\x00o\xff\xff\xff\x00\x10\x80\x00\x00\x00\x00\x00" - b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xffa$e*c" -) - -options = bytearray( - b"c\x82Sc5\x01\x03\x0c\nhelicopter=\x07\x01\xffa$e*c7" - b"\x03\x01\x03\x063\x04\x00v\xa7\x002\x04\n\n\n+6\x04\x91B-\x16\xff" -) -DHCP_SEND_05 = _build_message(message, options) - -message = bytearray( - b"\x01\x01\x06\x00o\xff\xff\xff\x00H\x80\x00\x00\x00\x00\x00" - b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00K?\xa6\x04" - b"\xc8e" -) - -options = bytearray( - b"c\x82Sc5\x01\x03\x0c\x12WIZnet4B3FA604C865=\x07\x01K?\xa6" - b"\x04\xc8e7\x03\x01\x03\x063\x04\x00v\xa7\x002\x04def\x046" - b"\x04\xf5\xa6\x05\x0b\xff" -) -DHCP_SEND_06 = _build_message(message, options) - -# Data to test response parser. -# Basic case, no extra fields, one each of router and DNS. -message = bytearray( - b"\x02\x00\x00\x00\x7f\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\xc0" - b"\xa8\x05\x16\x00\x00\x00\x00\x00\x00\x00\x00\x01\x03\x05\x07\t\x0b" -) - -options = bytearray( - b"c\x82Sc5\x01\x02\x01\x04\xc0\xa8\x06\x026\x04\xeao\xde" - b"{3\x04\x00\x01\x01\x00\x03\x04yy\x04\x05\x06\x04\x05\x06" - b'\x07\x08:\x04\x00""\x00;\x04\x0033\x00\xff' -) -GOOD_DATA_01 = _build_message(message, options) - -# Complex case, extra field, 2 routers and 2 DNS servers. -message = bytearray( - b"\x02\x00\x00\x004Vx\x9a\x00\x00\x00\x00\x00\x00\x00\x00\x12$@\n\x00\x00" - b"\x00\x00\x00\x00\x00\x00\x01" -) -options = bytearray( - b"c\x82Sc5\x01\x05<\x05\x01\x02\x03\x04\x05\x01\x04\n\x0b" - b"\x07\xde6\x04zN\x91\x03\x03\x08\n\x0b\x0e\x0f\xff\x00" - b"\xff\x00\x06\x08\x13\x11\x0b\x07****3\x04\x00\x00=;:\x04" - b"\x00\x0e\x17@;\x04\x02\x92]\xde\xff" -) -GOOD_DATA_02 = _build_message(message, options) - - -# -message = bytearray( - b"\x02\x00\x00\x00\xff\xff\xff\x7f\x00\x00\x00\x00\x00\x00\x00\x00\x12$@\n\x00\x00" - b"\x00\x00\x00\x00\x00\x00\x01" -) -options = bytearray(b"c\x82Sc") -BAD_DATA = _build_message(message, options) diff --git a/tests/extract_unique_dns_responses.py b/tests/extract_unique_dns_responses.py deleted file mode 100644 index f54ed25..0000000 --- a/tests/extract_unique_dns_responses.py +++ /dev/null @@ -1,91 +0,0 @@ -# SPDX-FileCopyrightText: 2022 Martin Stephens -# -# SPDX-License-Identifier: MIT -""" -Extract unique responses from a Wireshark JSON export and write a file that includes -enough information to test the DNS response parser. Also writes a file containing the -domain names for running tests on microcontrollers. - -The wireshark JSON file should contain only DNS queries (port 53) and include the raw -response data. -""" -import json -from typing import List - -READ_FILE_NAME = "wireshark_dns.json" -WRITE_FILE_NAME = "dns_parser_test_data" - -with open(READ_FILE_NAME, "r") as f: - dns_records: List[dict] = json.load(f) -print(f"DNS Records: {len(dns_records)}") - -# Filter out the DNS queries. -responses_only = [] -for dns_record in dns_records: - if ( - dns_record["_source"]["layers"]["dns"]["dns.flags_tree"]["dns.flags.response"] - == "1" - ): - responses_only.append(dns_record) -print(f"DNS Responses: {len(responses_only)}") - -# Filter out the IPv6 responses. -type_a_responses = [] -for response in responses_only: - if "AAAA" not in list(response["_source"]["layers"]["dns"]["Queries"].keys())[0]: - type_a_responses.append(response) -print(f"Type A responses: {len(type_a_responses)}") - -# Extract unique repsonses. -unique_urls = set() -unique_responses = [] -for response in type_a_responses: - query_key = list(response["_source"]["layers"]["dns"]["Queries"].keys())[0] - if ( - response["_source"]["layers"]["dns"]["Queries"][query_key]["dns.qry.name"] - not in unique_urls - ): - unique_urls.add( - response["_source"]["layers"]["dns"]["Queries"][query_key]["dns.qry.name"] - ) - unique_responses.append(response) -print(f"Unique responses: {len(unique_responses)}") - -# Create a dictionary with the required fields. -export_responses = [] -for response in unique_responses: - query_key = list(response["_source"]["layers"]["dns"]["Queries"].keys())[0] - export_response = { - "query_id": response["_source"]["layers"]["dns"]["dns.id"], - "query_name": response["_source"]["layers"]["dns"]["Queries"][query_key][ - "dns.qry.name" - ], - "query_name_length": response["_source"]["layers"]["dns"]["Queries"][query_key][ - "dns.qry.name.len" - ], - } - try: - answer_keys = list(response["_source"]["layers"]["dns"]["Answers"].keys()) - for answer_key in answer_keys: - if "type A" in answer_key: - export_response["answer_IPv4"] = response["_source"]["layers"]["dns"][ - "Answers" - ][answer_key]["dns.a"] - break - except KeyError: - export_response["answer_IPv4"] = None - export_response["udp_packet"] = response["_source"]["layers"]["udp"]["udp.payload"] - export_responses.append(export_response) -print(f"Responses to export: {len(export_responses)}") - -# Write a JSON file for testing the parser on a computer. -print("Writing JSON file…") -with open(f"{WRITE_FILE_NAME}.json", "w") as f: - json.dump(export_responses, f) - -# Write a text file with a domain name on each line for testing on a microcontroller. -print("Writing text file…") -with open(f"{WRITE_FILE_NAME}.txt", "w") as f: - f.writelines([f"{response['query_name']}\n" for response in export_responses]) - -print("Done.") diff --git a/tests/test_dhcp_helper_functions.py b/tests/test_dhcp_helper_functions.py deleted file mode 100644 index 6d5f4be..0000000 --- a/tests/test_dhcp_helper_functions.py +++ /dev/null @@ -1,800 +0,0 @@ -# SPDX-FileCopyrightText: 2022 Martin Stephens -# -# SPDX-License-Identifier: MIT -"""Tests to confirm that there are no changes in behaviour to methods and functions. -These test are not exhaustive, but are a sanity check while making changes to the module.""" -import time - -# pylint: disable=no-self-use, redefined-outer-name, protected-access, invalid-name, too-many-arguments -import pytest -from freezegun import freeze_time - -# from micropython import const -import dhcp_dummy_data as dhcp_data -import adafruit_wiznet5k.adafruit_wiznet5k_dhcp as wiz_dhcp - - -@pytest.fixture -def mock_wiznet5k(mocker): - """Mock WIZNET5K so that the DHCP class can be tested without hardware.""" - return mocker.patch("adafruit_wiznet5k.adafruit_wiznet5k.WIZNET5K", autospec=True) - - -@pytest.fixture -def mock_dhcp(mock_wiznet5k): - dhcp = wiz_dhcp.DHCP(mock_wiznet5k, bytes((1, 2, 3, 4, 5, 6))) - return dhcp - - -class TestDHCPInit: - def test_constants(self): - """Test all the constants in the DHCP module.""" - - @pytest.mark.parametrize( - "mac_address", - ( - bytes((1, 2, 3, 4, 5, 6)), - bytes((7, 8, 9, 10, 11, 12)), - bytes((1, 2, 4, 6, 7, 8)), - ), - ) - def test_dhcp_setup_default(self, mocker, mock_wiznet5k, mac_address): - """Test intial settings from DHCP.__init__.""" - # Test with mac address as tuple, list and bytes with default values. - mock_randint = mocker.patch( - "adafruit_wiznet5k.adafruit_wiznet5k_dhcp.randint", autospec=True - ) - mock_randint.return_value = 0x1234567 - dhcp_client = wiz_dhcp.DHCP(mock_wiznet5k, mac_address) - assert dhcp_client._eth == mock_wiznet5k - assert dhcp_client._debug is False - assert dhcp_client._mac_address == mac_address - assert dhcp_client._wiz_sock is None - assert dhcp_client._dhcp_state == wiz_dhcp._STATE_INIT - mock_randint.assert_called_once() - assert dhcp_client._transaction_id == 0x1234567 - assert dhcp_client._start_time == 0 - assert dhcp_client.dhcp_server_ip == wiz_dhcp._BROADCAST_SERVER_ADDR - assert dhcp_client.local_ip == wiz_dhcp._UNASSIGNED_IP_ADDR - assert dhcp_client.gateway_ip == wiz_dhcp._UNASSIGNED_IP_ADDR - assert dhcp_client.subnet_mask == wiz_dhcp._UNASSIGNED_IP_ADDR - assert dhcp_client.dns_server_ip == wiz_dhcp._UNASSIGNED_IP_ADDR - assert dhcp_client._lease == 0 - assert dhcp_client._t1 == 0 - assert dhcp_client._t2 == 0 - mac_string = "".join("{:02X}".format(o) for o in mac_address) - assert dhcp_client._hostname == bytes( - "WIZnet{}".split(".", maxsplit=1)[0].format(mac_string)[:42], "utf-8" - ) - - def test_dhcp_setup_other_args(self, mock_wiznet5k): - """Test instantiating DHCP with none default values.""" - mac_address = bytes((7, 8, 9, 10, 11, 12)) - dhcp_client = wiz_dhcp.DHCP( - mock_wiznet5k, - mac_address, - hostname="fred.com", - debug=True, - ) - - assert dhcp_client._debug is True - mac_string = "".join("{:02X}".format(o) for o in mac_address) - assert dhcp_client._hostname == bytes( - "fred.com".split(".", maxsplit=1)[0].format(mac_string)[:42], "utf-8" - ) - - @pytest.mark.parametrize( - "mac_address, error_type", - ( - ("fdsafa", TypeError), - ((1, 2, 3, 4, 5, 6), TypeError), - (b"12345", ValueError), - (b"1234567", ValueError), - ), - ) - def test_mac_address_checking(self, mock_wiznet5k, mac_address, error_type): - with pytest.raises(error_type): - wiz_dhcp.DHCP( - mock_wiznet5k, - mac_address, - hostname="fred.com", - debug=True, - ) - - -@freeze_time("2022-10-20") -class TestSendDHCPMessage: - def test_generate_message_with_default_attributes(self, mock_wiznet5k): - """Test the _generate_message function with default values.""" - assert len(wiz_dhcp._BUFF) == 512 - dhcp_client = wiz_dhcp.DHCP(mock_wiznet5k, bytes((4, 5, 6, 7, 8, 9))) - dhcp_client._transaction_id = 0x6FFFFFFF - dhcp_client._start_time = time.monotonic() - 23.4 - dhcp_client._generate_dhcp_message(message_type=wiz_dhcp._DHCP_DISCOVER) - assert wiz_dhcp._BUFF == dhcp_data.DHCP_SEND_01 - assert len(wiz_dhcp._BUFF) == 512 - - @pytest.mark.parametrize( - "mac_address, hostname, msg_type, time_elapsed, renew, \ - broadcast_only, local_ip, server_ip, result", - ( - ( - bytes((4, 5, 6, 7, 8, 9)), - None, - wiz_dhcp._DHCP_DISCOVER, - 23.4, - False, - False, - b"\x00\x00\x00\x00", - b"\x00\x00\x00\x00", - dhcp_data.DHCP_SEND_02, - ), - ( - bytes((24, 35, 46, 57, 68, 79)), - "bert.co.uk", - wiz_dhcp._DHCP_DISCOVER, - 35.5, - True, - True, - b"\xc0\xa8\x03\x04", - b"\xe0\x7b\x17\x0a", - dhcp_data.DHCP_SEND_03, - ), - ( - bytes((255, 97, 36, 101, 42, 99)), - "clash.net", - wiz_dhcp._DHCP_DISCOVER, - 35.5, - False, - True, - b"\x0a\x0a\x0a\x2b", - b"\x91\x42\x2d\x16", - dhcp_data.DHCP_SEND_04, - ), - ), - ) - def test_generate_dhcp_message_discover_with_non_defaults( - self, - mock_wiznet5k, - mac_address, - hostname, - msg_type, - time_elapsed, - renew, - broadcast_only, - local_ip, - server_ip, - result, - ): - """Test the generate_dhcp_message function with different message types and - none default attributes.""" - dhcp_client = wiz_dhcp.DHCP(mock_wiznet5k, mac_address, hostname=hostname) - # Set client attributes for test - dhcp_client.local_ip = local_ip - dhcp_client.dhcp_server_ip = server_ip - dhcp_client._transaction_id = 0x6FFFFFFF - dhcp_client._start_time = time.monotonic() - time_elapsed - dhcp_client._renew = renew - # Test - dhcp_client._generate_dhcp_message( - message_type=msg_type, - broadcast=broadcast_only, - ) - assert len(wiz_dhcp._BUFF) == 512 - assert wiz_dhcp._BUFF == result - - @pytest.mark.parametrize( - "mac_address, hostname, msg_type, time_elapsed, \ - broadcast_only, local_ip, server_ip, result", - ( - ( - bytes((255, 97, 36, 101, 42, 99)), - "helicopter.org", - wiz_dhcp._DHCP_REQUEST, - 16.3, - True, - bytes((10, 10, 10, 43)), - bytes((145, 66, 45, 22)), - dhcp_data.DHCP_SEND_05, - ), - ( - bytes((75, 63, 166, 4, 200, 101)), - None, - wiz_dhcp._DHCP_REQUEST, - 72.4, - True, - bytes((100, 101, 102, 4)), - bytes((245, 166, 5, 11)), - dhcp_data.DHCP_SEND_06, - ), - ), - ) - def test_generate_dhcp_message_with_request_options( - self, - mock_wiznet5k, - mac_address, - hostname, - msg_type, - time_elapsed, - broadcast_only, - local_ip, - server_ip, - result, - ): - dhcp_client = wiz_dhcp.DHCP(mock_wiznet5k, mac_address, hostname=hostname) - # Set client attributes for test - dhcp_client.local_ip = local_ip - dhcp_client.dhcp_server_ip = server_ip - dhcp_client._transaction_id = 0x6FFFFFFF - dhcp_client._start_time = time.monotonic() - time_elapsed - # Test - dhcp_client._generate_dhcp_message( - message_type=msg_type, broadcast=broadcast_only - ) - assert len(wiz_dhcp._BUFF) == 512 - assert wiz_dhcp._BUFF == result - - -class TestParseDhcpMessage: - @pytest.mark.parametrize( - "xid, local_ip, msg_type, subnet, dhcp_ip, gate_ip, dns_ip, lease, t1, t2, response", - ( - ( - 0x7FFFFFFF, - b"\xc0\xa8\x05\x16", - 2, - b"\xc0\xa8\x06\x02", - b"\xeao\xde{", - b"yy\x04\x05", - b"\x05\x06\x07\x08", - 65792, - 2236928, - 3355392, - dhcp_data.GOOD_DATA_01, - ), - ( - 0x3456789A, - b"\x12$@\n", - 5, - b"\n\x0b\x07\xde", - b"zN\x91\x03", - b"\n\x0b\x0e\x0f", - b"\x13\x11\x0b\x07", - 15675, - 923456, - 43146718, - dhcp_data.GOOD_DATA_02, - ), - ), - ) - # pylint: disable=too-many-locals - def test_parse_good_data( - self, - mock_wiznet5k, - xid, - local_ip, - msg_type, - subnet, - dhcp_ip, - gate_ip, - dns_ip, - lease, - t1, - t2, - response, - ): - wiz_dhcp._BUFF = response - dhcp_client = wiz_dhcp.DHCP(mock_wiznet5k, bytes((1, 2, 3, 4, 5, 6))) - dhcp_client._transaction_id = xid - response_type = dhcp_client._parse_dhcp_response() - assert response_type == msg_type - assert dhcp_client.local_ip == local_ip - assert dhcp_client.subnet_mask == subnet - assert dhcp_client.dhcp_server_ip == dhcp_ip - assert dhcp_client.gateway_ip == gate_ip - assert dhcp_client.dns_server_ip == dns_ip - assert dhcp_client._lease == lease - assert dhcp_client._t1 == t1 - assert dhcp_client._t2 == t2 - - def test_parsing_failures(self, mock_wiznet5k): - # Test for bad OP code, ID mismatch, no server ID, bad Magic Cookie - bad_data = dhcp_data.BAD_DATA - dhcp_client = wiz_dhcp.DHCP(mock_wiznet5k, bytes((1, 2, 3, 4, 5, 6))) - dhcp_client._eth._read_socket.return_value = (len(bad_data), bad_data) - # Transaction ID mismatch. - dhcp_client._transaction_id = 0x42424242 - with pytest.raises(ValueError): - dhcp_client._parse_dhcp_response() - # Bad OP code. - bad_data[0] = 0 - dhcp_client._transaction_id = 0x7FFFFFFF - with pytest.raises(ValueError): - dhcp_client._parse_dhcp_response() - bad_data[0] = 2 # Reset to good value - # No server ID. - bad_data[28:34] = (0, 0, 0, 0, 0, 0) - with pytest.raises(ValueError): - dhcp_client._parse_dhcp_response() - bad_data[28:34] = (1, 1, 1, 1, 1, 1) # Reset to a good value for next test. - # Bad Magic Cookie. - bad_data[236] = 0 - with pytest.raises(ValueError): - dhcp_client._parse_dhcp_response() - - -@freeze_time("2022-11-10") -def test_dsm_reset(mocker, mock_wiznet5k): - dhcp_client = wiz_dhcp.DHCP(mock_wiznet5k, bytes((1, 2, 3, 4, 5, 6))) - mocker.patch.object(dhcp_client, "_dhcp_connection_setup", autospec=True) - mocker.patch.object(dhcp_client, "_socket_release", autospec=True) - dhcp_client.dhcp_server_ip = bytes((1, 2, 3, 4)) - dhcp_client.local_ip = bytes((2, 3, 4, 5)) - dhcp_client.subnet_mask = bytes((3, 4, 5, 6)) - dhcp_client.dns_server_ip = bytes((7, 8, 8, 10)) - dhcp_client._renew = True - dhcp_client._retries = 4 - dhcp_client._transaction_id = 3 - dhcp_client._start_time = None - - dhcp_client._dsm_reset() - dhcp_client._dhcp_connection_setup.assert_called_once() - dhcp_client._socket_release.assert_called_once() - assert mock_wiznet5k.ifconfig == ( - wiz_dhcp._UNASSIGNED_IP_ADDR, - wiz_dhcp._UNASSIGNED_IP_ADDR, - wiz_dhcp._UNASSIGNED_IP_ADDR, - wiz_dhcp._UNASSIGNED_IP_ADDR, - ) - assert dhcp_client.dhcp_server_ip == wiz_dhcp._BROADCAST_SERVER_ADDR - assert dhcp_client.local_ip == wiz_dhcp._UNASSIGNED_IP_ADDR - assert dhcp_client.subnet_mask == wiz_dhcp._UNASSIGNED_IP_ADDR - assert dhcp_client.dns_server_ip == wiz_dhcp._UNASSIGNED_IP_ADDR - assert dhcp_client._renew is None - assert dhcp_client._transaction_id == 4 - assert dhcp_client._start_time == time.monotonic() - - -class TestSocketRelease: - def test_socket_set_to_none(self, mock_wiznet5k): - dhcp_client = wiz_dhcp.DHCP(mock_wiznet5k, bytes((1, 2, 3, 4, 5, 6))) - dhcp_client._socket_release() - assert dhcp_client._wiz_sock is None - - dhcp_client._wiz_sock = 2 - dhcp_client._socket_release() - assert dhcp_client._wiz_sock is None - - -class TestSmallHelperFunctions: - def test_increment_transaction_id(self, mock_wiznet5k): - dhcp_client = wiz_dhcp.DHCP(mock_wiznet5k, bytes((1, 2, 3, 4, 5, 6))) - # Test that transaction_id increments. - dhcp_client._transaction_id = 4 - dhcp_client._increment_transaction_id() - assert dhcp_client._transaction_id == 5 - # Test that transaction_id rolls over from 0x7fffffff to zero - dhcp_client._transaction_id = 0x7FFFFFFF - dhcp_client._increment_transaction_id() - assert dhcp_client._transaction_id == 0 - - @freeze_time("2022-10-10") - @pytest.mark.parametrize("rand_int", (-1, 0, 1)) - def test_next_retry_time_default_attrs(self, mocker, mock_wiznet5k, rand_int): - dhcp_client = wiz_dhcp.DHCP(mock_wiznet5k, bytes((1, 2, 3, 4, 5, 6))) - mocker.patch( - "adafruit_wiznet5k.adafruit_wiznet5k_dhcp.randint", - autospec=True, - return_value=rand_int, - ) - now = time.monotonic() - for retry in range(3): - assert dhcp_client._next_retry_time(attempt=retry) == int( - 2**retry * 4 + rand_int + now - ) - - @freeze_time("2022-10-10") - @pytest.mark.parametrize("interval", (2, 7, 10)) - def test_next_retry_time_optional_attrs(self, mocker, mock_wiznet5k, interval): - dhcp_client = wiz_dhcp.DHCP(mock_wiznet5k, bytes((1, 2, 3, 4, 5, 6))) - mocker.patch( - "adafruit_wiznet5k.adafruit_wiznet5k_dhcp.randint", - autospec=True, - return_value=0, - ) - now = time.monotonic() - for retry in range(3): - assert dhcp_client._next_retry_time( - attempt=retry, interval=interval - ) == int(2**retry * interval + now) - - @freeze_time("2022-7-6") - def test_setup_socket_with_no_error(self, mocker, mock_wiznet5k): - mocker.patch.object(mock_wiznet5k, "get_socket", return_value=2) - mocker.patch.object(mock_wiznet5k, "read_sncr", return_value=b"\x00") - mocker.patch.object(mock_wiznet5k, "read_snsr", return_value=b"\x22") - dhcp_client = wiz_dhcp.DHCP(mock_wiznet5k, bytes((1, 2, 3, 4, 5, 6))) - dhcp_client._dhcp_connection_setup() - mock_wiznet5k.get_socket.assert_called_once() - mock_wiznet5k.write_snmr.assert_called_once_with(2, 0x02) - mock_wiznet5k.write_sock_port(2, 68) - mock_wiznet5k.write_sncr(2, 0x01) - mock_wiznet5k.read_sncr.assert_called_with(2) - mock_wiznet5k.write_sndport.assert_called_once_with( - 2, wiz_dhcp._DHCP_SERVER_PORT - ) - assert dhcp_client._wiz_sock == 2 - - @freeze_time("2022-7-6", auto_tick_seconds=2) - def test_setup_socket_with_timeout_on_get_socket(self, mocker, mock_wiznet5k): - mocker.patch.object(mock_wiznet5k, "get_socket", return_value=0xFF) - mocker.patch.object(mock_wiznet5k, "read_sncr", return_value=b"\x00") - mocker.patch.object(mock_wiznet5k, "read_snsr", return_value=b"\x22") - dhcp_client = wiz_dhcp.DHCP(mock_wiznet5k, bytes((1, 2, 3, 4, 5, 6))) - with pytest.raises(RuntimeError): - dhcp_client._dhcp_connection_setup() - assert dhcp_client._wiz_sock is None - - @freeze_time("2022-7-6", auto_tick_seconds=2) - def test_setup_socket_with_timeout_on_socket_is_udp(self, mocker, mock_wiznet5k): - mocker.patch.object(mock_wiznet5k, "get_socket", return_value=2) - mocker.patch.object(mock_wiznet5k, "read_sncr", return_value=b"\x00") - mocker.patch.object(mock_wiznet5k, "read_snsr", return_value=b"\x21") - dhcp_client = wiz_dhcp.DHCP(mock_wiznet5k, bytes((1, 2, 3, 4, 5, 6))) - with pytest.raises(RuntimeError): - dhcp_client._dhcp_connection_setup() - assert dhcp_client._wiz_sock is None - - -class TestHandleDhcpMessage: - @pytest.mark.parametrize( - "fsm_state, msg_in", - ( - (wiz_dhcp._STATE_SELECTING, wiz_dhcp._DHCP_DISCOVER), - (wiz_dhcp._STATE_REQUESTING, wiz_dhcp._DHCP_REQUEST), - ), - ) - @freeze_time("2022-5-5") - def test_good_data(self, mocker, mock_wiznet5k, fsm_state, msg_in): - dhcp_client = wiz_dhcp.DHCP(mock_wiznet5k, bytes((1, 2, 3, 4, 5, 6))) - # Mock out methods to allow _handle_dhcp_message to run. - mocker.patch.object( - dhcp_client, "_generate_dhcp_message", autospec=True, return_value=300 - ) - mocker.patch.object(dhcp_client, "_process_messaging_states", autospec=True) - mocker.patch.object( - dhcp_client, "_receive_dhcp_response", autospec=True, return_value=300 - ) - # Non zero value is a good message for _handle_dhcp_message. - mocker.patch.object( - dhcp_client, "_parse_dhcp_response", autospec=True, return_value=0x01 - ) - mocker.patch.object( - dhcp_client, - "_next_retry_time", - autospec=True, - return_value=time.monotonic() + 5, - ) - # Set initial FSM state. - dhcp_client._wiz_sock = 3 - dhcp_client._dhcp_state = fsm_state - dhcp_client._blocking = True - dhcp_client._renew = False - # Test. - assert dhcp_client._handle_dhcp_message() == 1 - # Confirm that the msg_type sent matches the FSM state. - dhcp_client._generate_dhcp_message.assert_called_once_with(message_type=msg_in) - dhcp_client._eth.write_sndipr.assert_called_once_with( - 3, dhcp_client.dhcp_server_ip - ) - dhcp_client._eth.write_sndport.assert_called_once_with( - dhcp_client._wiz_sock, wiz_dhcp._DHCP_SERVER_PORT - ) - dhcp_client._eth.socket_write.assert_called_once_with(3, wiz_dhcp._BUFF[:300]) - dhcp_client._next_retry_time.assert_called_once_with(attempt=0) - dhcp_client._receive_dhcp_response.assert_called_once_with(time.monotonic() + 5) - # If the initial message was good, receive is only called once. - dhcp_client._parse_dhcp_response.assert_called_once() - - @freeze_time("2022-5-5", auto_tick_seconds=1) - def test_timeout_blocking_no_response(self, mocker, mock_wiznet5k): - dhcp_client = wiz_dhcp.DHCP(mock_wiznet5k, bytes((1, 2, 3, 4, 5, 6))) - # Mock out methods to allow _handle_dhcp_message to run. - mocker.patch.object( - dhcp_client, "_generate_dhcp_message", autospec=True, return_value=300 - ) - mocker.patch.object(dhcp_client, "_process_messaging_states", autospec=True) - # No message bytes returned, so the handler should loop. - mocker.patch.object( - dhcp_client, "_receive_dhcp_response", autospec=True, return_value=0 - ) - mocker.patch.object( - dhcp_client, "_parse_dhcp_response", autospec=True, side_effect=[ValueError] - ) - mocker.patch.object( - dhcp_client, - "_next_retry_time", - autospec=True, - return_value=time.monotonic() + 5, - ) - # Set initial FSM state. - dhcp_client._wiz_sock = 3 - dhcp_client._dhcp_state = wiz_dhcp._STATE_REQUESTING - dhcp_client._blocking = True - dhcp_client._renew = False - # Test that a TimeoutError is raised. - with pytest.raises(TimeoutError): - dhcp_client._handle_dhcp_message() - # Confirm that _receive_dhcp_response is called repeatedly. - assert dhcp_client._receive_dhcp_response.call_count == 4 - # Check that message parsing not called. - dhcp_client._parse_dhcp_response.assert_not_called() - - @freeze_time("2022-5-5", auto_tick_seconds=1) - def test_timeout_blocking_bad_message(self, mocker, mock_wiznet5k): - dhcp_client = wiz_dhcp.DHCP(mock_wiznet5k, bytes((1, 2, 3, 4, 5, 6))) - # Mock out methods to allow _handle_dhcp_message to run. - mocker.patch.object( - dhcp_client, "_generate_dhcp_message", autospec=True, return_value=300 - ) - # Return False to model a bad message type, which should loop. - mocker.patch.object( - dhcp_client, "_process_messaging_states", autospec=True, return_value=False - ) - mocker.patch.object( - dhcp_client, "_receive_dhcp_response", autospec=True, return_value=300 - ) - mocker.patch.object( - dhcp_client, "_parse_dhcp_response", autospec=True, side_effect=ValueError - ) - mocker.patch.object( - dhcp_client, - "_next_retry_time", - autospec=True, - return_value=time.monotonic() + 5, - ) - # Set initial FSM state. - dhcp_client._wiz_sock = 3 - dhcp_client._dhcp_state = wiz_dhcp._STATE_REQUESTING - dhcp_client._blocking = True - dhcp_client._renew = False - # Test that a TimeoutError is raised. - with pytest.raises(TimeoutError): - dhcp_client._handle_dhcp_message() - # Confirm that processing methods are called repeatedly. - assert dhcp_client._receive_dhcp_response.call_count == 4 - assert dhcp_client._parse_dhcp_response.call_count == 4 - - @freeze_time("2022-5-5") - @pytest.mark.parametrize( - "renew, blocking", (("renew", False), ("renew", True), (None, False)) - ) - def test_no_response_non_blocking_renewing( - self, mocker, mock_wiznet5k, renew, blocking - ): - dhcp_client = wiz_dhcp.DHCP(mock_wiznet5k, bytes((1, 2, 3, 4, 5, 6))) - # Mock out methods to allow _handle_dhcp_message to run. - mocker.patch.object( - dhcp_client, "_generate_dhcp_message", autospec=True, return_value=300 - ) - mocker.patch.object(dhcp_client, "_process_messaging_states", autospec=True) - # No message bytes returned, so the handler do nothing and return. - mocker.patch.object( - dhcp_client, "_receive_dhcp_response", autospec=True, return_value=0 - ) - mocker.patch.object( - dhcp_client, "_parse_dhcp_response", autospec=True, return_value=0x00 - ) - mocker.patch.object( - dhcp_client, - "_next_retry_time", - autospec=True, - return_value=time.monotonic() + 5, - ) - # Set initial FSM state. - dhcp_client._wiz_sock = 3 - dhcp_client._dhcp_state = wiz_dhcp._STATE_REQUESTING - # Combinations of renew and blocking that will not loop. - dhcp_client._blocking = blocking - dhcp_client._renew = renew - # Test. - assert dhcp_client._handle_dhcp_message() == 0 - dhcp_client._next_retry_time.assert_called_once_with(attempt=0) - dhcp_client._receive_dhcp_response.assert_called_once_with(time.monotonic() + 5) - # No bytes returned so don't call parse or process message. - dhcp_client._parse_dhcp_response.assert_not_called() - - @freeze_time("2022-5-5") - @pytest.mark.parametrize( - "renew, blocking", (("renew", False), ("renew", True), (None, False)) - ) - def test_bad_message_non_blocking_renewing( - self, mocker, mock_wiznet5k, renew, blocking - ): - dhcp_client = wiz_dhcp.DHCP(mock_wiznet5k, bytes((1, 2, 3, 4, 5, 6))) - # Mock out methods to allow _handle_dhcp_message to run. - mocker.patch.object( - dhcp_client, "_generate_dhcp_message", autospec=True, return_value=300 - ) - # Bad message so check that the handler does not loop. - mocker.patch.object(dhcp_client, "_process_messaging_states", autospec=False) - mocker.patch.object( - dhcp_client, "_receive_dhcp_response", autospec=True, return_value=300 - ) - mocker.patch.object( - dhcp_client, "_parse_dhcp_response", autospec=True, side_effect=ValueError - ) - mocker.patch.object( - dhcp_client, - "_next_retry_time", - autospec=True, - return_value=time.monotonic() + 5, - ) - # Set initial FSM state. - dhcp_client._wiz_sock = 3 - dhcp_client._dhcp_state = wiz_dhcp._STATE_REQUESTING - # Combinations of renew and blocking that will not loop. - dhcp_client._blocking = blocking - dhcp_client._renew = renew - # Test. - assert dhcp_client._handle_dhcp_message() == 0 - dhcp_client._next_retry_time.assert_called_once_with(attempt=0) - dhcp_client._receive_dhcp_response.assert_called_once_with(time.monotonic() + 5) - # Bad message returned so call parse and process message. - dhcp_client._parse_dhcp_response.assert_called_once() - - -class TestReceiveResponse: - minimum_packet_length = 236 - - @freeze_time("2022-10-10") - @pytest.mark.parametrize( - "bytes_on_socket", (wiz_dhcp._BUFF_LENGTH, minimum_packet_length + 1) - ) - def test_receive_response_good_data(self, mock_dhcp, bytes_on_socket): - mock_dhcp._eth.read_udp.return_value = ( - bytes_on_socket, - bytes([0] * bytes_on_socket), - ) - response = mock_dhcp._receive_dhcp_response(time.monotonic() + 15) - assert response == bytes_on_socket - assert response > 236 - - @pytest.mark.skip - @freeze_time("2022-10-10") - def test_receive_response_short_packet(self, mock_dhcp): - mock_dhcp._eth.read_udp.side_effect = [ - (236, bytes([0] * 236)), - (1, bytes([0] * 1)), - ] - assert mock_dhcp._receive_dhcp_response(time.monotonic() + 15) > 236 - - @freeze_time("2022-10-10", auto_tick_seconds=5) - def test_timeout(self, mock_dhcp): - mock_dhcp._next_resend = time.monotonic() + 15 - mock_dhcp._eth.read_udp.side_effect = [ - (0, b""), - (0, b""), - (0, b""), - (0, b""), - (0, b""), - bytes([0] * 240), - ] - assert mock_dhcp._receive_dhcp_response(time.monotonic() + 15) == 0 - - @freeze_time("2022-10-10") - @pytest.mark.parametrize("bytes_returned", ([240], [230, 30])) - def test_buffer_handling(self, mock_dhcp, bytes_returned): - total_bytes = sum(bytes_returned) - mock_dhcp._next_resend = time.monotonic() + 15 - wiz_dhcp._BUFF = bytearray([1] * wiz_dhcp._BUFF_LENGTH) - expected_result = bytearray([2] * total_bytes) + ( - bytes([0] * (wiz_dhcp._BUFF_LENGTH - total_bytes)) - ) - mock_dhcp._eth.read_udp.side_effect = ( - (x, bytes([2] * x)) for x in bytes_returned - ) - assert mock_dhcp._receive_dhcp_response(time.monotonic() + 15) == total_bytes - assert wiz_dhcp._BUFF == expected_result - - @freeze_time("2022-10-10") - def test_buffer_does_not_overrun(self, mocker, mock_dhcp): - mock_dhcp._wiz_sock = 1 - mock_dhcp._next_resend = time.monotonic() + 15 - mock_dhcp._eth.read_udp.return_value = ( - wiz_dhcp._BUFF_LENGTH, - bytes([2] * wiz_dhcp._BUFF_LENGTH), - ) - mock_dhcp._receive_dhcp_response(time.monotonic() + 10) - mock_dhcp._eth.read_udp.assert_called_once_with(1, wiz_dhcp._BUFF_LENGTH) - mock_dhcp._eth.read_udp.reset_mock() - mock_dhcp._eth.read_udp.side_effect = [ - (200, bytes([2] * 200)), - (118, bytes([2] * 118)), - ] - mock_dhcp._receive_dhcp_response(time.monotonic() + 10) - assert mock_dhcp._eth.read_udp.call_count == 2 - assert mock_dhcp._eth.read_udp.call_args_list == [ - mocker.call(1, 512), - mocker.call(1, 312), - ] - - -class TestProcessMessagingStates: - @pytest.mark.parametrize( - "state, bad_messages", - ( - ( - wiz_dhcp._STATE_SELECTING, - ( - 0, - wiz_dhcp._DHCP_ACK, - wiz_dhcp._DHCP_REQUEST, - wiz_dhcp._DHCP_DECLINE, - wiz_dhcp._DHCP_DISCOVER, - wiz_dhcp._DHCP_NAK, - wiz_dhcp._DHCP_INFORM, - wiz_dhcp._DHCP_RELEASE, - ), - ), - ( - wiz_dhcp._STATE_REQUESTING, - ( - 0, - wiz_dhcp._DHCP_OFFER, - wiz_dhcp._DHCP_REQUEST, - wiz_dhcp._DHCP_DECLINE, - wiz_dhcp._DHCP_DISCOVER, - wiz_dhcp._DHCP_INFORM, - wiz_dhcp._DHCP_RELEASE, - ), - ), - ), - ) - def test_called_with_bad_or_no_message(self, mock_dhcp, state, bad_messages): - # Setup with the current state. - mock_dhcp._dhcp_state = state - # Test against 0 (no message) and all bad message types. - for message_type in bad_messages: - # Test. - mock_dhcp._process_messaging_states(message_type=message_type) - # Confirm that a 0 message does not change state. - assert mock_dhcp._dhcp_state == state - - def test_called_from_selecting_good_message(self, mock_dhcp): - # Setup with the required state. - mock_dhcp._dhcp_state = wiz_dhcp._STATE_SELECTING - # Test. - mock_dhcp._process_messaging_states(message_type=wiz_dhcp._DHCP_OFFER) - # Confirm correct new state. - assert mock_dhcp._dhcp_state == wiz_dhcp._STATE_REQUESTING - - @freeze_time("2022-3-4") - @pytest.mark.parametrize("lease_time", (200, 8000)) - def test_called_from_requesting_with_ack(self, mock_dhcp, lease_time): - # Setup with the required state. - mock_dhcp._dhcp_state = wiz_dhcp._STATE_REQUESTING - # Set the lease_time (zero forces a default to be used). - mock_dhcp._lease = lease_time - # Set renew to "renew" to confirm that an ACK sets it to None. - mock_dhcp._renew = "renew" - # Set a start time. - mock_dhcp._start_time = time.monotonic() - # Test. - mock_dhcp._process_messaging_states(message_type=wiz_dhcp._DHCP_ACK) - # Confirm timers are correctly set. - assert mock_dhcp._t1 == time.monotonic() + lease_time // 2 - assert mock_dhcp._t2 == time.monotonic() + lease_time - lease_time // 8 - assert mock_dhcp._lease == time.monotonic() + lease_time - # Check that renew is forced to None - assert mock_dhcp._renew is None - # FSM state should be bound. - assert mock_dhcp._dhcp_state == wiz_dhcp._STATE_BOUND - - def test_called_from_requesting_with_nak(self, mock_dhcp): - # Setup with the required state. - mock_dhcp._dhcp_state = wiz_dhcp._STATE_REQUESTING - # Test. - mock_dhcp._process_messaging_states(message_type=wiz_dhcp._DHCP_NAK) - # FSM state should be init after receiving a NAK response. - assert mock_dhcp._dhcp_state == wiz_dhcp._STATE_INIT diff --git a/tests/test_dns_server_nonbreaking_changes.py b/tests/test_dns_server_nonbreaking_changes.py deleted file mode 100644 index 7ea4491..0000000 --- a/tests/test_dns_server_nonbreaking_changes.py +++ /dev/null @@ -1,214 +0,0 @@ -# SPDX-FileCopyrightText: 2022 Martin Stephens -# -# SPDX-License-Identifier: MIT - -# pylint: disable=no-self-use, redefined-outer-name, protected-access, invalid-name, too-many-arguments -"""Tests to confirm that there are no changes in behaviour to public methods and funtions.""" -import pytest -import freezegun -from micropython import const -import adafruit_wiznet5k.adafruit_wiznet5k_dns as wiz_dns - - -DEFAULT_DEBUG_ON = True - - -@pytest.fixture -def wiznet(mocker): - return mocker.patch("adafruit_wiznet5k.adafruit_wiznet5k.WIZNET5K", autospec=True) - - -@pytest.fixture -def wrench(mocker): - return mocker.patch( - "adafruit_wiznet5k.adafruit_wiznet5k_socket.socket", autospec=True - ) - - -class TestDNSInit: - def test_constants(self): - assert wiz_dns._QUERY_FLAG == const(0x00) - assert wiz_dns._OPCODE_STANDARD_QUERY == const(0x00) - assert wiz_dns._RECURSION_DESIRED_FLAG == 1 << 8 - - assert wiz_dns._TYPE_A == const(0x0001) - assert wiz_dns._CLASS_IN == const(0x0001) - assert wiz_dns._DATA_LEN == const(0x0004) - - # Return codes for gethostbyname - assert wiz_dns._SUCCESS == const(1) - assert wiz_dns._TIMED_OUT == const(-1) - assert wiz_dns._INVALID_SERVER == const(-2) - assert wiz_dns._TRUNCATED == const(-3) - assert wiz_dns._INVALID_RESPONSE == const(-4) - - assert wiz_dns._DNS_PORT == const(0x35) # port used for DNS request - - @pytest.mark.skip - def test_dns_setup_default(self, wiznet, wrench): - # Test with DNS address as string and default values. - dns_server = wiz_dns.DNS(wiznet, "8.8.8.8") - assert dns_server._iface == wiznet - assert dns_server._dns_server == "8.8.8.8" - assert dns_server._debug is False - # assert dns_server._host == b"" - assert dns_server._query_id == 0 - assert dns_server._query_length == 0 - wrench.assert_called_once_with(type=2) - - def test_dns_setup_other_args(self, wiznet): - # Test with DNS address as tuple and debug on. - dns_server = wiz_dns.DNS(wiznet, (1, 2, 3, 4), debug=True) - assert dns_server._dns_server == (1, 2, 3, 4) - assert dns_server._debug is True - # assert dns_server._host == b"" - - -@pytest.mark.skip -class TestDnsGetHostByName: - @pytest.mark.parametrize( - "domain, request_id, dns_bytes_sent, dns_bytes_recv, ipv4", - ( - ( - "www.apple.com", # Response with CNAME and A type answers. - 0x3476, - bytearray( - b"\x34\x76\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03www\x05apple\x03com\x00" - b"\x00\x01\x00\x01" - ), - bytearray( - b"\x34\x76\x81\x80\x00\x01\x00\x04\x00\x00\x00\x00\x03\x77\x77\x77\x05\x61" - b"\x70\x70\x6c\x65\x03\x63\x6f\x6d\x00\x00\x01\x00\x01\xc0\x0c\x00\x05\x00" - b"\x01\x00\x00\x02\xf3\x00\x1b\x03\x77\x77\x77\x05\x61\x70\x70\x6c\x65\x03" - b"\x63\x6f\x6d\x07\x65\x64\x67\x65\x6b\x65\x79\x03\x6e\x65\x74\x00\xc0\x2b" - b"\x00\x05\x00\x01\x00\x00\x0a\xf2\x00\x2f\x03\x77\x77\x77\x05\x61\x70\x70" - b"\x6c\x65\x03\x63\x6f\x6d\x07\x65\x64\x67\x65\x6b\x65\x79\x03\x6e\x65\x74" - b"\x0b\x67\x6c\x6f\x62\x61\x6c\x72\x65\x64\x69\x72\x06\x61\x6b\x61\x64\x6e" - b"\x73\xc0\x41\xc0\x52\x00\x05\x00\x01\x00\x00\x01\x7d\x00\x18\x05\x65\x36" - b"\x38\x35\x38\x04\x64\x73\x63\x78\x0a\x61\x6b\x61\x6d\x61\x69\x65\x64\x67" - b"\x65\xc0\x41\xc0\x8d\x00\x01\x00\x01\x00\x00\x00\x14\x00\x04\x17\x38\x9c" - b"\x56" - ), - bytearray(b"\x178\x9cV"), - ), - ( - "learn.adafruit.com", # Response with multiple A type answers. - 0x9912, - bytearray( - b"\x99\x12\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x05learn\x08adafruit\x03" - b"com\x00\x00\x01\x00\x01" - ), - bytearray( - b"\x99\x12\x81\x80\x00\x01\x00\x02\x00\x00\x00\x00\x05\x6c\x65\x61\x72\x6e" - b"\x08\x61\x64\x61\x66\x72\x75\x69\x74\x03\x63\x6f\x6d\x00\x00\x01\x00\x01" - b"\xc0\x0c\x00\x01\x00\x01\x00\x00\x01\x2c\x00\x04\x68\x14\x27\xf0\xc0\x0c" - b"\x00\x01\x00\x01\x00\x00\x01\x2c\x00\x04\x68\x14\x26\xf0" - ), - b"\x68\x14\x27\xf0", - ), - ), - ) - def test_good_domain_names_give_correct_ipv4( - self, - mocker, - wiznet, - wrench, - domain, - request_id, - dns_bytes_sent, - dns_bytes_recv, - ipv4, - ): - """show that the correct IPv4 is returned for a given domain name.""" - # Pylint does not understand that the wrench fixture is required. - # pylint: disable=unused-argument - - # Mock randombits so that the IDs for request and reply match - mocker.patch( - "adafruit_wiznet5k.adafruit_wiznet5k_dns.getrandbits", - return_value=request_id, - ) - # Set up mock server calls. - dns_server = wiz_dns.DNS(wiznet, "8.8.8.8", debug=DEFAULT_DEBUG_ON) - dns_server._sock._available.return_value = len(dns_bytes_recv) - dns_server._sock.recv.return_value = dns_bytes_recv - - # Check that the correct IPv4 address was received. - assert dns_server.gethostbyname(bytes(domain, "utf-8")) == ipv4 - # Check that correct socket calls were made. - dns_server._sock.bind.assert_called_once_with(("", 0x35)) - dns_server._sock.connect.assert_called_once() - dns_server._sock.send.assert_called_once_with(dns_bytes_sent) - - @pytest.mark.parametrize( - "dns_bytes_recv, response, _", - ( - (bytearray(b"\x99\x12\x81\x80\x00\x01\x00"), -1, "Query ID mismatch"), - ( - bytearray(b"\x93\x21\x01\x00\x00\x01\x00"), - -1, - "Query / reply bit not set", - ), - (bytearray(b"\x93\x21\x81\x80\x00\x00\x00"), -1, "Question count != 0"), - (bytearray(b"\x93\x21\x81\x80\x00\x02\x00"), -1, "Question count != 1"), - (bytearray(b"\x93\x21\x81\x80\x00\x02\x00\x00"), -1, "Answer count == 0"), - (bytearray(b"\x93\x21\x81\x80\x00\x02\x00\x00"), -1, "Answer count == 0"), - ), - ) - def test_bad_response_returns_correct_value( - self, mocker, wiznet, wrench, dns_bytes_recv, response, _ - ): - """Show that the correct error code is returned from a bad DNS response.""" - # Pylint does not understand that the wrench fixture is required. - # pylint: disable=unused-argument - - # Mock randombits so that the ID for request is consistent. - mocker.patch( - "adafruit_wiznet5k.adafruit_wiznet5k_dns.getrandbits", - return_value=0x9321, - ) - # Set up mock server calls. - dns_server = wiz_dns.DNS(wiznet, "8.8.8.8", debug=DEFAULT_DEBUG_ON) - dns_server._sock._available.return_value = len(dns_bytes_recv) - dns_server._sock.recv.return_value = dns_bytes_recv - - # Check that the correct response was received. - assert dns_server.gethostbyname(bytes("apple.com", "utf-8")) == response - - # Check that the correct number of calls to _sock.available were made. - dns_server._sock._available.assert_called() - assert len(dns_server._sock._available.call_args_list) == 5 - - @freezegun.freeze_time("2022-3-4", auto_tick_seconds=0.1) - def test_retries_with_no_data_on_socket(self, wiznet, wrench): - """Confirm correct calls made to socket when no data available.""" - # Pylint does not understand that the wrench fixture is required. - # pylint: disable=unused-argument - - dns_server = wiz_dns.DNS(wiznet, "8.8.8.8", debug=DEFAULT_DEBUG_ON) - dns_server._sock._available.return_value = 0 - dns_server._sock.recv.return_value = b"" - dns_server.gethostbyname(bytes("domain.name", "utf-8")) - - # Check how many times the socket was polled for data before giving up. - dns_server._sock._available.assert_called() - assert len(dns_server._sock._available.call_args_list) == 12 - # Check that no attempt made to read data from the socket. - dns_server._sock.recv.assert_not_called() - - def test_retries_with_bad_data_on_socket(self, wiznet, wrench): - """Confirm correct calls made to socket when bad data available.""" - # Pylint does not understand that the wrench fixture is required. - # pylint: disable=unused-argument - - dns_server = wiz_dns.DNS(wiznet, "8.8.8.8", debug=DEFAULT_DEBUG_ON) - dns_server._sock._available.return_value = 7 - dns_server._sock.recv.return_value = b"\x99\x12\x81\x80\x00\x01\x00" - dns_server.gethostbyname(bytes("domain.name", "utf-8")) - - # Check how many times the socket was polled for data before giving up. - dns_server._sock._available.assert_called() - assert len(dns_server._sock._available.call_args_list) == 5 - # Check how many attempts were made to read data from the socket. - dns_server._sock.recv.assert_called() - assert len(dns_server._sock.recv.call_args_list) == 5 diff --git a/tests/test_parse_dns_function.py b/tests/test_parse_dns_function.py deleted file mode 100644 index 1fdca20..0000000 --- a/tests/test_parse_dns_function.py +++ /dev/null @@ -1,55 +0,0 @@ -# SPDX-FileCopyrightText: 2022 Martin Stephens -# -# SPDX-License-Identifier: MIT - -# pylint: disable=no-self-use, redefined-outer-name, protected-access, invalid-name, too-many-arguments -import pytest -from adafruit_wiznet5k.adafruit_wiznet5k_dns import _parse_dns_response - -SHOW_DEBUG_MESSAGES = False - - -# @pytest.mark.skip(reason="Not currently under development") -@pytest.mark.parametrize( - "request_id, request_length, dns_bytes_recv, ipv4", - ( - ( - 0x3476, # Has CNAME answers - 31, - bytearray( - b"\x34\x76\x81\x80\x00\x01\x00\x04\x00\x00\x00\x00\x03\x77\x77\x77\x05\x61" - b"\x70\x70\x6c\x65\x03\x63\x6f\x6d\x00\x00\x01\x00\x01\xc0\x0c\x00\x05\x00" - b"\x01\x00\x00\x02\xf3\x00\x1b\x03\x77\x77\x77\x05\x61\x70\x70\x6c\x65\x03" - b"\x63\x6f\x6d\x07\x65\x64\x67\x65\x6b\x65\x79\x03\x6e\x65\x74\x00\xc0\x2b" - b"\x00\x05\x00\x01\x00\x00\x0a\xf2\x00\x2f\x03\x77\x77\x77\x05\x61\x70\x70" - b"\x6c\x65\x03\x63\x6f\x6d\x07\x65\x64\x67\x65\x6b\x65\x79\x03\x6e\x65\x74" - b"\x0b\x67\x6c\x6f\x62\x61\x6c\x72\x65\x64\x69\x72\x06\x61\x6b\x61\x64\x6e" - b"\x73\xc0\x41\xc0\x52\x00\x05\x00\x01\x00\x00\x01\x7d\x00\x18\x05\x65\x36" - b"\x38\x35\x38\x04\x64\x73\x63\x78\x0a\x61\x6b\x61\x6d\x61\x69\x65\x64\x67" - b"\x65\xc0\x41\xc0\x8d\x00\x01\x00\x01\x00\x00\x00\x14\x00\x04\x17\x38\x9c" - b"\x56" - ), - b"\x178\x9cV", - ), - ( - 0x9912, - 36, - bytearray( - b"\x99\x12\x81\x80\x00\x01\x00\x02\x00\x00\x00\x00\x05\x6c\x65\x61\x72\x6e" - b"\x08\x61\x64\x61\x66\x72\x75\x69\x74\x03\x63\x6f\x6d\x00\x00\x01\x00\x01" - b"\xc0\x0c\x00\x01\x00\x01\x00\x00\x01\x2c\x00\x04\x68\x14\x27\xf0\xc0\x0c" - b"\x00\x01\x00\x01\x00\x00\x01\x2c\x00\x04\x68\x14\x26\xf0" - ), - b"\x68\x14\x27\xf0", - ), - ), -) -def test_parse_response(request_id, request_length, dns_bytes_recv, ipv4): - ip_address = _parse_dns_response( - response=dns_bytes_recv, - query_id=request_id, - query_length=request_length, - debug=SHOW_DEBUG_MESSAGES, - ) - assert ip_address == ipv4 - assert isinstance(ip_address, bytearray)