Skip to content

Commit

Permalink
TCP server: Fixed socket.accept() stopping the application for igno…
Browse files Browse the repository at this point in the history
…rable errors (#367)
francis-clairicia authored Oct 19, 2024
1 parent 67c2d7d commit 51bc68a
Showing 9 changed files with 94 additions and 41 deletions.
12 changes: 10 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -2,6 +2,12 @@ name: Test

on:
workflow_dispatch:
inputs:
verbose:
description: 'Increase level of test verbosity'
required: false
default: false
type: boolean
pull_request:
types:
- opened
@@ -56,8 +62,10 @@ jobs:
tox_py: py312
- python_version: '3.13'
tox_py: py313
env:
PYTEST_VERBOSE_FLAG: ${{ inputs.verbose && '-v' || '' }}

name: Tests (${{ matrix.os }}, ${{ matrix.python_version }})
name: test (${{ matrix.os }}, ${{ matrix.python_version }})
steps:
- uses: actions/checkout@v4
with:
@@ -68,7 +76,7 @@ jobs:
python-version: ${{ matrix.python_version }}
- name: Launch tests
timeout-minutes: 20
run: tox run -f ${{ matrix.tox_py }} -- -v
run: tox run -f ${{ matrix.tox_py }} -- ${{ env.PYTEST_VERBOSE_FLAG }}
- name: Generate coverage report
if: hashFiles('.coverage.*') != '' # Rudimentary `file.exists()`
continue-on-error: true
Original file line number Diff line number Diff line change
@@ -122,7 +122,7 @@ async def client_connection_task(client_socket: _socket.socket, task_group: Task
# The remote host closed the connection before starting the task.
# See this test for details:
# test____serve_forever____accept_client____client_sent_RST_packet_right_after_accept
logger.warning("A client connection was interrupted just after listener.accept()")
pass
else:
self.__accepted_socket_factory.log_connection_error(logger, exc)

@@ -181,7 +181,7 @@ async def raw_accept(self) -> _socket.socket:
raise _utils.error_from_errno(_errno.EBADF) from None
finally:
self.__accept_scope = None
else:
elif exc.errno not in constants.IGNORABLE_ACCEPT_ERRNOS:
raise

def backend(self) -> AsyncBackend:
34 changes: 32 additions & 2 deletions src/easynetwork/lowlevel/constants.py
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
"ACCEPT_CAPACITY_ERROR_SLEEP_TIME",
"DEFAULT_SERIALIZER_LIMIT",
"DEFAULT_STREAM_BUFSIZE",
"IGNORABLE_ACCEPT_ERRNOS",
"MAX_DATAGRAM_BUFSIZE",
"NOT_CONNECTED_SOCKET_ERRNOS",
"SC_IOV_MAX",
@@ -57,8 +58,7 @@
}
)

