Skip to content

Commit

Permalink
clean up and split up io_uring package
Browse files Browse the repository at this point in the history
  • Loading branch information
laytan committed Jan 15, 2025
1 parent bfc7c31 commit 1210c0c
Show file tree
Hide file tree
Showing 9 changed files with 1,022 additions and 1,033 deletions.
77 changes: 77 additions & 0 deletions core/nbio/io_uring/doc.odin
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Wrapper/convenience package over the raw io_uring syscalls, providing help with setup, creation, and operating the ring.
The following example shows a simple `cat` program implementation using the package.
Example:
package main
import "base:runtime"
import "core:fmt"
import "core:os"
import "core:sys/linux"
import uring "core:nbio/io_uring"
main :: proc() {
if len(os.args) < 2 {
fmt.eprintfln("Usage: %s [file name] <[file name] ...>", os.args[0])
os.exit(1)
}
buffers := make([][]byte, len(os.args)-1)
defer delete(buffers)
ring, err := uring.make(&{})
fmt.assertf(err == nil, "uring.make: %v", err)
defer uring.destroy(&ring)
for _, i in os.args[1:] {
submit_read_request(runtime.args__[i+1], &buffers[i], &ring)
get_completion_and_print(&ring)
}
}
submit_read_request :: proc(path: cstring, buffer: ^[]byte, ring: ^uring.IO_Uring) {
fd, err := linux.open(path, {})
fmt.assertf(err == nil, "open(%q): %v", path, err)
file_sz := get_file_size(fd)
buf := make([]byte, file_sz)
buffer^ = buf
_, ok := uring.read(ring, u64(uintptr(buffer)), fd, buf, 0)
assert(ok, "could not get read sqe")
_, err = uring.submit(ring)
fmt.assertf(err == nil, "uring.submit: %v", err)
}
get_completion_and_print :: proc(ring: ^uring.IO_Uring) {
cqes: [1]linux.IO_Uring_CQE
n, err := uring.copy_cqes(ring, cqes[:], 1)
fmt.assertf(err == nil, "copy_cqes: %v", err)
assert(n == 1)
cqe := cqes[0]
fmt.assertf(cqe.res >= 0, "read failed: %v", linux.Errno(-cqe.res))
buffer := (^[]byte)(uintptr(cqe.user_data))
fmt.println(string(buffer^))
delete(buffer^)
}
get_file_size :: proc(fd: linux.Fd) -> uint {
st: linux.Stat
err := linux.fstat(fd, &st)
fmt.assertf(err == nil, "fstat: %v", err)
if linux.S_ISREG(st.mode) {
return uint(st.size)
}
panic("not a regular file")
}
*/
package io_uring
223 changes: 223 additions & 0 deletions core/nbio/io_uring/ops.odin
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
package io_uring

import "core:sys/linux"

// Queues (but does not submit) an SQE to perform an `fsync(2)` on `fd`.
//
// `user_data` is stored in the SQE unchanged and `flags` is written to the SQE's `fsync_flags`.
//
// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
// `ok` is `false` (and `sqe` is `nil`) when `get_sqe` could not hand out an SQE.
fsync :: proc(ring: ^IO_Uring, user_data: u64, fd: linux.Fd, flags: linux.IO_Uring_Fsync_Flags) -> (sqe: ^linux.IO_Uring_SQE, ok: bool) {
	// `or_return` only writes `ok` on failure, success has to be reported explicitly below.
	sqe = get_sqe(ring) or_return

	sqe.opcode      = .FSYNC
	sqe.fd          = fd
	sqe.fsync_flags = flags
	sqe.user_data   = user_data

	return sqe, true
}

