Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP IO TCPclient #194

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 191 additions & 11 deletions src/loop.zig
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ pub const SingleThreaded = struct {
events_nb: *usize,
cbk_error: bool = false,

const Self = @This();
const Loop = @This();

pub fn init(alloc: std.mem.Allocator) !Self {
pub fn init(alloc: std.mem.Allocator) !Loop {
const io = try alloc.create(IO);
io.* = try IO.init(32, 0);
const events_nb = try alloc.create(usize);
events_nb.* = 0;
return Self{ .alloc = alloc, .io = io, .events_nb = events_nb };
return Loop{ .alloc = alloc, .io = io, .events_nb = events_nb };
}

pub fn deinit(self: Self) void {
pub fn deinit(self: Loop) void {
self.io.deinit();
self.alloc.destroy(self.io);
self.alloc.destroy(self.events_nb);
Expand All @@ -48,7 +48,7 @@ pub const SingleThreaded = struct {
// Stops when there is no more I/O events registered on the loop.
// Note that I/O events callbacks might register more I/O events
// on the go when they are executed (ie. nested I/O events).
pub fn run(self: *Self) !void {
pub fn run(self: *Loop) !void {
while (self.eventsNb() > 0) {
try self.io.tick();
// at each iteration we might have new events registred by previous callbacks
Expand All @@ -60,30 +60,210 @@ pub const SingleThreaded = struct {

// Register events atomically
// - add 1 event and return previous value
fn addEvent(self: *Self) usize {
fn addEvent(self: *Loop) usize {
return @atomicRmw(usize, self.events_nb, .Add, 1, .AcqRel);
}
// - remove 1 event and return previous value
fn removeEvent(self: *Self) usize {
fn removeEvent(self: *Loop) usize {
return @atomicRmw(usize, self.events_nb, .Sub, 1, .AcqRel);
}
// - get the number of current events
fn eventsNb(self: *Self) usize {
fn eventsNb(self: *Loop) usize {
return @atomicLoad(usize, self.events_nb, .SeqCst);
}

fn freeCbk(self: *Self, completion: *IO.Completion, ctx: anytype) void {
fn freeCbk(self: *Loop, completion: *IO.Completion, ctx: anytype) void {
self.alloc.destroy(completion);
self.alloc.destroy(ctx);
}

// Callback-based APIs
// -------------------

// Network

pub fn Impl(comptime Ctx: type) type {

// TODO: check interfaces of Ctx:
// - loop, socket fields
// - connected, sent, reveived methods

return struct {
const Self = @This();

pub fn connect(
ctx: *Ctx,
address: std.net.Address,
) !void {

// TODO: free completion
const completion = try ctx.loop.alloc.create(IO.Completion);
completion.* = undefined;

const old_events_nb = ctx.loop.addEvent();
ctx.loop.io.connect(
*Ctx,
ctx,
Self.connect_callback,
completion,
ctx.socket,
address,
);

if (builtin.is_test) {
report("start connect {d} on {any} at {any}", .{
old_events_nb + 1,
ctx.socket,
address,
});
}
}

fn connect_callback(
ctx: *Ctx,
completion: *IO.Completion,
result: IO.ConnectError!void,
) void {

// TODO: return the error to the callback
result catch |err| @panic(@errorName(err));

const old_events_nb = ctx.loop.removeEvent();
if (builtin.is_test) {
report("connect done, remaining events: {d}", .{old_events_nb - 1});
}

ctx.connected(Self, completion);
}

fn send(
ctx: *Ctx,
completion: *IO.Completion,
buf: []const u8,
) void {
const old_events_nb = ctx.loop.addEvent();
ctx.loop.io.send(
*Ctx,
ctx,
Self.send_callback,
completion,
ctx.socket,
buf,
);

if (builtin.is_test) {
report("start send {d} on {any} with buf len {d}", .{
old_events_nb + 1,
ctx.socket,
buf.len,
});
}
}

fn send_callback(
ctx: *Ctx,
completion: *IO.Completion,
result: IO.SendError!usize,
) void {

// TODO: return the error to the callback
const sent_nb = result catch |err| @panic(@errorName(err));

const old_events_nb = ctx.loop.removeEvent();
if (builtin.is_test) {
report("send done, remaining events: {d}", .{old_events_nb - 1});
}

ctx.sent(Self, completion, sent_nb);
}
};
}

pub const TCPClient = struct {
alloc: std.mem.Allocator,
loop: *Loop,
socket: std.os.socket_t,
buf: []const u8,

pub fn init(alloc: std.mem.Allocator, loop: *Loop) !TCPClient {
const msg: []const u8 = "OK\n";
return .{
.alloc = alloc,
.loop = loop,
.socket = undefined,
.buf = try std.mem.Allocator.dupe(alloc, u8, msg),
};
}

pub fn start(
self: *TCPClient,
comptime impl: anytype,
host: []const u8,
port: u16,
) !void {
const addr = try std.net.Address.parseIp4(host, port);
self.socket = try self.loop.io.open_socket(
addr.any.family,
std.os.SOCK.STREAM,
std.os.IPPROTO.TCP,
);
try impl.connect(self, addr);
}

fn connected(
self: *TCPClient,
comptime impl: anytype,
completion: *IO.Completion,
) void {
impl.send(self, completion, self.buf);
}

fn sent(
self: *TCPClient,
comptime _: anytype,
completion: *IO.Completion,
nb: usize,
) void {

// TODO: handle nb == ctx.buf.len
// ie: all data has not been sent
std.debug.print("sent {d} bytes\n", .{nb});

// TODO: IO receive
self.deinit(completion);
}

fn received(
self: *TCPClient,
comptime _: anytype,
completion: *IO.Completion,
nb: usize,
) void {
_ = completion;

std.debug.print("recv {d} bytes\n", .{nb});
if (nb > 0) {
const d = self.buf.?[0..nb];
std.debug.print("recv data: {s}\n", .{d});
}
// TODO: handle nb == ctx.buf.len
// ie: EOF
}

fn deinit(
self: *TCPClient,
completion: *IO.Completion,
) void {
defer self.loop.freeCbk(completion, self);
std.os.closeSocket(self.socket);
self.alloc.free(self.buf);
}
};

// Timeout

const ContextTimeout = struct {
loop: *Self,
loop: *Loop,
js_cbk: ?JSCallback,
};

Expand Down Expand Up @@ -111,7 +291,7 @@ pub const SingleThreaded = struct {
}
}

pub fn timeout(self: *Self, nanoseconds: u63, js_cbk: ?JSCallback) void {
pub fn timeout(self: *Loop, nanoseconds: u63, js_cbk: ?JSCallback) void {
const completion = self.alloc.create(IO.Completion) catch unreachable;
completion.* = undefined;
const ctx = self.alloc.create(ContextTimeout) catch unreachable;
Expand Down
Loading