diff --git a/src/easynetwork/lowlevel/_utils.py b/src/easynetwork/lowlevel/_utils.py index 49b2eed5..51ef3d1a 100644 --- a/src/easynetwork/lowlevel/_utils.py +++ b/src/easynetwork/lowlevel/_utils.py @@ -286,6 +286,67 @@ def set_reuseport(sock: SupportsSocketOptions) -> None: raise ValueError("reuse_port not supported by socket module, SO_REUSEPORT defined but not implemented.") from None +def open_listener_sockets_from_getaddrinfo_result( + infos: Iterable[tuple[int, int, int, str, tuple[Any, ...]]], + *, + backlog: int | None, + reuse_address: bool, + reuse_port: bool, +) -> list[_socket.socket]: + sockets: list[_socket.socket] = [] + reuse_address = reuse_address and hasattr(_socket, "SO_REUSEADDR") + with contextlib.ExitStack() as _whole_context_stack: + errors: list[OSError] = [] + _whole_context_stack.callback(errors.clear) + + socket_exit_stack = _whole_context_stack.enter_context(contextlib.ExitStack()) + + for af, socktype, proto, _, sa in infos: + try: + sock = socket_exit_stack.enter_context(contextlib.closing(_socket.socket(af, socktype, proto))) + except OSError: + # Assume it's a bad family/type/protocol combination. + continue + sockets.append(sock) + if reuse_address: + try: + sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, True) + except OSError: + # Will fail later on bind() + pass + if reuse_port: + set_reuseport(sock) + # Disable IPv4/IPv6 dual stack support (enabled by + # default on Linux) which makes a single socket + # listen on both address families. + if af == _socket.AF_INET6: + if hasattr(_socket, "IPPROTO_IPV6"): + sock.setsockopt(_socket.IPPROTO_IPV6, _socket.IPV6_V6ONLY, True) + if "%" in sa[0]: + addr, scope_id = sa[0].split("%", 1) + sa = (addr, sa[1], 0, int(scope_id)) + try: + sock.bind(sa) + except OSError as exc: + errors.append( + OSError( + exc.errno, f"error while attempting to bind to address {sa!r}: {exc.strerror.lower()}" + ).with_traceback(exc.__traceback__) + ) + continue + if backlog is not None: + sock.listen(backlog) + + if errors: + # No need to call errors.clear(), this is done by exit stack + raise ExceptionGroup("Error when trying to create listeners", errors) + + # There were no errors, therefore do not close the sockets + socket_exit_stack.pop_all() + + return sockets + + def exception_with_notes(exc: _T_Exception, notes: str | Iterable[str]) -> _T_Exception: if isinstance(notes, str): notes = (notes,) diff --git a/src/easynetwork/lowlevel/api_async/backend/_asyncio/_asyncio_utils.py b/src/easynetwork/lowlevel/api_async/backend/_asyncio/_asyncio_utils.py index 43bf645e..47a4d8b5 100644 --- a/src/easynetwork/lowlevel/api_async/backend/_asyncio/_asyncio_utils.py +++ b/src/easynetwork/lowlevel/api_async/backend/_asyncio/_asyncio_utils.py @@ -22,23 +22,19 @@ "create_connection", "create_datagram_connection", "ensure_resolved", - "open_listener_sockets_from_getaddrinfo_result", "resolve_local_addresses", "wait_until_readable", "wait_until_writable", ] import asyncio -import contextlib import itertools import math import socket as _socket from collections import OrderedDict -from collections.abc import Iterable, Sequence +from collections.abc import Sequence from typing import Any, cast -from .... import _utils - async def ensure_resolved( host: str | None, @@ -285,67 +281,6 @@ async def create_datagram_connection( ) -def open_listener_sockets_from_getaddrinfo_result( - infos: Iterable[tuple[int, int, int, str, tuple[Any, ...]]], - *, - backlog: int | None, - reuse_address: bool, - reuse_port: bool, -) -> list[_socket.socket]: - sockets: list[_socket.socket] = [] - reuse_address = reuse_address and hasattr(_socket, "SO_REUSEADDR") - with contextlib.ExitStack() as _whole_context_stack: - errors: list[OSError] = [] - _whole_context_stack.callback(errors.clear) - - socket_exit_stack = _whole_context_stack.enter_context(contextlib.ExitStack()) - - for af, socktype, proto, _, sa in infos: - try: - sock = socket_exit_stack.enter_context(contextlib.closing(_socket.socket(af, socktype, proto))) - except OSError: - # Assume it's a bad family/type/protocol combination. - continue - sockets.append(sock) - if reuse_address: - try: - sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, True) - except OSError: - # Will fail later on bind() - pass - if reuse_port: - _utils.set_reuseport(sock) - # Disable IPv4/IPv6 dual stack support (enabled by - # default on Linux) which makes a single socket - # listen on both address families. - if af == _socket.AF_INET6: - if hasattr(_socket, "IPPROTO_IPV6"): - sock.setsockopt(_socket.IPPROTO_IPV6, _socket.IPV6_V6ONLY, True) - if "%" in sa[0]: - addr, scope_id = sa[0].split("%", 1) - sa = (addr, sa[1], 0, int(scope_id)) - try: - sock.bind(sa) - except OSError as exc: - errors.append( - OSError( - exc.errno, f"error while attempting to bind to address {sa!r}: {exc.strerror.lower()}" - ).with_traceback(exc.__traceback__) - ) - continue - if backlog is not None: - sock.listen(backlog) - - if errors: - # No need to call errors.clear(), this is done by exit stack - raise ExceptionGroup("Error when trying to create listeners", errors) - - # There were no errors, therefore do not close the sockets - socket_exit_stack.pop_all() - - return sockets - - def wait_until_readable(sock: _socket.socket, loop: asyncio.AbstractEventLoop) -> asyncio.Future[None]: def on_fut_done(f: asyncio.Future[None]) -> None: loop.remove_reader(sock) diff --git a/src/easynetwork/lowlevel/api_async/backend/_asyncio/backend.py b/src/easynetwork/lowlevel/api_async/backend/_asyncio/backend.py index 7c47cd69..e27e8fd3 100644 --- a/src/easynetwork/lowlevel/api_async/backend/_asyncio/backend.py +++ b/src/easynetwork/lowlevel/api_async/backend/_asyncio/backend.py @@ -152,7 +152,7 @@ async def create_tcp_listeners( if not isinstance(backlog, int): raise TypeError("backlog: Expected an integer") - from ._asyncio_utils import open_listener_sockets_from_getaddrinfo_result, resolve_local_addresses + from ._asyncio_utils import resolve_local_addresses from .stream.listener import AcceptedSocketFactory, ListenerSocketAdapter reuse_address: bool = os.name not in ("nt", "cygwin") and sys.platform != "cygwin" @@ -172,7 +172,7 @@ async def create_tcp_listeners( _socket.SOCK_STREAM, ) - sockets: list[_socket.socket] = open_listener_sockets_from_getaddrinfo_result( + sockets: list[_socket.socket] = _utils.open_listener_sockets_from_getaddrinfo_result( infos, backlog=backlog, reuse_address=reuse_address, @@ -216,7 +216,7 @@ async def create_udp_listeners( *, reuse_port: bool = False, ) -> Sequence[AsyncDatagramListener[tuple[Any, ...]]]: - from ._asyncio_utils import open_listener_sockets_from_getaddrinfo_result, resolve_local_addresses + from ._asyncio_utils import resolve_local_addresses from .datagram.listener import DatagramListenerProtocol, DatagramListenerSocketAdapter loop = self.__asyncio.get_running_loop() @@ -237,7 +237,7 @@ async def create_udp_listeners( _socket.SOCK_DGRAM, ) - sockets: list[_socket.socket] = open_listener_sockets_from_getaddrinfo_result( + sockets: list[_socket.socket] = _utils.open_listener_sockets_from_getaddrinfo_result( infos, backlog=None, reuse_address=False, diff --git a/tests/unit_test/test_async/test_asyncio_backend/test_backend.py b/tests/unit_test/test_async/test_asyncio_backend/test_backend.py index b38a7688..bb58344b 100644 --- a/tests/unit_test/test_async/test_asyncio_backend/test_backend.py +++ b/tests/unit_test/test_async/test_asyncio_backend/test_backend.py @@ -318,7 +318,7 @@ async def test____create_tcp_listeners____open_listener_sockets( return_value=addrinfo_list, ) mock_open_listeners = mocker.patch( - f"{_ASYNCIO_BACKEND_MODULE}._asyncio_utils.open_listener_sockets_from_getaddrinfo_result", + "easynetwork.lowlevel._utils.open_listener_sockets_from_getaddrinfo_result", return_value=[mock_tcp_socket], ) mock_ListenerSocketAdapter: MagicMock = mocker.patch( @@ -387,7 +387,7 @@ async def test____create_tcp_listeners____bind_to_any_interfaces( return_value=addrinfo_list, ) mock_open_listeners = mocker.patch( - f"{_ASYNCIO_BACKEND_MODULE}._asyncio_utils.open_listener_sockets_from_getaddrinfo_result", + "easynetwork.lowlevel._utils.open_listener_sockets_from_getaddrinfo_result", return_value=[mock_tcp_socket, mock_tcp_socket], ) mock_ListenerSocketAdapter: MagicMock = mocker.patch( @@ -457,7 +457,7 @@ async def test____create_tcp_listeners____bind_to_several_hosts( side_effect=[[info] for info in addrinfo_list], ) mock_open_listeners = mocker.patch( - f"{_ASYNCIO_BACKEND_MODULE}._asyncio_utils.open_listener_sockets_from_getaddrinfo_result", + "easynetwork.lowlevel._utils.open_listener_sockets_from_getaddrinfo_result", return_value=[mock_tcp_socket, mock_tcp_socket], ) mock_ListenerSocketAdapter: MagicMock = mocker.patch( @@ -514,7 +514,7 @@ async def test____create_tcp_listeners____error_getaddrinfo_returns_empty_list( return_value=[], ) mock_open_listeners = mocker.patch( - f"{_ASYNCIO_BACKEND_MODULE}._asyncio_utils.open_listener_sockets_from_getaddrinfo_result", + "easynetwork.lowlevel._utils.open_listener_sockets_from_getaddrinfo_result", side_effect=AssertionError, ) mock_ListenerSocketAdapter: MagicMock = mocker.patch( @@ -559,7 +559,7 @@ async def test____create_tcp_listeners____invalid_backlog( return_value=[], ) mock_open_listeners = mocker.patch( - f"{_ASYNCIO_BACKEND_MODULE}._asyncio_utils.open_listener_sockets_from_getaddrinfo_result", + "easynetwork.lowlevel._utils.open_listener_sockets_from_getaddrinfo_result", side_effect=AssertionError, ) mock_ListenerSocketAdapter: MagicMock = mocker.patch( @@ -682,7 +682,7 @@ async def test____create_udp_listeners____open_listener_sockets( return_value=addrinfo_list, ) mock_open_listeners = mocker.patch( - f"{_ASYNCIO_BACKEND_MODULE}._asyncio_utils.open_listener_sockets_from_getaddrinfo_result", + "easynetwork.lowlevel._utils.open_listener_sockets_from_getaddrinfo_result", return_value=[mock_udp_socket], ) mock_create_datagram_endpoint: AsyncMock = mocker.patch.object( @@ -761,7 +761,7 @@ async def test____create_udp_listeners____bind_to_local_interfaces( return_value=addrinfo_list, ) mock_open_listeners = mocker.patch( - f"{_ASYNCIO_BACKEND_MODULE}._asyncio_utils.open_listener_sockets_from_getaddrinfo_result", + "easynetwork.lowlevel._utils.open_listener_sockets_from_getaddrinfo_result", return_value=[mock_udp_socket, mock_udp_socket], ) mock_create_datagram_endpoint: AsyncMock = mocker.patch.object( @@ -844,7 +844,7 @@ async def test____create_udp_listeners____bind_to_several_hosts( return_value=addrinfo_list, ) mock_open_listeners = mocker.patch( - f"{_ASYNCIO_BACKEND_MODULE}._asyncio_utils.open_listener_sockets_from_getaddrinfo_result", + "easynetwork.lowlevel._utils.open_listener_sockets_from_getaddrinfo_result", return_value=[mock_udp_socket, mock_udp_socket], ) mock_create_datagram_endpoint: AsyncMock = mocker.patch.object( @@ -911,7 +911,7 @@ async def test____create_udp_listeners____error_getaddrinfo_returns_empty_list( return_value=[], ) mock_open_listeners = mocker.patch( - f"{_ASYNCIO_BACKEND_MODULE}._asyncio_utils.open_listener_sockets_from_getaddrinfo_result", + "easynetwork.lowlevel._utils.open_listener_sockets_from_getaddrinfo_result", side_effect=AssertionError, ) mock_create_datagram_endpoint: AsyncMock = mocker.patch.object( diff --git a/tests/unit_test/test_async/test_asyncio_backend/test_utils.py b/tests/unit_test/test_async/test_asyncio_backend/test_utils.py index 84e51c66..7ad388b8 100644 --- a/tests/unit_test/test_async/test_asyncio_backend/test_utils.py +++ b/tests/unit_test/test_async/test_asyncio_backend/test_utils.py @@ -13,25 +13,20 @@ AI_PASSIVE, EAI_BADFLAGS, EAI_NONAME, - IPPROTO_IPV6, IPPROTO_TCP, IPPROTO_UDP, - IPV6_V6ONLY, - SO_REUSEADDR, SOCK_DGRAM, SOCK_STREAM, - SOL_SOCKET, SocketType, gaierror, ) -from typing import TYPE_CHECKING, Any, Literal, Protocol as TypingProtocol, assert_never, cast +from typing import TYPE_CHECKING, Any, Literal, Protocol as TypingProtocol, assert_never from easynetwork.lowlevel._utils import error_from_errno from easynetwork.lowlevel.api_async.backend._asyncio._asyncio_utils import ( create_connection, create_datagram_connection, ensure_resolved, - open_listener_sockets_from_getaddrinfo_result, wait_until_readable, wait_until_writable, ) @@ -757,143 +752,6 @@ async def connect_side_effect(sock: SocketType, address: tuple[Any, ...]) -> Non ] -@pytest.fixture -def addrinfo_list() -> Sequence[tuple[int, int, int, str, tuple[Any, ...]]]: - return ( - (AF_INET, SOCK_STREAM, IPPROTO_TCP, "", ("0.0.0.0", 65432)), - (AF_INET6, SOCK_STREAM, IPPROTO_TCP, "", ("::", 65432, 0, 0)), - ) - - -@pytest.mark.parametrize("reuse_address", [False, True], ids=lambda boolean: f"reuse_address=={boolean}") -@pytest.mark.parametrize("SO_REUSEADDR_available", [False, True], ids=lambda boolean: f"SO_REUSEADDR_available=={boolean}") -@pytest.mark.parametrize("SO_REUSEADDR_raise_error", [False, True], ids=lambda boolean: f"SO_REUSEADDR_raise_error=={boolean}") -@pytest.mark.parametrize("IPPROTO_IPV6_available", [False, True], ids=lambda boolean: f"IPPROTO_IPV6_available=={boolean}") -@pytest.mark.parametrize("reuse_port", [False, True], ids=lambda boolean: f"reuse_port=={boolean}") -@pytest.mark.parametrize("backlog", [123456, None], ids=lambda value: f"backlog=={value}") -def test____open_listener_sockets_from_getaddrinfo_result____create_listener_sockets( - reuse_address: bool, - backlog: int | None, - SO_REUSEADDR_available: bool, - SO_REUSEADDR_raise_error: bool, - IPPROTO_IPV6_available: bool, - reuse_port: bool, - mock_socket_cls: MagicMock, - mock_socket_ipv4: MagicMock, - mock_socket_ipv6: MagicMock, - addrinfo_list: Sequence[tuple[int, int, int, str, tuple[Any, ...]]], - monkeypatch: pytest.MonkeyPatch, - SO_REUSEPORT: int, - mocker: MockerFixture, -) -> None: - # Arrange - if not SO_REUSEADDR_available: - monkeypatch.delattr("socket.SO_REUSEADDR", raising=True) - if not IPPROTO_IPV6_available: - monkeypatch.delattr("socket.IPPROTO_IPV6", raising=False) - if SO_REUSEADDR_raise_error: - - def setsockopt(level: int, opt: int, value: int, /) -> None: - if level == SOL_SOCKET and opt == SO_REUSEADDR: - raise OSError - - mock_socket_ipv4.setsockopt.side_effect = setsockopt - mock_socket_ipv6.setsockopt.side_effect = setsockopt - - # Act - sockets = cast( - "list[MagicMock]", - open_listener_sockets_from_getaddrinfo_result( - addrinfo_list, - backlog=backlog, - reuse_address=reuse_address, - reuse_port=reuse_port, - ), - ) - - # Assert - assert len(sockets) == len(addrinfo_list) - assert mock_socket_cls.call_args_list == [mocker.call(f, t, p) for f, t, p, _, _ in addrinfo_list] - for socket, (sock_family, _, _, _, sock_addr) in zip(sockets, addrinfo_list, strict=True): - expected_setsockopt_calls: list[Any] = [] - if reuse_address and SO_REUSEADDR_available: - expected_setsockopt_calls.append(mocker.call(SOL_SOCKET, SO_REUSEADDR, True)) - if reuse_port: - expected_setsockopt_calls.append(mocker.call(SOL_SOCKET, SO_REUSEPORT, True)) - if sock_family == AF_INET6 and IPPROTO_IPV6_available: - expected_setsockopt_calls.append(mocker.call(IPPROTO_IPV6, IPV6_V6ONLY, True)) - - assert socket.setsockopt.mock_calls == expected_setsockopt_calls - - socket.bind.assert_called_once_with(sock_addr) - if backlog is None: - socket.listen.assert_not_called() - else: - socket.listen.assert_called_once_with(backlog) - socket.close.assert_not_called() - - -def test____open_listener_sockets_from_getaddrinfo_result____ignore_bad_combinations( - mock_socket_cls: MagicMock, - mock_tcp_socket_factory: Callable[[], MagicMock], - addrinfo_list: Sequence[tuple[int, int, int, str, tuple[Any, ...]]], -) -> None: - # Arrange - assert len(addrinfo_list) == 2 # In prevention - mock_socket_cls.side_effect = [mock_tcp_socket_factory(), OSError] - - # Act - sockets = open_listener_sockets_from_getaddrinfo_result(addrinfo_list, backlog=10, reuse_address=True, reuse_port=False) - - # Assert - assert len(sockets) == 1 - - -def test____open_listener_sockets_from_getaddrinfo_result____bind_failed( - mock_socket_cls: MagicMock, - mock_tcp_socket_factory: Callable[[], MagicMock], - addrinfo_list: Sequence[tuple[int, int, int, str, tuple[Any, ...]]], -) -> None: - # Arrange - assert len(addrinfo_list) == 2 # In prevention - s1, s2 = mock_tcp_socket_factory(), mock_tcp_socket_factory() - mock_socket_cls.side_effect = [s1, s2] - s2.bind.side_effect = OSError(1234, "error message") - - # Act - with pytest.raises(ExceptionGroup, match=r"^Error when trying to create listeners \(1 sub-exception\)$") as exc_info: - open_listener_sockets_from_getaddrinfo_result(addrinfo_list, backlog=10, reuse_address=True, reuse_port=False) - - # Assert - os_errors, exc = exc_info.value.split(OSError) - assert exc is None - assert os_errors is not None - assert len(os_errors.exceptions) == 1 - assert isinstance(os_errors.exceptions[0], OSError) - assert os_errors.exceptions[0].errno == 1234 - - s1.close.assert_called_once_with() - s2.close.assert_called_once_with() - - -def test____open_listener_sockets_from_getaddrinfo_result____ipv6_scope_id_not_properly_extracted_from_address( - mock_socket_cls: MagicMock, - mock_socket_ipv6: MagicMock, -) -> None: - # Arrange - addrinfo_list: Sequence[tuple[int, int, int, str, tuple[Any, ...]]] = [ - (AF_INET6, SOCK_STREAM, IPPROTO_TCP, "", ("4e76:f928:6bbc:53ce:c01e:00d5:cdd5:6bbb%6", 65432, 0, 0)), - ] - mock_socket_cls.side_effect = [mock_socket_ipv6] - - # Act - sockets = open_listener_sockets_from_getaddrinfo_result(addrinfo_list, backlog=10, reuse_address=True, reuse_port=False) - - # Assert - assert sockets == [mock_socket_ipv6] - mock_socket_ipv6.bind.assert_called_once_with(("4e76:f928:6bbc:53ce:c01e:00d5:cdd5:6bbb", 65432, 0, 6)) - - @pytest.mark.asyncio @pytest.mark.parametrize( ["waiter", "event_loop_add_event_func_name", "event_loop_remove_event_func_name"], diff --git a/tests/unit_test/test_tools/test_utils__open_listener_sockets.py b/tests/unit_test/test_tools/test_utils__open_listener_sockets.py new file mode 100644 index 00000000..352ebaea --- /dev/null +++ b/tests/unit_test/test_tools/test_utils__open_listener_sockets.py @@ -0,0 +1,180 @@ +from __future__ import annotations + +import errno +from collections.abc import Callable, Sequence +from socket import AF_INET, AF_INET6, IPPROTO_IPV6, IPPROTO_TCP, IPV6_V6ONLY, SO_REUSEADDR, SOCK_STREAM, SOL_SOCKET +from typing import TYPE_CHECKING, Any, cast + +from easynetwork.lowlevel._utils import error_from_errno, open_listener_sockets_from_getaddrinfo_result + +import pytest + +if TYPE_CHECKING: + from unittest.mock import MagicMock + + from pytest_mock import MockerFixture + + +@pytest.fixture +def mock_socket_ipv4(mock_socket_factory: Callable[[], MagicMock]) -> MagicMock: + return mock_socket_factory() + + +@pytest.fixture +def mock_socket_ipv6(mock_socket_factory: Callable[[], MagicMock]) -> MagicMock: + return mock_socket_factory() + + +@pytest.fixture(autouse=True) +def mock_socket_cls(mock_socket_ipv4: MagicMock, mock_socket_ipv6: MagicMock, mocker: MockerFixture) -> MagicMock: + def side_effect(family: int, type: int, proto: int) -> MagicMock: + if family == AF_INET6: + used_socket = mock_socket_ipv6 + elif family == AF_INET: + used_socket = mock_socket_ipv4 + else: + raise error_from_errno(errno.EAFNOSUPPORT) + + used_socket.family = family + used_socket.type = type + used_socket.proto = proto + return used_socket + + return mocker.patch("socket.socket", side_effect=side_effect) + + +@pytest.fixture +def addrinfo_list() -> Sequence[tuple[int, int, int, str, tuple[Any, ...]]]: + return ( + (AF_INET, SOCK_STREAM, IPPROTO_TCP, "", ("0.0.0.0", 65432)), + (AF_INET6, SOCK_STREAM, IPPROTO_TCP, "", ("::", 65432, 0, 0)), + ) + + +@pytest.mark.parametrize("reuse_address", [False, True], ids=lambda boolean: f"reuse_address=={boolean}") +@pytest.mark.parametrize("SO_REUSEADDR_available", [False, True], ids=lambda boolean: f"SO_REUSEADDR_available=={boolean}") +@pytest.mark.parametrize("SO_REUSEADDR_raise_error", [False, True], ids=lambda boolean: f"SO_REUSEADDR_raise_error=={boolean}") +@pytest.mark.parametrize("IPPROTO_IPV6_available", [False, True], ids=lambda boolean: f"IPPROTO_IPV6_available=={boolean}") +@pytest.mark.parametrize("reuse_port", [False, True], ids=lambda boolean: f"reuse_port=={boolean}") +@pytest.mark.parametrize("backlog", [123456, None], ids=lambda value: f"backlog=={value}") +def test____open_listener_sockets_from_getaddrinfo_result____create_listener_sockets( + reuse_address: bool, + backlog: int | None, + SO_REUSEADDR_available: bool, + SO_REUSEADDR_raise_error: bool, + IPPROTO_IPV6_available: bool, + reuse_port: bool, + mock_socket_cls: MagicMock, + mock_socket_ipv4: MagicMock, + mock_socket_ipv6: MagicMock, + addrinfo_list: Sequence[tuple[int, int, int, str, tuple[Any, ...]]], + monkeypatch: pytest.MonkeyPatch, + SO_REUSEPORT: int, + mocker: MockerFixture, +) -> None: + # Arrange + if not SO_REUSEADDR_available: + monkeypatch.delattr("socket.SO_REUSEADDR", raising=True) + if not IPPROTO_IPV6_available: + monkeypatch.delattr("socket.IPPROTO_IPV6", raising=False) + if SO_REUSEADDR_raise_error: + + def setsockopt(level: int, opt: int, value: int, /) -> None: + if level == SOL_SOCKET and opt == SO_REUSEADDR: + raise OSError + + mock_socket_ipv4.setsockopt.side_effect = setsockopt + mock_socket_ipv6.setsockopt.side_effect = setsockopt + + # Act + sockets = cast( + "list[MagicMock]", + open_listener_sockets_from_getaddrinfo_result( + addrinfo_list, + backlog=backlog, + reuse_address=reuse_address, + reuse_port=reuse_port, + ), + ) + + # Assert + assert len(sockets) == len(addrinfo_list) + assert mock_socket_cls.call_args_list == [mocker.call(f, t, p) for f, t, p, _, _ in addrinfo_list] + for socket, (sock_family, _, _, _, sock_addr) in zip(sockets, addrinfo_list, strict=True): + expected_setsockopt_calls: list[Any] = [] + if reuse_address and SO_REUSEADDR_available: + expected_setsockopt_calls.append(mocker.call(SOL_SOCKET, SO_REUSEADDR, True)) + if reuse_port: + expected_setsockopt_calls.append(mocker.call(SOL_SOCKET, SO_REUSEPORT, True)) + if sock_family == AF_INET6 and IPPROTO_IPV6_available: + expected_setsockopt_calls.append(mocker.call(IPPROTO_IPV6, IPV6_V6ONLY, True)) + + assert socket.setsockopt.mock_calls == expected_setsockopt_calls + + socket.bind.assert_called_once_with(sock_addr) + if backlog is None: + socket.listen.assert_not_called() + else: + socket.listen.assert_called_once_with(backlog) + socket.close.assert_not_called() + + +def test____open_listener_sockets_from_getaddrinfo_result____ignore_bad_combinations( + mock_socket_cls: MagicMock, + mock_tcp_socket_factory: Callable[[], MagicMock], + addrinfo_list: Sequence[tuple[int, int, int, str, tuple[Any, ...]]], +) -> None: + # Arrange + assert len(addrinfo_list) == 2 # In prevention + mock_socket_cls.side_effect = [mock_tcp_socket_factory(), OSError] + + # Act + sockets = open_listener_sockets_from_getaddrinfo_result(addrinfo_list, backlog=10, reuse_address=True, reuse_port=False) + + # Assert + assert len(sockets) == 1 + + +def test____open_listener_sockets_from_getaddrinfo_result____bind_failed( + mock_socket_cls: MagicMock, + mock_tcp_socket_factory: Callable[[], MagicMock], + addrinfo_list: Sequence[tuple[int, int, int, str, tuple[Any, ...]]], +) -> None: + # Arrange + assert len(addrinfo_list) == 2 # In prevention + s1, s2 = mock_tcp_socket_factory(), mock_tcp_socket_factory() + mock_socket_cls.side_effect = [s1, s2] + s2.bind.side_effect = OSError(1234, "error message") + + # Act + with pytest.raises(ExceptionGroup, match=r"^Error when trying to create listeners \(1 sub-exception\)$") as exc_info: + open_listener_sockets_from_getaddrinfo_result(addrinfo_list, backlog=10, reuse_address=True, reuse_port=False) + + # Assert + os_errors, exc = exc_info.value.split(OSError) + assert exc is None + assert os_errors is not None + assert len(os_errors.exceptions) == 1 + assert isinstance(os_errors.exceptions[0], OSError) + assert os_errors.exceptions[0].errno == 1234 + + s1.close.assert_called_once_with() + s2.close.assert_called_once_with() + + +def test____open_listener_sockets_from_getaddrinfo_result____ipv6_scope_id_not_properly_extracted_from_address( + mock_socket_cls: MagicMock, + mock_socket_ipv6: MagicMock, +) -> None: + # Arrange + addrinfo_list: Sequence[tuple[int, int, int, str, tuple[Any, ...]]] = [ + (AF_INET6, SOCK_STREAM, IPPROTO_TCP, "", ("4e76:f928:6bbc:53ce:c01e:00d5:cdd5:6bbb%6", 65432, 0, 0)), + ] + mock_socket_cls.side_effect = [mock_socket_ipv6] + + # Act + sockets = open_listener_sockets_from_getaddrinfo_result(addrinfo_list, backlog=10, reuse_address=True, reuse_port=False) + + # Assert + assert sockets == [mock_socket_ipv6] + mock_socket_ipv6.bind.assert_called_once_with(("4e76:f928:6bbc:53ce:c01e:00d5:cdd5:6bbb", 65432, 0, 6))