// Queues (but does not submit) an SQE to perform a no-op.
//
// A no-op is more useful than may appear at first glance.
// For example, you could call `drain_previous_sqes()` on the returned SQE, to use the no-op to
// know when the ring is idle before acting on a kill signal.
//
// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
// `ok` is `false` (and `sqe` is `nil`) when `get_sqe` could not hand out an SQE.
nop :: proc(ring: ^IO_Uring, user_data: u64) -> (sqe: ^linux.IO_Uring_SQE, ok: bool) {
	// `or_return` only writes `ok` on failure, success has to be reported explicitly below.
	sqe = get_sqe(ring) or_return

	sqe.opcode    = .NOP
	sqe.user_data = user_data

	return sqe, true
}

// Queues (but does not submit) an SQE to perform a `read(2)` from `fd` into `buf`.
//
// `offset` is written to the SQE's `off` field and `user_data` is stored in the SQE unchanged.
// `buf` must stay valid until the operation completes, only its address and length are stored.
// Asserts that the length of `buf` is smaller than `max(u32)`, since the SQE length field is a `u32`.
//
// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
// `ok` is `false` (and `sqe` is `nil`) when `get_sqe` could not hand out an SQE.
read :: proc(ring: ^IO_Uring, user_data: u64, fd: linux.Fd, buf: []u8, offset: u64) -> (sqe: ^linux.IO_Uring_SQE, ok: bool) {
	assert(len(buf) < int(max(u32)))

	// `or_return` only writes `ok` on failure, success has to be reported explicitly below.
	sqe = get_sqe(ring) or_return

	sqe.opcode    = .READ
	sqe.fd        = fd
	sqe.addr      = cast(u64)uintptr(raw_data(buf))
	sqe.len       = u32(len(buf))
	sqe.off       = offset
	sqe.user_data = user_data

	return sqe, true
}

// Queues (but does not submit) an SQE to perform a `write(2)` of `buf` to `fd`.
//
// `offset` is written to the SQE's `off` field and `user_data` is stored in the SQE unchanged.
// `buf` must stay valid until the operation completes, only its address and length are stored.
// Asserts that the length of `buf` is smaller than `max(u32)`, since the SQE length field is a `u32`.
//
// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
// `ok` is `false` (and `sqe` is `nil`) when `get_sqe` could not hand out an SQE.
write :: proc(ring: ^IO_Uring, user_data: u64, fd: linux.Fd, buf: []u8, offset: u64) -> (sqe: ^linux.IO_Uring_SQE, ok: bool) {
	assert(len(buf) < int(max(u32)))

	// `or_return` only writes `ok` on failure, success has to be reported explicitly below.
	sqe = get_sqe(ring) or_return

	sqe.opcode    = .WRITE
	sqe.fd        = fd
	sqe.addr      = cast(u64)uintptr(raw_data(buf))
	sqe.len       = u32(len(buf))
	sqe.off       = offset
	sqe.user_data = user_data

	return sqe, true
}

// Queues (but does not submit) an SQE to perform an `accept4(2)` on a socket.
//
// The kernel reads and writes the address length through a pointer when the operation is
// executed, which happens after this procedure has returned. The storage for that length must
// therefore be owned by the caller and outlive the operation:
//
// - Pass `addr_len` (together with `addr`) to receive the peer address. `addr_len^` is initialized
//   to `size_of(T)` here; both `addr` and `addr_len` must stay valid until the completion is reaped.
// - When `addr_len` (or `addr`) is `nil`, the peer address is not requested and `addr` is left untouched.
//
// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
// `ok` is `false` (and `sqe` is `nil`) when `get_sqe` could not hand out an SQE.
accept :: proc(ring: ^IO_Uring, user_data: u64, sockfd: linux.Fd, addr: ^$T, flags: linux.Socket_FD_Flags, addr_len: ^i32 = nil) -> (sqe: ^linux.IO_Uring_SQE, ok: bool)
	where T == linux.Sock_Addr_In || T == linux.Sock_Addr_In6 || T == linux.Sock_Addr_Un || T == linux.Sock_Addr_Any {

	// `or_return` only writes `ok` on failure, success has to be reported explicitly below.
	sqe = get_sqe(ring) or_return

	sqe.opcode       = .ACCEPT
	sqe.fd           = sockfd
	sqe.accept_flags = flags
	sqe.user_data    = user_data

	if addr != nil && addr_len != nil {
		addr_len^ = i32(size_of(T))
		sqe.addr  = cast(u64)uintptr(addr)
		sqe.off   = cast(u64)uintptr(addr_len)
	} else {
		// Never hand the kernel a pointer to a local of this (already returned) procedure.
		sqe.addr = 0
		sqe.off  = 0
	}

	return sqe, true
}

