diff --git a/src/cockpit/transports.py b/src/cockpit/transports.py index faa7aaee3494..8aabc7d83d6b 100644 --- a/src/cockpit/transports.py +++ b/src/cockpit/transports.py @@ -29,14 +29,14 @@ import struct import subprocess import termios -from typing import Any, ClassVar, Deque, Dict, List, Optional, Sequence, Tuple +from typing import Any, ClassVar, Sequence from .jsonutil import JsonObject, get_int libc6 = ctypes.cdll.LoadLibrary('libc.so.6') -def prctl(*args): +def prctl(*args: int) -> None: if libc6.prctl(*args) != 0: raise OSError('prctl() failed') @@ -55,7 +55,7 @@ class _Transport(asyncio.Transport): _loop: asyncio.AbstractEventLoop _protocol: asyncio.Protocol - _queue: Optional[Deque[bytes]] + _queue: 'collections.deque[bytes] | None' _in_fd: int _out_fd: int _closing: bool @@ -67,7 +67,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, protocol: asyncio.Protocol, in_fd: int = -1, out_fd: int = -1, - extra: Optional[Dict[str, object]] = None): + extra: 'dict[str, object] | None' = None): super().__init__(extra) self._loop = loop @@ -138,7 +138,7 @@ def resume_reading(self) -> None: def _close(self) -> None: pass - def abort(self, exc: Optional[Exception] = None) -> None: + def abort(self, exc: 'Exception | None' = None) -> None: self._closing = True self._close_reader() self._remove_write_queue() @@ -162,10 +162,10 @@ def get_write_buffer_size(self) -> int: return 0 return sum(len(block) for block in self._queue) - def get_write_buffer_limits(self) -> Tuple[int, int]: + def get_write_buffer_limits(self) -> 'tuple[int, int]': return (0, 0) - def set_write_buffer_limits(self, high: Optional[int] = None, low: Optional[int] = None) -> None: + def set_write_buffer_limits(self, high: 'int | None' = None, low: 'int | None' = None) -> None: assert high is None or high == 0 assert low is None or low == 0 @@ -305,11 +305,11 @@ class SubprocessTransport(_Transport, asyncio.SubprocessTransport): data from it, making it available via the .get_stderr() method. """ - _returncode: Optional[int] = None + _returncode: 'int | None' = None - _pty_fd: Optional[int] = None - _process: Optional['subprocess.Popen[bytes]'] = None - _stderr: Optional['Spooler'] + _pty_fd: 'int | None' = None + _process: 'subprocess.Popen[bytes] | None' = None + _stderr: 'Spooler | None' @staticmethod def _create_watcher() -> asyncio.AbstractChildWatcher: @@ -363,11 +363,11 @@ def __init__(self, args: Sequence[str], *, pty: bool = False, - window: Optional[WindowSize] = None, + window: 'WindowSize | None' = None, **kwargs: Any): # go down as a team -- we don't want any leaked processes when the bridge terminates - def preexec_fn(): + def preexec_fn() -> None: prctl(SET_PDEATHSIG, signal.SIGTERM) if pty: fcntl.ioctl(0, termios.TIOCSCTTY, 0) @@ -422,7 +422,7 @@ def get_pid(self) -> int: assert self._process is not None return self._process.pid - def get_returncode(self) -> Optional[int]: + def get_returncode(self) -> 'int | None': return self._returncode def get_pipe_transport(self, fd: int) -> asyncio.Transport: @@ -502,7 +502,7 @@ class Spooler: _loop: asyncio.AbstractEventLoop _fd: int - _contents: List[bytes] + _contents: 'list[bytes]' def __init__(self, loop: asyncio.AbstractEventLoop, fd: int): self._loop = loop diff --git a/test/static-code b/test/static-code index f5e7eee985a6..f1eb2bf323be 100755 --- a/test/static-code +++ b/test/static-code @@ -49,6 +49,7 @@ if [ "${WITH_PARTIAL_TREE:-0}" = 0 ]; then src/cockpit/_version.py src/cockpit/jsonutil.py src/cockpit/protocol.py + src/cockpit/transports.py ' test_mypy() { command -v mypy >/dev/null || skip 'no mypy'