Skip to content

Commit

Permalink
Issue altdesktop#153: Aio write_callback to handle EAGAIN
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuri Chernyavsky authored and Yuri Chernyavsky committed Jun 27, 2023
1 parent ab566e1 commit eb9a2d1
Showing 1 changed file with 34 additions and 27 deletions.
61 changes: 34 additions & 27 deletions dbus_next/aio/message_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import socket
from copy import copy
from typing import Optional
import errno


def _future_set_exception(fut, exc):
Expand Down Expand Up @@ -43,35 +44,41 @@ def __init__(self, bus):
def write_callback(self):
try:
while True:
if self.buf is None:
if self.messages.qsize() == 0:
# nothing more to write
self.loop.remove_writer(self.fd)
try:
if self.buf is None:
if self.messages.qsize() == 0:
# nothing more to write
self.loop.remove_writer(self.fd)
return
buf, unix_fds, fut = self.messages.get_nowait()
self.unix_fds = unix_fds
self.buf = memoryview(buf)
self.offset = 0
self.fut = fut

if self.unix_fds and self.negotiate_unix_fd:
ancdata = [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
array.array("i", self.unix_fds))]
self.offset += self.sock.sendmsg([self.buf[self.offset:]], ancdata)
self.unix_fds = None
else:
self.offset += self.sock.send(self.buf[self.offset:])

if self.offset >= len(self.buf):
# finished writing
self.buf = None
_future_set_result(self.fut, None)
else:
# wait for writable
return
buf, unix_fds, fut = self.messages.get_nowait()
self.unix_fds = unix_fds
self.buf = memoryview(buf)
self.offset = 0
self.fut = fut

if self.unix_fds and self.negotiate_unix_fd:
ancdata = [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
array.array("i", self.unix_fds))]
self.offset += self.sock.sendmsg([self.buf[self.offset:]], ancdata)
self.unix_fds = None
else:
self.offset += self.sock.send(self.buf[self.offset:])
except OSError as e:
if e.errno == errno.EAGAIN:
return
raise

if self.offset >= len(self.buf):
# finished writing
self.buf = None
_future_set_result(self.fut, None)
else:
# wait for writable
return
except Exception as e:
_future_set_exception(self.fut, e)
self.bus._finalize(e)
except Exception as e:
_future_set_exception(self.fut, e)
self.bus._finalize(e)

def buffer_message(self, msg: Message, future=None):
self.messages.put_nowait(
Expand Down

0 comments on commit eb9a2d1

Please sign in to comment.