// Queues (but does not submit) an SQE to perform a `connect(2)` on a socket.
//
// `addr` must stay valid until the operation completes, only its address is stored in the SQE.
// The size of the address type `T` is written to the SQE's `off` field.
//
// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
// `ok` is `false` (and `sqe` is `nil`) when `get_sqe` could not hand out an SQE.
connect :: proc(ring: ^IO_Uring, user_data: u64, sockfd: linux.Fd, addr: ^$T) -> (sqe: ^linux.IO_Uring_SQE, ok: bool)
	where T == linux.Sock_Addr_In || T == linux.Sock_Addr_In6 || T == linux.Sock_Addr_Un || T == linux.Sock_Addr_Any {

	// `or_return` only writes `ok` on failure, success has to be reported explicitly below.
	sqe = get_sqe(ring) or_return

	sqe.opcode    = .CONNECT
	sqe.fd        = sockfd
	sqe.addr      = cast(u64)uintptr(addr)
	sqe.off       = size_of(T)
	sqe.user_data = user_data

	return sqe, true
}

// Queues (but does not submit) an SQE to perform a `recv(2)` from `sockfd` into `buf`.
//
// `flags` is written to the SQE's `msg_flags` and `user_data` is stored in the SQE unchanged.
// `buf` must stay valid until the operation completes, only its address and length are stored.
// Asserts that the length of `buf` is smaller than `max(u32)`, since the SQE length field is a `u32`.
//
// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
// `ok` is `false` (and `sqe` is `nil`) when `get_sqe` could not hand out an SQE.
recv :: proc(ring: ^IO_Uring, user_data: u64, sockfd: linux.Fd, buf: []byte, flags: linux.Socket_Msg) -> (sqe: ^linux.IO_Uring_SQE, ok: bool) {
	assert(len(buf) < int(max(u32)))

	// `or_return` only writes `ok` on failure, success has to be reported explicitly below.
	sqe = get_sqe(ring) or_return

	sqe.opcode    = .RECV
	sqe.fd        = sockfd
	sqe.addr      = cast(u64)uintptr(raw_data(buf))
	sqe.len       = u32(len(buf)) // Same conversion as `send`; the assert above guarantees it fits.
	sqe.msg_flags = flags
	sqe.user_data = user_data

	return sqe, true
}

// Queues (but does not submit) an SQE to perform a `send(2)` of `buf` on `sockfd`.
//
// `flags` is written to the SQE's `msg_flags` and `user_data` is stored in the SQE unchanged.
// `buf` must stay valid until the operation completes, only its address and length are stored.
// Asserts that the length of `buf` is smaller than `max(u32)`, since the SQE length field is a `u32`.
//
// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
// `ok` is `false` (and `sqe` is `nil`) when `get_sqe` could not hand out an SQE.
send :: proc(ring: ^IO_Uring, user_data: u64, sockfd: linux.Fd, buf: []byte, flags: linux.Socket_Msg) -> (sqe: ^linux.IO_Uring_SQE, ok: bool) {
	assert(len(buf) < int(max(u32)))

	// `or_return` only writes `ok` on failure, success has to be reported explicitly below.
	sqe = get_sqe(ring) or_return

	sqe.opcode    = .SEND
	sqe.fd        = sockfd
	sqe.addr      = cast(u64)uintptr(raw_data(buf))
	sqe.len       = u32(len(buf))
	sqe.msg_flags = flags
	sqe.user_data = user_data

	return sqe, true
}

