Skip to content

Commit

Permalink
Add EventLoop#wait_readable, #wait_writable methods (#15376)
Browse files Browse the repository at this point in the history
These methods in the `Crystal::EventLoop` interfaces wait on file descriptor or socket readiness without performing an actual read or write operation, which can be delegated to an external library.

This provides a semi-public interface to work around #15374:

```crystal
file_descriptor = IO::FileDescriptor.new(LibC.some_fd, blocking: false)

event_loop = Crystal::EventLoop.current
event_loop.wait_readable(file_descriptor)
LibC.do_something(fd)
```

This is implemented for the polling event loops (epoll, kqueue) as well as libevent since these were straightforward.

Windows is left unimplemented. It might be implemented with `WSAPoll` running in a thread or `ProcessSocketNotifications` to associate sockets to a completion port. See [Winsock socket state notifications](https://learn.microsoft.com/en-us/windows/win32/winsock/winsock-socket-state-notifications) for more details.


Related to [RFC #7](https://github.com/crystal-lang/rfcs/blob/main/text/0007-event_loop-refactor.md) and [RFC #9](https://github.com/crystal-lang/rfcs/blob/main/text/0009-lifetime-event_loop.md).
  • Loading branch information
ysbaddaden authored Feb 6, 2025
1 parent 89aa014 commit 8823b75
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 18 deletions.
6 changes: 6 additions & 0 deletions src/crystal/event_loop/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ abstract class Crystal::EventLoop
# Returns 0 when EOF is reached.
abstract def read(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32

# Blocks the current fiber until the file descriptor is ready for read.
abstract def wait_readable(file_descriptor : Crystal::System::FileDescriptor) : Nil

# Writes at least one byte from *slice* to the file descriptor.
#
# Blocks the current fiber if the file descriptor isn't ready for writing,
Expand All @@ -17,6 +20,9 @@ abstract class Crystal::EventLoop
# Returns the number of bytes written (up to `slice.size`).
abstract def write(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32

# Blocks the current fiber until the file descriptor is ready for write.
abstract def wait_writable(file_descriptor : Crystal::System::FileDescriptor) : Nil

# Closes the file descriptor resource.
abstract def close(file_descriptor : Crystal::System::FileDescriptor) : Nil
end
Expand Down
22 changes: 22 additions & 0 deletions src/crystal/event_loop/iocp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,21 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
end.to_i32
end

def wait_readable(file_descriptor : Crystal::System::FileDescriptor) : Nil
raise NotImplementedError.new("Crystal::System::IOCP#wait_readable(FileDescriptor)")
end

def write(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32
System::IOCP.overlapped_operation(file_descriptor, "WriteFile", file_descriptor.write_timeout, writing: true) do |overlapped|
ret = LibC.WriteFile(file_descriptor.windows_handle, slice, slice.size, out byte_count, overlapped)
{ret, byte_count}
end.to_i32
end

def wait_writable(file_descriptor : Crystal::System::FileDescriptor) : Nil
raise NotImplementedError.new("Crystal::System::IOCP#wait_writable(FileDescriptor)")
end

def close(file_descriptor : Crystal::System::FileDescriptor) : Nil
LibC.CancelIoEx(file_descriptor.windows_handle, nil) unless file_descriptor.system_blocking?
end
Expand All @@ -220,6 +228,13 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
bytes_read.to_i32
end

def wait_readable(socket : ::Socket) : Nil
# NOTE: Windows 10+ has `ProcessSocketNotifications` to associate sockets to
# a completion port and be notified of socket readiness. See
# <https://learn.microsoft.com/en-us/windows/win32/winsock/winsock-socket-state-notifications>
raise NotImplementedError.new("Crystal::System::IOCP#wait_readable(Socket)")
end

def write(socket : ::Socket, slice : Bytes) : Int32
wsabuf = wsa_buffer(slice)

Expand All @@ -231,6 +246,13 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
bytes.to_i32
end

def wait_writable(socket : ::Socket) : Nil
# NOTE: Windows 10+ has `ProcessSocketNotifications` to associate sockets to
# a completion port and be notified of socket readiness. See
# <https://learn.microsoft.com/en-us/windows/win32/winsock/winsock-socket-state-notifications>
raise NotImplementedError.new("Crystal::System::IOCP#wait_writable(Socket)")
end

def send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address) : Int32
wsabuf = wsa_buffer(slice)
bytes_written = System::IOCP.wsa_overlapped_operation(socket, socket.fd, "WSASendTo", socket.write_timeout) do |overlapped|
Expand Down
36 changes: 32 additions & 4 deletions src/crystal/event_loop/libevent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
end
end

def wait_readable(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.evented_wait_readable(raise_if_closed: false) do
raise IO::TimeoutError.new("Read timed out")
end
end

def write(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32
evented_write(file_descriptor, "Error writing file_descriptor") do
LibC.write(file_descriptor.fd, slice, slice.size).tap do |return_code|
Expand All @@ -94,6 +100,12 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
end
end

def wait_writable(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.evented_wait_writable do
raise IO::TimeoutError.new("Write timed out")
end
end

def close(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.evented_close
end
Expand All @@ -104,12 +116,24 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
end
end

def wait_readable(socket : ::Socket) : Nil
socket.evented_wait_readable(raise_if_closed: false) do
raise IO::TimeoutError.new("Read timed out")
end
end

def write(socket : ::Socket, slice : Bytes) : Int32
evented_write(socket, "Error writing to socket") do
LibC.send(socket.fd, slice, slice.size, 0).to_i32
end
end

def wait_writable(socket : ::Socket) : Nil
socket.evented_wait_writable do
raise IO::TimeoutError.new("Write timed out")
end
end

def receive_from(socket : ::Socket, slice : Bytes) : Tuple(Int32, ::Socket::Address)
sockaddr = Pointer(LibC::SockaddrStorage).malloc.as(LibC::Sockaddr*)
# initialize sockaddr with the initialized family of the socket
Expand Down Expand Up @@ -142,7 +166,7 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
when Errno::EISCONN
return
when Errno::EINPROGRESS, Errno::EALREADY
socket.wait_writable(timeout: timeout) do
socket.evented_wait_writable(timeout: timeout) do
return IO::TimeoutError.new("connect timed out")
end
else
Expand Down Expand Up @@ -174,7 +198,7 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
if socket.closed?
return
elsif Errno.value == Errno::EAGAIN
socket.wait_readable(raise_if_closed: false) do
socket.evented_wait_readable(raise_if_closed: false) do
raise IO::TimeoutError.new("Accept timed out")
end
return if socket.closed?
Expand All @@ -200,7 +224,9 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
end

if Errno.value == Errno::EAGAIN
target.wait_readable
target.evented_wait_readable do
raise IO::TimeoutError.new("Read timed out")
end
else
raise IO::Error.from_errno(errno_msg, target: target)
end
Expand All @@ -218,7 +244,9 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
end

if Errno.value == Errno::EAGAIN
target.wait_writable
target.evented_wait_writable do
raise IO::TimeoutError.new("Write timed out")
end
else
raise IO::Error.from_errno(errno_msg, target: target)
end
Expand Down
24 changes: 24 additions & 0 deletions src/crystal/event_loop/polling.cr
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
end
end

def wait_readable(file_descriptor : System::FileDescriptor) : Nil
wait_readable(file_descriptor, file_descriptor.@read_timeout) do
raise IO::TimeoutError.new
end
end

def write(file_descriptor : System::FileDescriptor, slice : Bytes) : Int32
size = evented_write(file_descriptor, slice, file_descriptor.@write_timeout)

Expand All @@ -160,6 +166,12 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
end
end

def wait_writable(file_descriptor : System::FileDescriptor) : Nil
wait_writable(file_descriptor, file_descriptor.@write_timeout) do
raise IO::TimeoutError.new
end
end

def close(file_descriptor : System::FileDescriptor) : Nil
evented_close(file_descriptor)
end
Expand All @@ -176,12 +188,24 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
size
end

def wait_readable(socket : ::Socket) : Nil
wait_readable(socket, socket.@read_timeout) do
raise IO::TimeoutError.new
end
end

def write(socket : ::Socket, slice : Bytes) : Int32
size = evented_write(socket, slice, socket.@write_timeout)
raise IO::Error.from_errno("write", target: socket) if size == -1
size
end

def wait_writable(socket : ::Socket) : Nil
wait_writable(socket, socket.@write_timeout) do
raise IO::TimeoutError.new
end
end

def accept(socket : ::Socket) : ::Socket::Handle?
loop do
client_fd =
Expand Down
6 changes: 6 additions & 0 deletions src/crystal/event_loop/socket.cr
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ abstract class Crystal::EventLoop
# Use `#receive_from` for capturing the source address of a message.
abstract def read(socket : ::Socket, slice : Bytes) : Int32

# Blocks the current fiber until the socket is ready for read.
abstract def wait_readable(socket : ::Socket) : Nil

# Writes at least one byte from *slice* to the socket.
#
# Blocks the current fiber if the socket is not ready for writing,
Expand All @@ -25,6 +28,9 @@ abstract class Crystal::EventLoop
# Use `#send_to` for sending a message to a specific target address.
abstract def write(socket : ::Socket, slice : Bytes) : Int32

# Blocks the current fiber until the socket is ready for write.
abstract def wait_writable(socket : ::Socket) : Nil

# Accepts an incoming TCP connection on the socket.
#
# Blocks the current fiber if no connection is waiting, continuing when one
Expand Down
32 changes: 30 additions & 2 deletions src/crystal/event_loop/wasi.cr
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
end
end

def wait_readable(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.evented_wait_readable(raise_if_closed: false) do
raise IO::TimeoutError.new("Read timed out")
end
end

def write(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32
evented_write(file_descriptor, "Error writing file_descriptor") do
LibC.write(file_descriptor.fd, slice, slice.size).tap do |return_code|
Expand All @@ -49,6 +55,12 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
end
end

def wait_writable(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.evented_wait_writable(raise_if_closed: false) do
raise IO::TimeoutError.new("Write timed out")
end
end

def close(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.evented_close
end
Expand All @@ -59,12 +71,24 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
end
end

def wait_readable(socket : ::Socket) : Nil
socket.evented_wait_readable do
raise IO::TimeoutError.new("Read timed out")
end
end

def write(socket : ::Socket, slice : Bytes) : Int32
evented_write(socket, "Error writing to socket") do
LibC.send(socket.fd, slice, slice.size, 0)
end
end

def wait_writable(socket : ::Socket) : Nil
socket.evented_wait_writable do
raise IO::TimeoutError.new("Write timed out")
end
end

def receive_from(socket : ::Socket, slice : Bytes) : Tuple(Int32, ::Socket::Address)
raise NotImplementedError.new "Crystal::Wasi::EventLoop#receive_from"
end
Expand Down Expand Up @@ -94,7 +118,9 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
end

if Errno.value == Errno::EAGAIN
target.wait_readable
target.evented_wait_readable do
raise IO::TimeoutError.new("Read timed out")
end
else
raise IO::Error.from_errno(errno_msg, target: target)
end
Expand All @@ -112,7 +138,9 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
end

if Errno.value == Errno::EAGAIN
target.wait_writable
target.evented_wait_writable do
raise IO::TimeoutError.new("Write timed out")
end
else
raise IO::Error.from_errno(errno_msg, target: target)
end
Expand Down
14 changes: 2 additions & 12 deletions src/io/evented.cr
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ module IO::Evented
end

# :nodoc:
def wait_readable(timeout = @read_timeout) : Nil
wait_readable(timeout: timeout) { raise TimeoutError.new("Read timed out") }
end

# :nodoc:
def wait_readable(timeout = @read_timeout, *, raise_if_closed = true, &) : Nil
def evented_wait_readable(timeout = @read_timeout, *, raise_if_closed = true, &) : Nil
readers = @readers.get { Deque(Fiber).new }
readers << Fiber.current
add_read_event(timeout)
Expand All @@ -59,12 +54,7 @@ module IO::Evented
end

# :nodoc:
def wait_writable(timeout = @write_timeout) : Nil
wait_writable(timeout: timeout) { raise TimeoutError.new("Write timed out") }
end

# :nodoc:
def wait_writable(timeout = @write_timeout, &) : Nil
def evented_wait_writable(timeout = @write_timeout, &) : Nil
writers = @writers.get { Deque(Fiber).new }
writers << Fiber.current
add_write_event(timeout)
Expand Down

0 comments on commit 8823b75

Please sign in to comment.