# Errors that accept(2) can return, and which indicate that the system is
# overloaded
# Errors that accept(2) can return, and which indicate that the system is overloaded
ACCEPT_CAPACITY_ERRNOS: Final[frozenset[int]] = frozenset(
{
_errno.EMFILE,
@@ -71,6 +71,36 @@
# How long to sleep when we get one of those errors
ACCEPT_CAPACITY_ERROR_SLEEP_TIME: Final[float] = 0.100

# Taken from Trio project
# Errors that accept(2) can return, and can be skipped
IGNORABLE_ACCEPT_ERRNOS: frozenset[int] = frozenset(
{
errno
for name in (
# Linux can do this when the a connection is denied by the firewall
"EPERM",
# BSDs with an early close/reset
"ECONNABORTED",
# All the other miscellany noted above -- may not happen in practice, but
# whatever.
"EPROTO",
"ENETDOWN",
"ENOPROTOOPT",
"EHOSTDOWN",
"ENONET",
"EHOSTUNREACH",
"EOPNOTSUPP",
"ENETUNREACH",
"ENOSR",
"ESOCKTNOSUPPORT",
"EPROTONOSUPPORT",
"ETIMEDOUT",
"ECONNRESET",
)
if (errno := getattr(_errno, name, None)) is not None
}
)

# Number of seconds to wait for SSL handshake to complete
# The default timeout matches that of Nginx.
SSL_HANDSHAKE_TIMEOUT: Final[float] = 60.0
10 changes: 1 addition & 9 deletions src/easynetwork/servers/async_tcp.py
Original file line number Diff line number Diff line change
@@ -284,7 +284,6 @@ async def __client_initializer(

client_address = lowlevel_client.extra(INETSocketAttribute.peername, None)
if client_address is None:
self.__client_closed_before_starting_task(self.logger)
yield None
return

@@ -358,17 +357,10 @@ def __client_tls_handshake_error_handler(cls, logger: logging.Logger, exc: Excep
or _utils.is_ssl_eof_error(exc)
or exc.errno in constants.NOT_CONNECTED_SOCKET_ERRNOS
):
cls.__client_closed_before_starting_task(logger)
pass
case _: # pragma: no cover
logger.warning("Error in client task (during TLS handshake)", exc_info=exc)

@staticmethod
def __client_closed_before_starting_task(logger: logging.Logger) -> None:
# The remote host closed the connection before starting the task.
# See this test for details:
# test____serve_forever____accept_client____client_sent_RST_packet_right_after_accept
logger.warning("A client connection was interrupted just after listener.accept()")

@_utils.inherit_doc(_base.BaseAsyncNetworkServerImpl)
def get_addresses(self) -> Sequence[SocketAddress]:
return self._with_lowlevel_servers(
Original file line number Diff line number Diff line change
@@ -87,9 +87,7 @@ async def test____send_packet____default(self, client: AsyncUDPNetworkClient[str
async with asyncio.timeout(3):
assert await server.recvfrom() == (b"ABCDEF", client.get_local_address())

# Windows and MacOS do not raise error
@PlatformMarkers.skipif_platform_win32
@PlatformMarkers.skipif_platform_macOS
@PlatformMarkers.runs_only_on_platform("linux", "Windows and MacOS do not raise error")
async def test____send_packet____connection_refused(
self,
client: AsyncUDPNetworkClient[str, str],
@@ -99,9 +97,7 @@ async def test____send_packet____connection_refused(
with pytest.raises(ConnectionRefusedError):
await client.send_packet("ABCDEF")

# Windows and MacOS do not raise error
@PlatformMarkers.skipif_platform_win32
@PlatformMarkers.skipif_platform_macOS
@PlatformMarkers.runs_only_on_platform("linux", "Windows and MacOS do not raise error")
async def test____send_packet____connection_refused____after_previous_successful_try(
self,
client: AsyncUDPNetworkClient[str, str],
Original file line number Diff line number Diff line change
@@ -624,7 +624,6 @@ async def test____serve_forever____accept_client____client_sent_RST_packet_right
from socket import socket as SocketType

caplog.set_level(logging.WARNING, LOGGER.name)
logger_crash_maximum_nb_lines[LOGGER.name] = 1

socket = SocketType()

@@ -638,10 +637,8 @@ async def test____serve_forever____accept_client____client_sent_RST_packet_right
# and will fail at client initialization when calling socket.getpeername() (errno.ENOTCONN will be raised)
await asyncio.sleep(0.1)

# ENOTCONN error should not create a big Traceback error but only a warning (at least)
assert len(caplog.records) == 1
assert caplog.records[0].levelno == logging.WARNING
assert caplog.records[0].message == "A client connection was interrupted just after listener.accept()"
# On Linux: ENOTCONN error should not create a big Traceback error
assert len(caplog.records) == 0

async def test____serve_forever____client_extra_attributes(
self,
Original file line number Diff line number Diff line change
@@ -58,17 +58,13 @@ def test____send_packet____default(self, client: UDPNetworkClient[str, str], ser
client.send_packet("ABCDEF")
assert server.recvfrom(1024) == (b"ABCDEF", client.get_local_address())

# Windows and MacOS do not raise error
@PlatformMarkers.skipif_platform_win32
@PlatformMarkers.skipif_platform_macOS
@PlatformMarkers.runs_only_on_platform("linux", "Windows and MacOS do not raise error")
def test____send_packet____connection_refused(self, client: UDPNetworkClient[str, str], server: Socket) -> None:
server.close()
with pytest.raises(ConnectionRefusedError):
client.send_packet("ABCDEF")

# Windows and MacOS do not raise error
@PlatformMarkers.skipif_platform_win32
@PlatformMarkers.skipif_platform_macOS
@PlatformMarkers.runs_only_on_platform("linux", "Windows and MacOS do not raise error")
def test____send_packet____connection_refused____after_previous_successful_try(
self,
client: UDPNetworkClient[str, str],
14 changes: 13 additions & 1 deletion tests/tools.py
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@
_T_Args = TypeVarTuple("_T_Args")


def _make_skipif_platform(platform: str, reason: str, *, skip_only_on_ci: bool) -> pytest.MarkDecorator:
def _make_skipif_platform(platform: str | tuple[str, ...], reason: str, *, skip_only_on_ci: bool) -> pytest.MarkDecorator:
condition: bool = sys.platform.startswith(platform)
if skip_only_on_ci:
# skip if 'CI' is set to a non-empty value
@@ -32,8 +32,14 @@ def _make_skipif_platform(platform: str, reason: str, *, skip_only_on_ci: bool)
return pytest.mark.skipif(condition, reason=reason)


def _make_skipif_not_on_platform(platform: str | tuple[str, ...], reason: str) -> pytest.MarkDecorator:
return pytest.mark.skipif(not sys.platform.startswith(platform), reason=reason)


@final
class PlatformMarkers:
###### SKIP SOME PLATFORMS ######

@staticmethod
def skipif_platform_win32_because(reason: str, *, skip_only_on_ci: bool = False) -> pytest.MarkDecorator:
return _make_skipif_platform("win32", reason, skip_only_on_ci=skip_only_on_ci)
@@ -50,6 +56,12 @@ def skipif_platform_linux_because(reason: str, *, skip_only_on_ci: bool = False)
skipif_platform_macOS = skipif_platform_macOS_because("cannot run on MacOS")
skipif_platform_linux = skipif_platform_linux_because("cannot run on Linux")

###### RESTRICT TESTS FOR PLATFORMS ######

@staticmethod
def runs_only_on_platform(platform: str | tuple[str, ...], reason: str) -> pytest.MarkDecorator:
return _make_skipif_not_on_platform(platform, reason)


def send_return(gen: Generator[Any, _T_contra, _V_co], value: _T_contra, /) -> _V_co:
with pytest.raises(StopIteration) as exc_info:
38 changes: 30 additions & 8 deletions tests/unit_test/test_async/test_asyncio_backend/test_stream.py
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@
StreamReaderBufferedProtocol,
)
from easynetwork.lowlevel.api_async.backend._asyncio.tasks import CancelScope, TaskGroup as AsyncIOTaskGroup
from easynetwork.lowlevel.constants import ACCEPT_CAPACITY_ERRNOS, NOT_CONNECTED_SOCKET_ERRNOS
from easynetwork.lowlevel.constants import ACCEPT_CAPACITY_ERRNOS, IGNORABLE_ACCEPT_ERRNOS, NOT_CONNECTED_SOCKET_ERRNOS
from easynetwork.lowlevel.socket import SocketAttribute

import pytest
@@ -345,11 +345,9 @@ async def test____serve____connect____error_raised(
assert len(caplog.records) == 0
accepted_socket_factory.log_connection_error.assert_not_called()
case OSError(errno=errno) if errno in NOT_CONNECTED_SOCKET_ERRNOS:
# ENOTCONN error should not create a big Traceback error but only a warning (at least)
assert len(caplog.records) == 1
assert caplog.records[0].levelno == logging.WARNING
assert caplog.records[0].message == "A client connection was interrupted just after listener.accept()"
assert caplog.records[0].exc_info is None
# ENOTCONN error should not create a big Traceback error
assert len(caplog.records) == 0
accepted_socket_factory.log_connection_error.assert_not_called()
case _:
assert len(caplog.records) == 0
accepted_socket_factory.log_connection_error.assert_called_once_with(
@@ -368,7 +366,7 @@ async def test____accept____accept_capacity_error(
caplog: pytest.LogCaptureFixture,
) -> None:
# Arrange
caplog.set_level(logging.ERROR)
caplog.set_level(logging.WARNING)
mock_tcp_listener_socket.accept.side_effect = OSError(errno_value, os.strerror(errno_value))

# Act
@@ -380,21 +378,45 @@ async def test____accept____accept_capacity_error(
# Assert
assert len(caplog.records) in {9, 10}
for record in caplog.records:
assert record.levelno == logging.ERROR
assert "retrying" in record.message
assert (
record.exc_info is not None
and isinstance(record.exc_info[1], OSError)
and record.exc_info[1].errno == errno_value
)

@pytest.mark.parametrize("errno_value", sorted(IGNORABLE_ACCEPT_ERRNOS), ids=errno_errorcode.__getitem__)
async def test____accept____ignorable_error(
self,
errno_value: int,
listener: ListenerSocketAdapter[Any],
mock_tcp_listener_socket: MagicMock,
mock_tcp_socket: MagicMock,
caplog: pytest.LogCaptureFixture,
) -> None:
# Arrange
caplog.set_level(logging.WARNING)
mock_tcp_listener_socket.accept.side_effect = [
OSError(errno_value, os.strerror(errno_value)),
(mock_tcp_socket, ("127.0.0.1", 12345)),
]

# Act
socket = await listener.raw_accept()

# Assert
assert socket is mock_tcp_socket
assert len(caplog.records) == 0

async def test____accept____reraise_other_OSErrors(
self,
listener: ListenerSocketAdapter[Any],
mock_tcp_listener_socket: MagicMock,
caplog: pytest.LogCaptureFixture,
) -> None:
# Arrange
caplog.set_level(logging.ERROR)
caplog.set_level(logging.WARNING)
exc = OSError()
mock_tcp_listener_socket.accept.side_effect = exc

0 comments on commit 51bc68a

Please sign in to comment.