Skip to content

Commit

Permalink
Support ReadableStream in request.clone & response.clone() (#13744)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarred-Sumner authored Sep 6, 2024
1 parent cd7f6a1 commit d38fc90
Show file tree
Hide file tree
Showing 13 changed files with 1,312 additions and 379 deletions.
9 changes: 7 additions & 2 deletions src/bun.js/bindings/BunObject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ static inline JSC::EncodedJSValue flattenArrayOfBuffersIntoArrayBufferOrUint8Arr
}

size_t arrayLength = array->length();
if (arrayLength < 1) {
const auto returnEmptyArrayBufferView = [&]() -> EncodedJSValue {
if (asUint8Array) {
return JSValue::encode(
JSC::JSUint8Array::create(
Expand All @@ -95,6 +95,10 @@ static inline JSC::EncodedJSValue flattenArrayOfBuffersIntoArrayBufferOrUint8Arr
}

RELEASE_AND_RETURN(throwScope, JSValue::encode(JSC::JSArrayBuffer::create(vm, lexicalGlobalObject->arrayBufferStructure(), JSC::ArrayBuffer::create(static_cast<size_t>(0), 1))));
};

if (arrayLength < 1) {
return returnEmptyArrayBufferView();
}

size_t byteLength = 0;
Expand Down Expand Up @@ -149,7 +153,7 @@ static inline JSC::EncodedJSValue flattenArrayOfBuffersIntoArrayBufferOrUint8Arr
byteLength = std::min(byteLength, maxLength);

if (byteLength == 0) {
RELEASE_AND_RETURN(throwScope, JSValue::encode(JSC::JSArrayBuffer::create(vm, lexicalGlobalObject->arrayBufferStructure(), JSC::ArrayBuffer::create(static_cast<size_t>(0), 1))));
return returnEmptyArrayBufferView();
}

auto buffer = JSC::ArrayBuffer::tryCreateUninitialized(byteLength, 1);
Expand Down Expand Up @@ -237,6 +241,7 @@ JSC_DEFINE_HOST_FUNCTION(functionConcatTypedArrays, (JSGlobalObject * globalObje
auto arg2 = callFrame->argument(2);
if (!arg2.isUndefined()) {
asUint8Array = arg2.toBoolean(globalObject);
RETURN_IF_EXCEPTION(throwScope, {});
}

return flattenArrayOfBuffersIntoArrayBufferOrUint8Array(globalObject, arrayValue, maxLength, asUint8Array);
Expand Down
47 changes: 46 additions & 1 deletion src/bun.js/bindings/ZigGlobalObject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1991,8 +1991,53 @@ JSC_DEFINE_HOST_FUNCTION(isAbortSignal, (JSGlobalObject*, CallFrame* callFrame))
ASSERT(callFrame->argumentCount() == 1);
return JSValue::encode(jsBoolean(callFrame->uncheckedArgument(0).inherits<JSAbortSignal>()));
}
static inline std::optional<JSC::JSValue> invokeReadableStreamFunction(JSC::JSGlobalObject& lexicalGlobalObject, const JSC::Identifier& identifier, JSC::JSValue thisValue, const JSC::MarkedArgumentBuffer& arguments)
{
JSC::VM& vm = lexicalGlobalObject.vm();
JSC::JSLockHolder lock(vm);

auto function = lexicalGlobalObject.get(&lexicalGlobalObject, identifier);
ASSERT(function.isCallable());

auto scope = DECLARE_CATCH_SCOPE(vm);
auto callData = JSC::getCallData(function);
auto result = call(&lexicalGlobalObject, function, callData, thisValue, arguments);
#if BUN_DEBUG
if (scope.exception()) {
Bun__reportError(&lexicalGlobalObject, JSValue::encode(scope.exception()));
}
#endif
EXCEPTION_ASSERT(!scope.exception() || vm.hasPendingTerminationException());
if (scope.exception())
return {};
return result;
}
extern "C" bool ReadableStream__tee(JSC__JSValue possibleReadableStream, Zig::GlobalObject* globalObject, JSC__JSValue* possibleReadableStream1, JSC__JSValue* possibleReadableStream2)
{
auto* readableStream = jsDynamicCast<JSReadableStream*>(JSC::JSValue::decode(possibleReadableStream));
if (UNLIKELY(!readableStream))
return false;

auto& lexicalGlobalObject = *globalObject;
auto* clientData = static_cast<JSVMClientData*>(lexicalGlobalObject.vm().clientData);
auto& privateName = clientData->builtinFunctions().readableStreamInternalsBuiltins().readableStreamTeePrivateName();

MarkedArgumentBuffer arguments;
arguments.append(readableStream);
arguments.append(JSC::jsBoolean(true));
ASSERT(!arguments.hasOverflowed());
auto returnedValue = invokeReadableStreamFunction(lexicalGlobalObject, privateName, JSC::jsUndefined(), arguments);
if (!returnedValue)
return false;

auto results = Detail::SequenceConverter<IDLAny>::convert(lexicalGlobalObject, *returnedValue);

ASSERT(results.size() == 2);
*possibleReadableStream1 = JSValue::encode(results[0]);
*possibleReadableStream2 = JSValue::encode(results[1]);
return true;
}

extern "C" void ReadableStream__cancel(JSC__JSValue possibleReadableStream, Zig::GlobalObject* globalObject);
extern "C" void ReadableStream__cancel(JSC__JSValue possibleReadableStream, Zig::GlobalObject* globalObject)
{
auto* readableStream = jsDynamicCast<JSReadableStream*>(JSC::JSValue::decode(possibleReadableStream));
Expand Down
4 changes: 4 additions & 0 deletions src/bun.js/bindings/webcore/StructuredClone.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ JSC_DEFINE_HOST_FUNCTION(structuredCloneForStream, (JSGlobalObject * globalObjec

JSValue value = callFrame->uncheckedArgument(0);

if (value.isPrimitive()) {
return JSValue::encode(value);
}

if (value.inherits<JSArrayBuffer>())
RELEASE_AND_RETURN(scope, cloneArrayBufferImpl(globalObject, callFrame, CloneMode::Full));

Expand Down
76 changes: 75 additions & 1 deletion src/bun.js/webcore/body.zig
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ pub const Body = struct {
if (locked.readable.get()) |readable| {
return readable.value;
}
if (locked.promise != null) {
if (locked.promise != null or locked.action != .none) {
return JSC.WebCore.ReadableStream.used(globalThis);
}
var drain_result: JSC.WebCore.DrainResult = .{
Expand Down Expand Up @@ -982,7 +982,81 @@ pub const Body = struct {
this.Error.deinit();
}
}

pub fn tee(this: *Value, globalThis: *JSC.JSGlobalObject) Value {
var locked = &this.Locked;

if (locked.readable.isDisturbed(globalThis)) {
return Value{ .Used = {} };
}

if (locked.readable.tee(globalThis)) |readable| {
return Value{
.Locked = .{
.readable = JSC.WebCore.ReadableStream.Strong.init(readable, globalThis),
.global = globalThis,
},
};
}
if (locked.promise != null or locked.action != .none or locked.readable.has()) {
return Value{ .Used = {} };
}

var drain_result: JSC.WebCore.DrainResult = .{
.estimated_size = 0,
};

if (locked.onStartStreaming) |drain| {
locked.onStartStreaming = null;
drain_result = drain(locked.task.?);
}

if (drain_result == .empty or drain_result == .aborted) {
this.* = .{ .Null = {} };
return Value{ .Null = {} };
}

var reader = JSC.WebCore.ByteStream.Source.new(.{
.context = undefined,
.globalThis = globalThis,
});

reader.context.setup();

if (drain_result == .estimated_size) {
reader.context.highWaterMark = @as(Blob.SizeType, @truncate(drain_result.estimated_size));
reader.context.size_hint = @as(Blob.SizeType, @truncate(drain_result.estimated_size));
} else if (drain_result == .owned) {
reader.context.buffer = drain_result.owned.list;
reader.context.size_hint = @as(Blob.SizeType, @truncate(drain_result.owned.size_hint));
}

locked.readable = JSC.WebCore.ReadableStream.Strong.init(.{
.ptr = .{ .Bytes = &reader.context },
.value = reader.toReadableStream(globalThis),
}, globalThis);

if (locked.onReadableStreamAvailable) |onReadableStreamAvailable| {
onReadableStreamAvailable(locked.task.?, globalThis, locked.readable.get().?);
}

const teed = locked.readable.tee(globalThis) orelse return Value{ .Used = {} };

return Value{
.Locked = .{
.readable = JSC.WebCore.ReadableStream.Strong.init(teed, globalThis),
.global = globalThis,
},
};
}

pub fn clone(this: *Value, globalThis: *JSC.JSGlobalObject) Value {
this.toBlobIfPossible();

if (this.* == .Locked) {
return this.tee(globalThis);
}

if (this.* == .InternalBlob) {
var internal_blob = this.InternalBlob;
this.* = .{
Expand Down
22 changes: 20 additions & 2 deletions src/bun.js/webcore/request.zig
Original file line number Diff line number Diff line change
Expand Up @@ -744,16 +744,34 @@ pub const Request = struct {
pub fn doClone(
this: *Request,
globalThis: *JSC.JSGlobalObject,
_: *JSC.CallFrame,
callframe: *JSC.CallFrame,
) JSC.JSValue {
const this_value = callframe.this();
var cloned = this.clone(getAllocator(globalThis), globalThis);

if (globalThis.hasException()) {
cloned.finalize();
return .zero;
}

return cloned.toJS(globalThis);
const js_wrapper = cloned.toJS(globalThis);
if (js_wrapper != .zero) {
if (cloned.body.value == .Locked) {
if (cloned.body.value.Locked.readable.get()) |readable| {
// If we are teed, then we need to update the cached .body
// value to point to the new readable stream
// We must do this on both the original and cloned request
// but especially the original request since it will have a stale .body value now.
Request.bodySetCached(js_wrapper, globalThis, readable.value);

if (this.body.value.Locked.readable.get()) |other_readable| {
Request.bodySetCached(this_value, globalThis, other_readable.value);
}
}
}
}

return js_wrapper;
}

// Returns if the request has headers already cached/set.
Expand Down
27 changes: 25 additions & 2 deletions src/bun.js/webcore/response.zig
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,33 @@ pub const Response = struct {
pub fn doClone(
this: *Response,
globalThis: *JSC.JSGlobalObject,
_: *JSC.CallFrame,
callframe: *JSC.CallFrame,
) JSValue {
const this_value = callframe.this();
const cloned = this.clone(globalThis);
return Response.makeMaybePooled(globalThis, cloned);
if (globalThis.hasException()) {
cloned.finalize();
return .zero;
}

const js_wrapper = Response.makeMaybePooled(globalThis, cloned);

if (js_wrapper != .zero) {
if (cloned.body.value == .Locked) {
if (cloned.body.value.Locked.readable.get()) |readable| {
// If we are teed, then we need to update the cached .body
// value to point to the new readable stream
// We must do this on both the original and cloned response
// but especially the original response since it will have a stale .body value now.
Response.bodySetCached(js_wrapper, globalThis, readable.value);
if (this.body.value.Locked.readable.get()) |other_readable| {
Response.bodySetCached(this_value, globalThis, other_readable.value);
}
}
}
}

return js_wrapper;
}

pub fn makeMaybePooled(globalObject: *JSC.JSGlobalObject, ptr: *Response) JSValue {
Expand Down
25 changes: 25 additions & 0 deletions src/bun.js/webcore/streams.zig
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ pub const ReadableStream = struct {
return this.held.globalThis;
}

pub fn has(this: *Strong) bool {
return this.held.has();
}

pub fn isDisturbed(this: *const Strong, global: *JSC.JSGlobalObject) bool {
if (this.get()) |stream| {
return stream.isDisturbed(global);
Expand Down Expand Up @@ -84,8 +88,29 @@ pub const ReadableStream = struct {
// }
this.held.deinit();
}

pub fn tee(this: *Strong, global: *JSGlobalObject) ?ReadableStream {
if (this.get()) |stream| {
const first, const second = stream.tee(global) orelse return null;
this.held.set(global, first.value);
return second;
}
return null;
}
};

extern fn ReadableStream__tee(stream: JSValue, globalThis: *JSGlobalObject, out1: *JSC.JSValue, out2: *JSC.JSValue) bool;
pub fn tee(this: *const ReadableStream, globalThis: *JSGlobalObject) ?struct { ReadableStream, ReadableStream } {
var out1: JSC.JSValue = .zero;
var out2: JSC.JSValue = .zero;
if (!ReadableStream__tee(this.value, globalThis, &out1, &out2)) {
return null;
}
const out_stream2 = ReadableStream.fromJS(out2, globalThis) orelse return null;
const out_stream1 = ReadableStream.fromJS(out1, globalThis) orelse return null;
return .{ out_stream1, out_stream2 };
}

pub fn toJS(this: *const ReadableStream) JSValue {
return this.value;
}
Expand Down
36 changes: 32 additions & 4 deletions src/codegen/bundle-functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ interface ParsedBuiltin {
directives: Record<string, any>;
source: string;
async: boolean;
enums: string[];
}

interface BundledBuiltin {
Expand Down Expand Up @@ -74,13 +75,15 @@ async function processFileSplit(filename: string): Promise<{ functions: BundledB
// and then compile those separately

const consumeWhitespace = /^\s*/;
const consumeTopLevelContent = /^(\/\*|\/\/|type|import|interface|\$|export (?:async )?function|(?:async )?function)/;
const consumeEndOfType = /;|.(?=export|type|interface|\$|\/\/|\/\*|function)/;
const consumeTopLevelContent =
/^(\/\*|\/\/|type|import|interface|\$|const enum|export (?:async )?function|(?:async )?function)/;
const consumeEndOfType = /;|.(?=export|type|interface|\$|\/\/|\/\*|function|const enum)/;

const functions: ParsedBuiltin[] = [];
let directives: Record<string, any> = {};
const bundledFunctions: BundledBuiltin[] = [];
let internal = false;
const topLevelEnums: { name: string; code: string }[] = [];

while (contents.length) {
contents = contents.replace(consumeWhitespace, "");
Expand All @@ -107,6 +110,16 @@ async function processFileSplit(filename: string): Promise<{ functions: BundledB
contents = contents.slice(i + 1);
} else if (match[1] === "interface") {
contents = sliceSourceCode(contents, false).rest;
} else if (match[1] === "const enum") {
const { result, rest } = sliceSourceCode(contents, false);
const i = result.indexOf("{\n");
// Support const enums in module scope.
topLevelEnums.push({
name: result.slice("const enum ".length, i).trim(),
code: "\n" + result,
});

contents = rest;
} else if (match[1] === "$") {
const directive = contents.match(/^\$([a-zA-Z0-9]+)(?:\s*=\s*([^\r\n]+?))?\s*;?\r?\n/);
if (!directive) {
Expand Down Expand Up @@ -148,12 +161,27 @@ async function processFileSplit(filename: string): Promise<{ functions: BundledB
globalThis.requireTransformer(x, SRC_DIR + "/" + basename),
);

const source = result.trim().slice(2, -1);
const constEnumsUsedInFunction: string[] = [];
if (topLevelEnums.length) {
// If the function references a top-level const enum let's add the code
// to the top-level scope of the function so that the transpiler will
// inline all the values and strip out the enum object.
for (const { name, code } of topLevelEnums) {
// Only include const enums which are referenced in the function source.
if (source.includes(name)) {
constEnumsUsedInFunction.push(code);
}
}
}

functions.push({
name,
params,
directives,
source: result.trim().slice(2, -1),
source,
async,
enums: constEnumsUsedInFunction,
});
contents = rest;
directives = {};
Expand All @@ -178,7 +206,7 @@ async function processFileSplit(filename: string): Promise<{ functions: BundledB
`// @ts-nocheck
// GENERATED TEMP FILE - DO NOT EDIT
// Sourced from ${path.relative(TMP_DIR, filename)}
${fn.enums.join("\n")}
// do not allow the bundler to rename a symbol to $
($);
Expand Down
Loading

0 comments on commit d38fc90

Please sign in to comment.