Skip to content

Commit

Permalink
WIP IO TCPclient
Browse files Browse the repository at this point in the history
Signed-off-by: Francis Bouvier <[email protected]>
  • Loading branch information
francisbouvier committed Jan 30, 2024
1 parent 39f9995 commit 9787aab
Showing 1 changed file with 191 additions and 11 deletions.
202 changes: 191 additions & 11 deletions src/loop.zig
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ pub const SingleThreaded = struct {
events_nb: *usize,
cbk_error: bool = false,

const Self = @This();
const Loop = @This();

pub fn init(alloc: std.mem.Allocator) !Self {
pub fn init(alloc: std.mem.Allocator) !Loop {
const io = try alloc.create(IO);
io.* = try IO.init(32, 0);
const events_nb = try alloc.create(usize);
events_nb.* = 0;
return Self{ .alloc = alloc, .io = io, .events_nb = events_nb };
return Loop{ .alloc = alloc, .io = io, .events_nb = events_nb };
}

pub fn deinit(self: Self) void {
pub fn deinit(self: Loop) void {
self.io.deinit();
self.alloc.destroy(self.io);
self.alloc.destroy(self.events_nb);
Expand All @@ -48,7 +48,7 @@ pub const SingleThreaded = struct {
// Stops when there is no more I/O events registered on the loop.
// Note that I/O events callbacks might register more I/O events
// on the go when they are executed (ie. nested I/O events).
pub fn run(self: *Self) !void {
pub fn run(self: *Loop) !void {
while (self.eventsNb() > 0) {
try self.io.tick();
// at each iteration we might have new events registred by previous callbacks
Expand All @@ -60,30 +60,210 @@ pub const SingleThreaded = struct {

// Register events atomically
// - add 1 event and return previous value
fn addEvent(self: *Self) usize {
fn addEvent(self: *Loop) usize {
return @atomicRmw(usize, self.events_nb, .Add, 1, .AcqRel);
}
// - remove 1 event and return previous value
fn removeEvent(self: *Self) usize {
fn removeEvent(self: *Loop) usize {
return @atomicRmw(usize, self.events_nb, .Sub, 1, .AcqRel);
}
// - get the number of current events
fn eventsNb(self: *Self) usize {
fn eventsNb(self: *Loop) usize {
return @atomicLoad(usize, self.events_nb, .SeqCst);
}

fn freeCbk(self: *Self, completion: *IO.Completion, ctx: anytype) void {
fn freeCbk(self: *Loop, completion: *IO.Completion, ctx: anytype) void {
self.alloc.destroy(completion);
self.alloc.destroy(ctx);
}

// Callback-based APIs
// -------------------

// Network

pub fn Impl(comptime Ctx: type) type {

// TODO: check interfaces of Ctx:
// - loop, socket fields
// - connected, sent, reveived methods

return struct {
const Self = @This();

pub fn connect(
ctx: *Ctx,
address: std.net.Address,
) !void {

// TODO: free completion
const completion = try ctx.loop.alloc.create(IO.Completion);
completion.* = undefined;

const old_events_nb = ctx.loop.addEvent();
ctx.loop.io.connect(
*Ctx,
ctx,
Self.connect_callback,
completion,
ctx.socket,
address,
);

if (builtin.is_test) {
report("start connect {d} on {any} at {any}", .{
old_events_nb + 1,
ctx.socket,
address,
});
}
}

fn connect_callback(
ctx: *Ctx,
completion: *IO.Completion,
result: IO.ConnectError!void,
) void {

// TODO: return the error to the callback
result catch |err| @panic(@errorName(err));

const old_events_nb = ctx.loop.removeEvent();
if (builtin.is_test) {
report("connect done, remaining events: {d}", .{old_events_nb - 1});
}

ctx.connected(Self, completion);
}

fn send(
ctx: *Ctx,
completion: *IO.Completion,
buf: []const u8,
) void {
const old_events_nb = ctx.loop.addEvent();
ctx.loop.io.send(
*Ctx,
ctx,
Self.send_callback,
completion,
ctx.socket,
buf,
);

if (builtin.is_test) {
report("start send {d} on {any} with buf len {d}", .{
old_events_nb + 1,
ctx.socket,
buf.len,
});
}
}

fn send_callback(
ctx: *Ctx,
completion: *IO.Completion,
result: IO.SendError!usize,
) void {

// TODO: return the error to the callback
const sent_nb = result catch |err| @panic(@errorName(err));

const old_events_nb = ctx.loop.removeEvent();
if (builtin.is_test) {
report("send done, remaining events: {d}", .{old_events_nb - 1});
}

ctx.sent(Self, completion, sent_nb);
}
};
}

pub const TCPClient = struct {
alloc: std.mem.Allocator,
loop: *Loop,
socket: std.os.socket_t,
buf: []const u8,

pub fn init(alloc: std.mem.Allocator, loop: *Loop) !TCPClient {
const msg: []const u8 = "OK\n";
return .{
.alloc = alloc,
.loop = loop,
.socket = undefined,
.buf = try std.mem.Allocator.dupe(alloc, u8, msg),
};
}

pub fn start(
self: *TCPClient,
comptime impl: anytype,
host: []const u8,
port: u16,
) !void {
const addr = try std.net.Address.parseIp4(host, port);
self.socket = try self.loop.io.open_socket(
addr.any.family,
std.os.SOCK.STREAM,
std.os.IPPROTO.TCP,
);
try impl.connect(self, addr);
}

fn connected(
self: *TCPClient,
comptime impl: anytype,
completion: *IO.Completion,
) void {
impl.send(self, completion, self.buf);
}

fn sent(
self: *TCPClient,
comptime _: anytype,
completion: *IO.Completion,
nb: usize,
) void {

// TODO: handle nb == ctx.buf.len
// ie: all data has not been sent
std.debug.print("sent {d} bytes\n", .{nb});

// TODO: IO receive
self.deinit(completion);
}

fn received(
self: *TCPClient,
comptime _: anytype,
completion: *IO.Completion,
nb: usize,
) void {
_ = completion;

std.debug.print("recv {d} bytes\n", .{nb});
if (nb > 0) {
const d = self.buf.?[0..nb];
std.debug.print("recv data: {s}\n", .{d});
}
// TODO: handle nb == ctx.buf.len
// ie: EOF
}

fn deinit(
self: *TCPClient,
completion: *IO.Completion,
) void {
defer self.loop.freeCbk(completion, self);
std.os.closeSocket(self.socket);
self.alloc.free(self.buf);
}
};

// Timeout

const ContextTimeout = struct {
loop: *Self,
loop: *Loop,
js_cbk: ?JSCallback,
};

Expand Down Expand Up @@ -111,7 +291,7 @@ pub const SingleThreaded = struct {
}
}

pub fn timeout(self: *Self, nanoseconds: u63, js_cbk: ?JSCallback) void {
pub fn timeout(self: *Loop, nanoseconds: u63, js_cbk: ?JSCallback) void {
const completion = self.alloc.create(IO.Completion) catch unreachable;
completion.* = undefined;
const ctx = self.alloc.create(ContextTimeout) catch unreachable;
Expand Down

0 comments on commit 9787aab

Please sign in to comment.