Skip to content

Commit

Permalink
feat(tls) add duplex upgrade (#13718)
Browse files Browse the repository at this point in the history
  • Loading branch information
cirospaciari authored Sep 6, 2024
1 parent d38fc90 commit 36c5f84
Show file tree
Hide file tree
Showing 8 changed files with 1,385 additions and 372 deletions.
5 changes: 5 additions & 0 deletions src/bun.js/api/Timer.zig
Original file line number Diff line number Diff line change
Expand Up @@ -729,13 +729,15 @@ pub const EventLoopTimer = struct {
TimerObject,
TestRunner,
StatWatcherScheduler,
UpgradedDuplex,

pub fn Type(comptime T: Tag) type {
return switch (T) {
.TimerCallback => TimerCallback,
.TimerObject => TimerObject,
.TestRunner => JSC.Jest.TestRunner,
.StatWatcherScheduler => StatWatcherScheduler,
.UpgradedDuplex => uws.UpgradedDuplex,
};
}
};
Expand Down Expand Up @@ -796,6 +798,9 @@ pub const EventLoopTimer = struct {
if (comptime t.Type() == StatWatcherScheduler) {
return container.timerCallback();
}
if (comptime t.Type() == uws.UpgradedDuplex) {
return container.onTimeout();
}

if (comptime t.Type() == JSC.Jest.TestRunner) {
container.onTestTimeout(now, vm);
Expand Down
264 changes: 263 additions & 1 deletion src/bun.js/api/bun/socket.zig
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ pub const Listener = struct {
if (socket.ext(**anyopaque)) |ctx| {
ctx.* = bun.cast(**anyopaque, this_socket);
}
socket.setTimeout(120000);
socket.setTimeout(120);
}

pub fn addServerName(this: *Listener, global: *JSC.JSGlobalObject, hostname: JSValue, tls: JSValue) JSValue {
Expand Down Expand Up @@ -1221,6 +1221,20 @@ fn NewSocket(comptime ssl: bool) type {
return null;
}

pub fn handleError(this: *This, err_value: JSC.JSValue) void {
log("handleError", .{});
const handlers = this.handlers;
var vm = handlers.vm;
if (vm.isShuttingDown()) {
return;
}
vm.eventLoop().enter();
defer vm.eventLoop().exit();
const globalObject = handlers.globalObject;
const this_value = this.getThisValue(globalObject);
_ = handlers.callErrorHandler(this_value, &[_]JSC.JSValue{ this_value, err_value });
}

pub fn onWritable(
this: *This,
_: Socket,
Expand Down Expand Up @@ -1726,6 +1740,7 @@ fn NewSocket(comptime ssl: bool) type {
globalObject.throw("Timeout must be a positive integer", .{});
return .zero;
}
log("timeout({d})", .{t});

this.socket.setTimeout(@as(c_uint, @intCast(t)));

Expand Down Expand Up @@ -3173,6 +3188,135 @@ pub fn NewWrappedHandler(comptime tls: bool) type {
}
};
}

pub const DuplexUpgradeContext = struct {
upgrade: uws.UpgradedDuplex,
// We only us a tls and not a raw socket when upgrading a Duplex, Duplex dont support socketpairs
tls: ?*TLSSocket,
// task used to deinit the context in the next tick, vm is used to enqueue the task
vm: *JSC.VirtualMachine,
task: JSC.AnyTask,
task_event: EventState = .StartTLS,
ssl_config: ?JSC.API.ServerConfig.SSLConfig,
pub const EventState = enum(u8) {
StartTLS,
Close,
};

usingnamespace bun.New(DuplexUpgradeContext);

fn onOpen(this: *DuplexUpgradeContext) void {
const socket = TLSSocket.Socket.fromDuplex(&this.upgrade);

if (this.tls) |tls| {
tls.onOpen(socket);
}
}

fn onData(this: *DuplexUpgradeContext, decoded_data: []const u8) void {
const socket = TLSSocket.Socket.fromDuplex(&this.upgrade);

if (this.tls) |tls| {
tls.onData(socket, decoded_data);
}
}

fn onHandshake(this: *DuplexUpgradeContext, success: bool, ssl_error: uws.us_bun_verify_error_t) void {
const socket = TLSSocket.Socket.fromDuplex(&this.upgrade);

if (this.tls) |tls| {
tls.onHandshake(socket, @intFromBool(success), ssl_error);
}
}

fn onEnd(this: *DuplexUpgradeContext) void {
const socket = TLSSocket.Socket.fromDuplex(&this.upgrade);
if (this.tls) |tls| {
tls.onEnd(socket);
}
}

fn onWritable(this: *DuplexUpgradeContext) void {
const socket = TLSSocket.Socket.fromDuplex(&this.upgrade);

if (this.tls) |tls| {
tls.onWritable(socket);
}
}

fn onError(this: *DuplexUpgradeContext, err_value: JSC.JSValue) void {
if (this.tls) |tls| {
tls.handleError(err_value);
}
}

fn onTimeout(this: *DuplexUpgradeContext) void {
const socket = TLSSocket.Socket.fromDuplex(&this.upgrade);

if (this.tls) |tls| {
tls.onTimeout(socket);
}
}

fn onClose(this: *DuplexUpgradeContext) void {
const socket = TLSSocket.Socket.fromDuplex(&this.upgrade);

if (this.tls) |tls| {
tls.onClose(socket, 0, null);
}

this.deinitInNextTick();
}

fn runEvent(this: *DuplexUpgradeContext) void {
switch (this.task_event) {
.StartTLS => {
if (this.ssl_config) |config| {
this.upgrade.startTLS(config, true) catch |err| {
switch (err) {
error.OutOfMemory => {
bun.outOfMemory();
},
else => {
const errno = @intFromEnum(bun.C.SystemErrno.ECONNREFUSED);
if (this.tls) |tls| {
const socket = TLSSocket.Socket.fromDuplex(&this.upgrade);

tls.handleConnectError(errno);
tls.onClose(socket, errno, null);
}
},
}
};
this.ssl_config.?.deinit();
this.ssl_config = null;
}
},
.Close => {
this.upgrade.close();
},
}
}

fn deinitInNextTick(this: *DuplexUpgradeContext) void {
this.task_event = .Close;
this.vm.enqueueTask(JSC.Task.init(&this.task));
}

fn startTLS(this: *DuplexUpgradeContext) void {
this.task_event = .StartTLS;
this.vm.enqueueTask(JSC.Task.init(&this.task));
}

fn deinit(this: *DuplexUpgradeContext) void {
if (this.tls) |tls| {
this.tls = null;
tls.deref();
}
this.upgrade.deinit();
this.destroy();
}
};
pub fn jsAddServerName(global: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) JSValue {
JSC.markBinding(@src());

Expand All @@ -3189,8 +3333,126 @@ pub fn jsAddServerName(global: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) J
return .zero;
}

pub fn jsUpgradeDuplexToTLS(globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) JSValue {
JSC.markBinding(@src());

const args = callframe.arguments(2);
if (args.len < 2) {
globalObject.throw("Expected 2 arguments", .{});
return .zero;
}
const duplex = args.ptr[0];
// TODO: do better type checking
if (duplex.isEmptyOrUndefinedOrNull()) {
globalObject.throw("Expected a Duplex instance", .{});
return .zero;
}

var exception: JSC.C.JSValueRef = null;

const opts = args.ptr[1];
if (opts.isEmptyOrUndefinedOrNull() or opts.isBoolean() or !opts.isObject()) {
globalObject.throw("Expected options object", .{});
return .zero;
}

const socket_obj = opts.get(globalObject, "socket") orelse {
globalObject.throw("Expected \"socket\" option", .{});
return .zero;
};

var handlers = Handlers.fromJS(globalObject, socket_obj, &exception) orelse {
globalObject.throwValue(exception.?.value());
return .zero;
};

var ssl_opts: ?JSC.API.ServerConfig.SSLConfig = null;
if (opts.getTruthy(globalObject, "tls")) |tls| {
if (tls.isBoolean()) {
if (tls.toBoolean()) {
ssl_opts = JSC.API.ServerConfig.SSLConfig.zero;
}
} else {
if (JSC.API.ServerConfig.SSLConfig.inJS(JSC.VirtualMachine.get(), globalObject, tls, &exception)) |ssl_config| {
ssl_opts = ssl_config;
} else if (exception != null) {
return .zero;
}
}
}
if (ssl_opts == null) {
globalObject.throw("Expected \"tls\" option", .{});
return .zero;
}

var default_data = JSValue.zero;
if (opts.fastGet(globalObject, .data)) |default_data_value| {
default_data = default_data_value;
default_data.ensureStillAlive();
}

const socket_config = ssl_opts.?;

const protos = socket_config.protos;
const protos_len = socket_config.protos_len;

const is_server = false; // A duplex socket is always handled as a client

var handlers_ptr = handlers.vm.allocator.create(Handlers) catch bun.outOfMemory();
handlers_ptr.* = handlers;
handlers_ptr.is_server = is_server;
handlers_ptr.protect();
var tls = TLSSocket.new(.{
.handlers = handlers_ptr,
.this_value = .zero,
.socket = TLSSocket.Socket.detached,
.connection = null,
.wrapped = .tls,
.protos = if (protos) |p| (bun.default_allocator.dupe(u8, p[0..protos_len]) catch bun.outOfMemory()) else null,
.server_name = if (socket_config.server_name) |server_name| (bun.default_allocator.dupe(u8, server_name[0..bun.len(server_name)]) catch bun.outOfMemory()) else null,
.socket_context = null, // only set after the wrapTLS
});
const tls_js_value = tls.getThisValue(globalObject);
TLSSocket.dataSetCached(tls_js_value, globalObject, default_data);

var duplexContext = DuplexUpgradeContext.new(.{
.upgrade = undefined,
.tls = tls,
.vm = globalObject.bunVM(),
.task = undefined,
.ssl_config = socket_config,
});
tls.ref();

duplexContext.task = JSC.AnyTask.New(DuplexUpgradeContext, DuplexUpgradeContext.runEvent).init(duplexContext);
duplexContext.upgrade = uws.UpgradedDuplex.from(globalObject, duplex, .{
.onOpen = @ptrCast(&DuplexUpgradeContext.onOpen),
.onData = @ptrCast(&DuplexUpgradeContext.onData),
.onHandshake = @ptrCast(&DuplexUpgradeContext.onHandshake),
.onClose = @ptrCast(&DuplexUpgradeContext.onClose),
.onEnd = @ptrCast(&DuplexUpgradeContext.onEnd),
.onWritable = @ptrCast(&DuplexUpgradeContext.onWritable),
.onError = @ptrCast(&DuplexUpgradeContext.onError),
.onTimeout = @ptrCast(&DuplexUpgradeContext.onTimeout),
.ctx = @ptrCast(duplexContext),
});

tls.socket = TLSSocket.Socket.fromDuplex(&duplexContext.upgrade);
tls.markActive();
tls.poll_ref.ref(globalObject.bunVM());

duplexContext.startTLS();

const array = JSC.JSValue.createEmptyArray(globalObject, 2);
array.putIndex(globalObject, 0, tls_js_value);
// data, end, drain and close events must be reported
array.putIndex(globalObject, 1, duplexContext.upgrade.getJSHandlers(globalObject));

return array;
}
pub fn createNodeTLSBinding(global: *JSC.JSGlobalObject) JSC.JSValue {
return JSC.JSArray.create(global, &.{
JSC.JSFunction.create(global, "addServerName", JSC.toJSHostFunction(jsAddServerName), 3, .{}),
JSC.JSFunction.create(global, "upgradeDuplexToTLS", JSC.toJSHostFunction(jsUpgradeDuplexToTLS), 2, .{}),
});
}
10 changes: 5 additions & 5 deletions src/bun.js/api/bun/ssl_wrapper.zig
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ pub fn SSLWrapper(comptime T: type) type {
// We cannot shutdown read in SSL, the read direction is closed by the peer.
// So we just ignore the onData data, we still wanna to wait until we received the shutdown
const DummyReadHandler = struct {
fn onData(_: T, _: *This, _: []const u8) void {}
fn onData(_: T, _: []const u8) void {}
};
this.handlers.onData = DummyReadHandler.onData;
}
Expand All @@ -133,7 +133,7 @@ pub fn SSLWrapper(comptime T: type) type {
// The peer might continue sending data for some period of time before handling the local application's shutdown indication.
// This will start a full shutdown process if fast_shutdown = false, we can assume that the other side will complete the 2-step shutdown ASAP.
const ret = BoringSSL.SSL_shutdown(ssl);
if (fast_shutdown and ret == 0) {
if (fast_shutdown) {
// This allows for a more rapid shutdown process if the application does not wish to wait for the peer.
// This alternative "fast shutdown" approach should only be done if it is known that the peer will not send more data, otherwise there is a risk of an application exposing itself to a truncation attack.
// The full SSL_shutdown() process, in which both parties send close_notify alerts and SSL_shutdown() returns 1, provides a cryptographically authenticated indication of the end of a connection.
Expand Down Expand Up @@ -176,19 +176,19 @@ pub fn SSLWrapper(comptime T: type) type {
}

// Return if we have pending data to be read or write
pub fn hasPendingData(this: *This) bool {
pub fn hasPendingData(this: *const This) bool {
const ssl = this.ssl orelse return false;

return BoringSSL.BIO_ctrl_pending(BoringSSL.SSL_get_wbio(ssl)) > 0 or BoringSSL.BIO_ctrl_pending(BoringSSL.SSL_get_rbio(ssl)) > 0;
}

// We sent or received a shutdown (closing or closed)
pub fn isShutdown(this: *This) bool {
pub fn isShutdown(this: *const This) bool {
return this.flags.closed_notified or this.flags.received_ssl_shutdown or this.flags.sent_ssl_shutdown;
}

// We sent and received the shutdown (fully closed)
pub fn isClosed(this: *This) bool {
pub fn isClosed(this: *const This) bool {
return this.flags.received_ssl_shutdown and this.flags.sent_ssl_shutdown;
}

Expand Down
1 change: 1 addition & 0 deletions src/bun.js/bindings/ErrorCode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ export default [
// Bun-specific
["ERR_FORMDATA_PARSE_ERROR", TypeError, "TypeError"],
["ERR_BODY_ALREADY_USED", Error, "Error"],
["ERR_STREAM_WRAP", Error, "Error"],
] as ErrorCodeMapping;
Loading

0 comments on commit 36c5f84

Please sign in to comment.