Skip to content

Commit

Permalink
add cancel_one
Browse files Browse the repository at this point in the history
  • Loading branch information
krichprollsch committed Feb 11, 2025
1 parent e87df11 commit ea3cf63
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 4 deletions.
24 changes: 21 additions & 3 deletions io/darwin.zig
Original file line number Diff line number Diff line change
Expand Up @@ -292,14 +292,32 @@ pub const IO = struct {
}
}

pub fn cancel(_: *IO, _: *Completion) void {
pub const CancelOneError = error{ NotFound, ExpirationInProgress } || posix.UnexpectedError;

pub fn cancel_one(
self: *IO,
comptime Context: type,
context: Context,
comptime callback: fn (
context: Context,
completion: *Completion,
result: CancelOneError!void,
) void,
completion: *Completion,
cancel_completion: *Completion,
) void {
_ = self;
_ = context;
_ = callback;
_ = completion;
_ = cancel_completion;
// TODO implement cancellation w/ kqueue.
log.debug("cancel implementation is missing on macOS", .{});
log.debug("cancel_one implementation is missing on macOS", .{});
}

pub fn cancel_all(_: *IO) void {
// TODO Cancel in-flight async IO and wait for all completions.
log.debug("cancel all implementation is missing on macOS", .{});
log.debug("cancel_all implementation is missing on macOS", .{});
}

pub const AcceptError = posix.AcceptError || posix.SetSockOptError;
Expand Down
60 changes: 59 additions & 1 deletion io/linux.zig
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ pub const IO = struct {
assert(self.ios_in_kernel == 0);
}

pub fn cancel(self: *IO, target: *Completion) void {
fn cancel(self: *IO, target: *Completion) void {
self.cancel_completion = .{
.io = self,
.context = self,
Expand Down Expand Up @@ -376,6 +376,42 @@ pub const IO = struct {
};
}

pub const CancelOneError = error{ NotFound, ExpirationInProgress } || posix.UnexpectedError;

pub fn cancel_one(
self: *IO,
comptime Context: type,
context: Context,
comptime callback: fn (
context: Context,
completion: *Completion,
result: CancelOneError!void,
) void,
completion: *Completion,
cancel_one_completion: *Completion,
) void {
completion.* = .{
.io = self,
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
callback(
@as(Context, @ptrFromInt(@intFromPtr(ctx))),
comp,
@as(*const CancelOneError!void, @ptrFromInt(@intFromPtr(res))).*,
);
}
}.wrapper,
.operation = .{
.cancel_one = .{
.c = @intFromPtr(cancel_one_completion),
},
},
};

self.enqueue(completion);
}

/// This struct holds the data needed for a single io_uring operation
pub const Completion = struct {
io: *IO,
Expand All @@ -395,6 +431,9 @@ pub const IO = struct {

fn prep(completion: *Completion, sqe: *io_uring_sqe) void {
switch (completion.operation) {
.cancel_one => |op| {
sqe.prep_cancel(op.c, 0);
},
.cancel => |op| {
sqe.prep_cancel(@intFromPtr(op.target), 0);
},
Expand Down Expand Up @@ -465,6 +504,22 @@ pub const IO = struct {

fn complete(completion: *Completion) void {
switch (completion.operation) {
.cancel_one => {
const result: CancelOneError!void = blk: {
if (completion.result < 0) {
const err = switch (@as(posix.E, @enumFromInt(-completion.result))) {
.SUCCESS => {},
.NOENT => error.NotFound,
.ALREADY => error.ExpirationInProgress,
else => |errno| posix.unexpectedErrno(errno),
};
break :blk err;
} else {
break :blk;
}
};
completion.callback(completion.context, completion, &result);
},
.cancel => {
const result: CancelError!void = result: {
if (completion.result < 0) {
Expand Down Expand Up @@ -797,6 +852,9 @@ pub const IO = struct {

/// This union encodes the set of operations supported as well as their arguments.
const Operation = union(enum) {
cancel_one: struct {
c: u64,
},
cancel: struct {
target: *Completion,
},
Expand Down
59 changes: 59 additions & 0 deletions io/test.zig
Original file line number Diff line number Diff line change
Expand Up @@ -900,3 +900,62 @@ test "cancel_all" {
}
}.run_test();
}

test "cancel_one" {
try struct {
const Context = @This();

io: IO,
timeout_res: IO.TimeoutError!void = undefined,
timeout_done: bool = false,
cancel_done: bool = false,

fn run_test() !void {
var self: Context = .{
.io = try IO.init(32, 0),
};
defer self.io.deinit();

var completion: IO.Completion = undefined;
self.io.timeout(
*Context,
&self,
timeout_callback,
&completion,
100 * std.time.ns_per_ms,
);

var cancel_one_completion: IO.Completion = undefined;
self.io.cancel_one(
*Context,
&self,
cancel_one_callback,
&cancel_one_completion,
&completion,
);
while (!self.cancel_done and !self.timeout_done) try self.io.tick();

try testing.expectEqual(true, self.timeout_done);
try testing.expectEqual(true, self.cancel_done);
try testing.expectError(IO.TimeoutError.Canceled, self.timeout_res);
}

fn timeout_callback(
self: *Context,
_: *IO.Completion,
result: IO.TimeoutError!void,
) void {
self.timeout_res = result;
self.timeout_done = true;
}

fn cancel_one_callback(
self: *Context,
_: *IO.Completion,
result: IO.CancelOneError!void,
) void {
result catch |err| std.debug.panic("cancel one error: {}", .{err});
self.cancel_done = true;
}
}.run_test();
}

0 comments on commit ea3cf63

Please sign in to comment.