// Queues (but does not submit) an SQE to perform an `openat(2)`.
//
// `fd` is stored as the SQE's file descriptor, the address of `path` as its `addr`, `mode` as its
// `len` and `flags` as its `open_flags`.
// `path` must stay valid until the operation completes, only its address is stored.
//
// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
// `ok` is `false` (and `sqe` is `nil`) when `get_sqe` could not hand out an SQE.
openat :: proc(ring: ^IO_Uring, user_data: u64, fd: linux.Fd, path: cstring, mode: u32, flags: linux.Open_Flags) -> (sqe: ^linux.IO_Uring_SQE, ok: bool) {
	// `or_return` only writes `ok` on failure, success has to be reported explicitly below.
	sqe = get_sqe(ring) or_return

	sqe.opcode     = .OPENAT
	sqe.fd         = fd
	sqe.addr       = cast(u64)transmute(uintptr)path
	sqe.len        = mode
	sqe.open_flags = flags
	sqe.user_data  = user_data

	return sqe, true
}

// Queues (but does not submit) an SQE to perform a `close(2)` on `fd`.
//
// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
// `ok` is `false` (and `sqe` is `nil`) when `get_sqe` could not hand out an SQE.
close :: proc(ring: ^IO_Uring, user_data: u64, fd: linux.Fd) -> (sqe: ^linux.IO_Uring_SQE, ok: bool) {
	// `or_return` only writes `ok` on failure, success has to be reported explicitly below.
	sqe = get_sqe(ring) or_return

	sqe.opcode    = .CLOSE
	sqe.fd        = fd
	sqe.user_data = user_data

	return sqe, true
}

// Queues (but does not submit) an SQE to register a timeout operation.
//
// The timeout will complete when either the timeout expires, or after the specified number of
// events complete (if `count` is greater than `0`).
//
// `flags` may be `0` for a relative timeout, or `IORING_TIMEOUT_ABS` for an absolute timeout.
//
// The completion event result will be `-ETIME` if the timeout completed through expiration,
// `0` if the timeout completed after the specified number of events, or `-ECANCELED` if the
// timeout was removed before it expired.
//
// io_uring timeouts use the `CLOCK.MONOTONIC` clock source.
//
// `ts` must stay valid until the operation has been submitted, only its address is stored.
//
// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
// `ok` is `false` (and `sqe` is `nil`) when `get_sqe` could not hand out an SQE.
timeout :: proc(ring: ^IO_Uring, user_data: u64, ts: ^linux.Time_Spec, count: u32, flags: linux.IO_Uring_Timeout_Flags) -> (sqe: ^linux.IO_Uring_SQE, ok: bool) {
	// `or_return` only writes `ok` on failure, success has to be reported explicitly below.
	sqe = get_sqe(ring) or_return

	sqe.opcode        = .TIMEOUT
	sqe.fd            = -1
	sqe.addr          = cast(u64)uintptr(ts)
	sqe.len           = 1 // One `Time_Spec` behind `addr`.
	sqe.off           = u64(count)
	sqe.timeout_flags = flags
	sqe.user_data     = user_data

	return sqe, true
}

// Queues (but does not submit) an SQE to remove an existing timeout operation.
//
// The timeout is identified by its `user_data`, passed here as `timeout_user_data`.
// `user_data` is the user data of the removal operation itself.
//
// The completion event result will be `0` if the timeout was found and cancelled successfully,
// `-EBUSY` if the timeout was found but expiration was already in progress, or
// `-ENOENT` if the timeout was not found.
//
// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
// `ok` is `false` (and `sqe` is `nil`) when `get_sqe` could not hand out an SQE.
timeout_remove :: proc(ring: ^IO_Uring, user_data: u64, timeout_user_data: u64, flags: linux.IO_Uring_Timeout_Flags) -> (sqe: ^linux.IO_Uring_SQE, ok: bool) {
	// `or_return` only writes `ok` on failure, success has to be reported explicitly below.
	sqe = get_sqe(ring) or_return

	sqe.opcode        = .TIMEOUT_REMOVE
	sqe.fd            = -1
	sqe.addr          = timeout_user_data
	sqe.timeout_flags = flags
	sqe.user_data     = user_data

	return sqe, true
}

// Queues (but does not submit) an SQE to add a link timeout operation.
//
// You need to set linux.IOSQE_IO_LINK to flags of the target operation
// and then call this procedure right after the target operation.
// See https://lwn.net/Articles/803932/ for detail.
//
// If the dependent request finishes before the linked timeout, the timeout
// is canceled. If the timeout finishes before the dependent request, the
// dependent request will be canceled.
//
// The completion event result of the link_timeout will be
// `-ETIME` if the timeout finishes before the dependent request
// (in this case, the completion event result of the dependent request will
// be `-ECANCELED`), or
// `-EALREADY` if the dependent request finishes before the linked timeout.
//
// `ts` must stay valid until the operation has been submitted, only its address is stored.
//
// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
// `ok` is `false` (and `sqe` is `nil`) when `get_sqe` could not hand out an SQE.
link_timeout :: proc(ring: ^IO_Uring, user_data: u64, ts: ^linux.Time_Spec, flags: linux.IO_Uring_Timeout_Flags) -> (sqe: ^linux.IO_Uring_SQE, ok: bool) {
	// `or_return` only writes `ok` on failure, success has to be reported explicitly below.
	sqe = get_sqe(ring) or_return

	sqe.opcode        = .LINK_TIMEOUT
	sqe.fd            = -1
	sqe.addr          = cast(u64)uintptr(ts)
	sqe.len           = 1 // One `Time_Spec` behind `addr`.
	sqe.timeout_flags = flags
	sqe.user_data     = user_data

	return sqe, true
}

// Queues (but does not submit) an SQE with the `POLL_ADD` opcode, to poll `fd` for `events`.
//
// `flags` is written to the SQE's `poll_flags` and `user_data` is stored in the SQE unchanged.
//
// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
// `ok` is `false` (and `sqe` is `nil`) when `get_sqe` could not hand out an SQE.
poll_add :: proc(ring: ^IO_Uring, user_data: u64, fd: linux.Fd, events: linux.Fd_Poll_Events, flags: linux.IO_Uring_Poll_Add_Flags) -> (sqe: ^linux.IO_Uring_SQE, ok: bool) {
	// `or_return` only writes `ok` on failure, success has to be reported explicitly below.
	sqe = get_sqe(ring) or_return

	sqe.opcode      = .POLL_ADD
	sqe.fd          = fd
	sqe.poll_events = events
	sqe.poll_flags  = flags
	sqe.user_data   = user_data

	return sqe, true
}

// Queues (but does not submit) an SQE to remove an existing poll operation.
//
// The poll request to remove is identified by the `user_data` it was queued with, passed here as
// `target_user_data` and stored in the SQE's `addr` field. `user_data` is the user data of the
// removal operation itself.
//
// `events` is accepted for backwards compatibility but is not written to the SQE: a plain poll
// removal must not carry poll events (the kernel rejects that with `-EINVAL`).
//
// The completion event result will be `0` if the poll request was found and removed, or
// `-ENOENT` if it was not found.
//
// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
// `ok` is `false` (and `sqe` is `nil`) when `get_sqe` could not hand out an SQE.
poll_remove :: proc(ring: ^IO_Uring, user_data: u64, fd: linux.Fd, events: linux.Fd_Poll_Events, target_user_data: u64 = 0) -> (sqe: ^linux.IO_Uring_SQE, ok: bool) {
	_ = events

	// `or_return` only writes `ok` on failure, success has to be reported explicitly below.
	sqe = get_sqe(ring) or_return

	sqe.opcode    = .POLL_REMOVE
	sqe.fd        = fd
	sqe.addr      = target_user_data
	sqe.user_data = user_data

	return sqe, true
}

// TODO: other ops.
Loading

0 comments on commit 1210c0c

Please sign in to comment.